Make PersistenceFileWriter Async

This commit is contained in:
Alva Swanson 2024-02-01 14:20:30 +01:00
parent 80cdd31d45
commit 97cee35e39
No known key found for this signature in database
GPG Key ID: 004760E77F753090
3 changed files with 95 additions and 33 deletions

View File

@ -0,0 +1,24 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.persistence;
import java.util.concurrent.CompletableFuture;
public interface AsyncFileWriter {
CompletableFuture<Integer> write(byte[] data, int offset);
}

View File

@ -17,30 +17,45 @@
package bisq.persistence;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
public class PersistenceFileWriter {
public interface AsyncFileWriter {
int write(byte[] data, int offset);
}
public PersistenceFileWriter(AsyncFileWriter asyncWriter) {
this.asyncWriter = asyncWriter;
}
private final AsyncFileWriter asyncWriter;
private final ExecutorService writeRequestScheduler;
public boolean write(byte[] data) {
int totalWrittenBytes = asyncWriter.write(data, 0);
if (totalWrittenBytes == data.length) {
return true;
}
public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeRequestScheduler) {
this.asyncWriter = asyncWriter;
this.writeRequestScheduler = writeRequestScheduler;
}
int remainingBytes = data.length - totalWrittenBytes;
while (remainingBytes > 0) {
int writtenBytes = asyncWriter.write(data, totalWrittenBytes);
totalWrittenBytes += writtenBytes;
remainingBytes = data.length - totalWrittenBytes;
}
public CountDownLatch write(byte[] data) {
CountDownLatch writeFinished = new CountDownLatch(1);
scheduleAsyncWrite(data, 0, data.length, writeFinished);
return writeFinished;
}
return true;
private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) {
asyncWriter.write(data, offset)
.thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler);
}
private Consumer<Integer> writeUntilEndAsync(byte[] data,
int offset,
int totalBytes,
CountDownLatch writeFinished) {
return writtenBytes -> {
if (writtenBytes == totalBytes) {
writeFinished.countDown();
return;
}
int remainingBytes = totalBytes - writtenBytes;
if (remainingBytes > 0) {
int newOffset = offset + writtenBytes;
scheduleAsyncWrite(data, newOffset, remainingBytes, writeFinished);
}
};
}
}

View File

@ -17,15 +17,21 @@
package bisq.persistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.hamcrest.CoreMatchers.is;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
@ -34,38 +40,55 @@ import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class PersistenceFileWriterTests {
private static final ExecutorService writeRequestScheduler = Executors.newSingleThreadExecutor();
private final byte[] DATA = new byte[100];
private PersistenceFileWriter.AsyncFileWriter asyncWriter;
private AsyncFileWriter asyncWriter;
private PersistenceFileWriter fileWriter;
@BeforeEach
void setup(@Mock PersistenceFileWriter.AsyncFileWriter asyncWriter) {
void setup(@Mock AsyncFileWriter asyncWriter) {
this.asyncWriter = asyncWriter;
fileWriter = new PersistenceFileWriter(asyncWriter);
fileWriter = new PersistenceFileWriter(asyncWriter, writeRequestScheduler);
}
@AfterAll
static void teardown() {
writeRequestScheduler.shutdownNow();
}
@Test
void writeInOneGo() {
doReturn(DATA.length).when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA);
void writeInOneGo() throws InterruptedException {
doReturn(completedFuture(DATA.length))
.when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);
assertThat(isSuccess, is(true));
verify(asyncWriter, times(1)).write(any(), anyInt());
}
@Test
void writeInTwoPhases() {
doReturn(25, 75).when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA);
void writeInTwoPhases() throws InterruptedException {
doReturn(completedFuture(25), completedFuture(75))
.when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);
assertThat(isSuccess, is(true));
verify(asyncWriter, times(2)).write(any(), anyInt());
}
@Test
void writeInFivePhases() {
doReturn(10, 20, 30, 15, 25).when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA);
void writeInFivePhases() throws InterruptedException {
doReturn(completedFuture(10), completedFuture(20),
completedFuture(30), completedFuture(15),
completedFuture(25))
.when(asyncWriter).write(any(), anyInt());
boolean isSuccess = fileWriter.write(DATA)
.await(30, TimeUnit.SECONDS);
assertThat(isSuccess, is(true));
verify(asyncWriter, times(5)).write(any(), anyInt());