From 6707af5f7c3f721bddb3ac9dc06c538dce9e30da Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Fri, 2 Feb 2024 14:47:24 +0100 Subject: [PATCH] Implement AsyncFileChannelWriter --- .../persistence/AsyncFileChannelWriter.java | 55 +++++++++++ .../AsyncFileChannelWriterTests.java | 97 +++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java create mode 100644 persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java diff --git a/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java new file mode 100644 index 0000000000..7e1f8ed086 --- /dev/null +++ b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java @@ -0,0 +1,55 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; + +import java.util.concurrent.CompletableFuture; + +public class AsyncFileChannelWriter implements AsyncFileWriter { + private final AsynchronousFileChannel fileChannel; + + public AsyncFileChannelWriter(AsynchronousFileChannel fileChannel) { + this.fileChannel = fileChannel; + } + + @Override + public CompletableFuture write(byte[] data, int offset) { + var byteBuffer = ByteBuffer.wrap(data); + var completableFuture = new CompletableFuture(); + var completionHandler = createCompletionHandler(completableFuture); + fileChannel.write(byteBuffer, offset, null, completionHandler); + return completableFuture; + } + + private CompletionHandler createCompletionHandler(CompletableFuture completableFuture) { + return new CompletionHandler<>() { + @Override + public void completed(Integer writtenData, Object o) { + completableFuture.complete(writtenData); + } + + @Override + public void failed(Throwable throwable, Object o) { + completableFuture.completeExceptionally(throwable); + } + }; + } +} diff --git a/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java b/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java new file mode 100644 index 0000000000..16b356cc2d --- /dev/null +++ b/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java @@ -0,0 +1,97 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import java.io.IOException; + +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class AsyncFileChannelWriterTests { + private Path filePath; + private AsynchronousFileChannel fileChannel; + private AsyncFileChannelWriter asyncFileChannelWriter; + + @BeforeEach + void setup(@TempDir Path tempDir) throws IOException { + filePath = tempDir.resolve("file"); + fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + asyncFileChannelWriter = new AsyncFileChannelWriter(fileChannel); + } + + @Test + void writeData() throws IOException, InterruptedException, ExecutionException, TimeoutException { + byte[] expected = new byte[1050]; + new Random().nextBytes(expected); + + CompletableFuture completableFuture = asyncFileChannelWriter.write(expected, 0); + + int writtenBytes = completableFuture.get(30, TimeUnit.SECONDS); + while (writtenBytes < expected.length) { + completableFuture = asyncFileChannelWriter.write(expected, writtenBytes); + writtenBytes += completableFuture.get(30, TimeUnit.SECONDS); + } + + assertThat(writtenBytes, is(expected.length)); + + fileChannel.close(); + byte[] actual = Files.readAllBytes(filePath); + + assertThat(expected, is(actual)); + } + + @Test + void writeDataAtOffset() throws IOException, InterruptedException, ExecutionException, TimeoutException { + final int startOffset = 100; + byte[] data = new byte[1024]; + new Random().nextBytes(data); + + CompletableFuture completableFuture = asyncFileChannelWriter.write(data, startOffset); + + int writtenBytes = startOffset + completableFuture.get(30, TimeUnit.SECONDS); + while (writtenBytes < data.length) { + completableFuture = asyncFileChannelWriter.write(data, writtenBytes); + writtenBytes += completableFuture.get(30, TimeUnit.SECONDS); + } + + assertThat(writtenBytes - startOffset, is(data.length)); + + fileChannel.close(); + byte[] readFromFile = Files.readAllBytes(filePath); + assertThat(readFromFile.length, is(startOffset + data.length)); + + byte[] readFromFileWithoutOffset = Arrays.copyOfRange(readFromFile, 100, readFromFile.length); + assertThat(readFromFileWithoutOffset, is(data)); + } +}