From 7c32587453bdd00a2b0c8d12694666330e991ed5 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 28 Oct 2021 02:54:52 +0200 Subject: [PATCH] Persist bsq blocks as buckets of 1000 blocks Improve logging Add BsqBlockStore to protobuf Remove DaoStateMonitoringService field Do not persist the blocks in daoState anymore. This improves persistence performance and reduces memory requirements for snapshots. --- build.gradle | 16 +- .../bisq/common/persistence/db/DataStore.java | 78 ++ .../bisq/common/persistence/db/FixedHash.java | 418 +++++++ .../bisq/common/persistence/db/HashBase.java | 20 + .../bisq/common/persistence/db/NoHeapDB.java | 380 ++++++ .../common/persistence/db/NoHeapDBStore.java | 1051 +++++++++++++++++ .../main/java/bisq/common/util/GcUtil.java | 16 +- .../bisq/core/dao/node/lite/LiteNode.java | 8 +- .../bisq/core/dao/state/DaoStateService.java | 10 +- .../dao/state/DaoStateSnapshotService.java | 73 +- .../bisq/core/dao/state/model/DaoState.java | 8 +- .../dao/state/storage/BlocksPersistence.java | 205 ++++ .../core/dao/state/storage/BsqBlockStore.java | 52 + .../storage/BsqBlocksStorageService.java | 173 +++ .../state/storage/DaoStateStorageService.java | 112 +- .../core/dao/state/storage/DaoStateStore.java | 6 + .../CorePersistenceProtoResolver.java | 3 + proto/src/main/proto/pb.proto | 5 + 18 files changed, 2559 insertions(+), 75 deletions(-) create mode 100644 common/src/main/java/bisq/common/persistence/db/DataStore.java create mode 100644 common/src/main/java/bisq/common/persistence/db/FixedHash.java create mode 100644 common/src/main/java/bisq/common/persistence/db/HashBase.java create mode 100644 common/src/main/java/bisq/common/persistence/db/NoHeapDB.java create mode 100644 common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java create mode 100644 core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java create mode 100644 core/src/main/java/bisq/core/dao/state/storage/BsqBlockStore.java create mode 100644 core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java diff --git a/build.gradle b/build.gradle index cf41f0124c..c54ea3eb7a 100644 --- a/build.gradle +++ b/build.gradle @@ -63,6 +63,7 @@ configure(subprojects) { kotlinVersion = '1.3.41' knowmXchangeVersion = '4.4.2' langVersion = '3.11' + leveldbVersion = '1.2' logbackVersion = '1.1.11' loggingVersion = '1.2' lombokVersion = '1.18.12' @@ -238,7 +239,7 @@ configure(project(':common')) { javafx { version = "$javafxVersion" - modules = ['javafx.graphics'] + modules = ['javafx.graphics'] } dependencies { @@ -264,6 +265,10 @@ configure(project(':common')) { exclude(module: 'okhttp') exclude(module: 'okio') } + implementation("io.github.pcmind:leveldb:$leveldbVersion") { + exclude(module: 'guava') + } + runtimeOnly("io.grpc:grpc-netty-shaded:$grpcVersion") { exclude(module: 'guava') exclude(module: 'animal-sniffer-annotations') @@ -284,7 +289,7 @@ configure(project(':p2p')) { javafx { version = "$javafxVersion" - modules = ['javafx.base'] + modules = ['javafx.base'] } dependencies { @@ -298,6 +303,7 @@ configure(project(':p2p')) { implementation("org.apache.httpcomponents:httpclient:$httpclientVersion") { exclude(module: 'commons-codec') } + compile "org.fxmisc.easybind:easybind:$easybindVersion" compileOnly "org.projectlombok:lombok:$lombokVersion" annotationProcessor "org.projectlombok:lombok:$lombokVersion" @@ -324,7 +330,7 @@ configure(project(':core')) { javafx { version = "$javafxVersion" - modules = ['javafx.base'] + modules = ['javafx.base'] } dependencies { @@ -411,7 
+417,7 @@ configure(project(':desktop')) { javafx { version = "$javafxVersion" - modules = ['javafx.controls', 'javafx.fxml'] + modules = ['javafx.controls', 'javafx.fxml'] } version = '1.7.5-SNAPSHOT' @@ -459,7 +465,7 @@ configure(project(':monitor')) { javafx { version = "$javafxVersion" - modules = ['javafx.base'] + modules = ['javafx.base'] } mainClassName = 'bisq.monitor.Monitor' diff --git a/common/src/main/java/bisq/common/persistence/db/DataStore.java b/common/src/main/java/bisq/common/persistence/db/DataStore.java new file mode 100644 index 0000000000..6c29074877 --- /dev/null +++ b/common/src/main/java/bisq/common/persistence/db/DataStore.java @@ -0,0 +1,78 @@ +package bisq.common.persistence.db; + +public interface DataStore { + public enum Storage { + IN_MEMORY, + PERSISTED + } + + public static final byte INACTIVE_RECORD = 0; + public static final byte ACTIVE_RECORD = 1; + + public static final byte EMPTY_RECORD_TYPE = -1; + public static final byte OBJ_RECORD_TYPE = 1; + public static final byte TEXT_RECORD_TYPE = 2; + public static final byte LONG_RECORD_TYPE = 3; + public static final byte INT_RECORD_TYPE = 4; + public static final byte DOUBLE_RECORD_TYPE = 5; + public static final byte FLOAT_RECORD_TYPE = 6; + public static final byte SHORT_RECORD_TYPE = 7; + public static final byte CHAR_RECORD_TYPE = 8; + public static final byte BYTEARRAY_RECORD_TYPE = 9; + + + // Get Journal stats + public long getRecordCount(); + + public long getEmptyCount(); + + public String getName(); + + public String getFolder(); + + public long getFilesize(); + + public boolean putInteger(String key, Integer val); + + public Integer getInteger(String key); + + public boolean putShort(String key, Short val); + + public Short getShort(String key); + + public boolean putLong(String key, Long val); + + public Long getLong(String key); + + public boolean putFloat(String key, Float val); + + public Float getFloat(String key); + + public boolean putDouble(String key, Double val); + + public Double getDouble(String key); + + public boolean putString(String key, String val); + + public String getString(String key); + + public boolean putObject(String key, Object msg); + + public Object getObject(String key); // key is the object ID + + boolean putBytes(String key, byte[] bytes); + + byte[] getBytes(String key); + + public boolean putChar(String key, char val); + + public char getChar(String key); + + public boolean remove(String key); // ID is the object hashs + + public Object iterateStart(); + + public Object iterateNext(); + + public void delete(); +} diff --git a/common/src/main/java/bisq/common/persistence/db/FixedHash.java b/common/src/main/java/bisq/common/persistence/db/FixedHash.java new file mode 100644 index 0000000000..d29d67879b --- /dev/null +++ b/common/src/main/java/bisq/common/persistence/db/FixedHash.java @@ -0,0 +1,418 @@ +package bisq.common.persistence.db; + +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import java.io.File; +import java.io.RandomAccessFile; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author ebruno + */ +public class FixedHash implements HashBase { + + public enum Storage { + IN_MEMORY, + PERSISTED + } + + protected static final Logger logger = Logger.getLogger("FixedHash"); + protected boolean debugLogging = false; + + protected static int LOAD_THRESHOLD = 70; + protected static final int PAGE_SIZE = 1024 * 1024; + protected static final int SIZE_FACTOR = 2; + protected 
static final int DEFAULT_INDEX_JOURNAL_SIZE = SIZE_FACTOR * PAGE_SIZE; + protected static final int KEY_SIZE = 0; + + // + // 1 (byte) key length + // 4 (int) key hashcode + // 0 (bytes) key size + // 8 (long) record location + // + protected static final int INDEX_ENTRY_SIZE_BYTES = + 1 + Integer.BYTES + KEY_SIZE + Long.BYTES; + + protected long sizeInBytes = DEFAULT_INDEX_JOURNAL_SIZE; + protected int previousOffset = 0; // the last record inserted into the index + protected int bucketsFree = 0; + protected int bucketsUsed = 0; + protected int totalBuckets = 0; + protected int collisions = 0; + + protected String journalPath = ""; + protected boolean inMemory = true; + + protected RandomAccessFile indexFile = null; + protected FileChannel indexChannel = null; + protected ByteBuffer indexBuffer = null; + protected byte keyLength = 16; + protected long indexCurrentEnd = 0; + + protected int indexRecordReadCount = 1; + + // Used when iterating through the index + protected int iterateNext = 0; + + /////////////////////////////////////////////////////////////////////////// + + public FixedHash(String journalPath, boolean inMemory, boolean reuseExisting) { + this(DEFAULT_INDEX_JOURNAL_SIZE, journalPath, inMemory, reuseExisting); + } + + public FixedHash(int size, String journalPath, boolean inMemory, boolean reuseExisting) { + boolean success = false; + sizeInBytes = size; + this.inMemory = inMemory; + this.journalPath = journalPath; + + if (inMemory) { + success = createIndexJournalBB(); + } else { + success = createIndexJournalMBB(reuseExisting); + } + + if (success) { + totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); + bucketsFree = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); + bucketsUsed = 0; + } + } + + protected boolean createIndexJournalBB() { + try { + indexBuffer = ByteBuffer.allocateDirect((int) sizeInBytes); + indexCurrentEnd = indexBuffer.position(); + return true; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return false; + } + + protected boolean createIndexJournalMBB(boolean reuseExisting) { + try { + journalPath += "Index"; + + // If the journal file already exists, rename it unless we're + // supposed to reuse the existing file and its contents + boolean fileExists = false; + try { + File file = new File(journalPath); + fileExists = file.exists(); + if (fileExists && !reuseExisting) { + File newFile = new File(journalPath + "_prev"); + logger.info("Moving journal " + journalPath + " to " + newFile.getName()); + file.renameTo(newFile); + } + } catch (Exception e) { + } + + indexFile = new RandomAccessFile(journalPath, "rw"); + if (fileExists && reuseExisting) { + // Existing file, so use its existing length + sizeInBytes = indexFile.length(); + } else { + // New file, set its length + indexFile.setLength(sizeInBytes); + } + + indexChannel = indexFile.getChannel(); + indexBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, sizeInBytes); + indexCurrentEnd = indexBuffer.position(); + + return true; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return false; + } + + @Override + public void reset() { + try { + indexBuffer.clear(); + indexBuffer.limit(0); + + if (inMemory) { + indexBuffer = ByteBuffer.allocateDirect(0); + } else { + indexChannel.truncate(0); + indexChannel.close(); + indexFile.close(); + File f = new File(journalPath); + f.delete(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + protected int getHashBucket(int hash) { + return _getHashBucket(hash); + } + + 
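// A worked example of the fixed-entry arithmetic above, assuming the
// defaults (KEY_SIZE = 0, DEFAULT_INDEX_JOURNAL_SIZE = 2 * 1024 * 1024):
//
//   INDEX_ENTRY_SIZE_BYTES = 1 + 4 + 0 + 8 = 13 bytes
//   totalBuckets = 2_097_152 / 13 = 161_319 slots
//   offset(hash) = max(1, abs(hash ^ (hash >>> 16)) % (161_319 - 1)) * 13
//
// Slot 0 is never addressed; _getHashBucket below clamps to bucket >= 1,
// reserving the first slot for the first-record pointer.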
protected int getHashBucket(String key) { + int hash = key.hashCode(); + return _getHashBucket(hash); + } + + protected int _getHashBucket(int hash) { + hash = hash ^ (hash >>> 16); + int buckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); + int bucket = Math.max(1, Math.abs(hash) % (buckets - 1)); + return bucket * INDEX_ENTRY_SIZE_BYTES; + } + + protected boolean enlargeIndex() { + try { + // Hold a reference to the original buffer to copy its contents + ByteBuffer oldBuffer = indexBuffer; + + if (inMemory) { + logger.log(Level.INFO, "Expanding in-memory index..."); + sizeInBytes += (PAGE_SIZE * SIZE_FACTOR); + createIndexJournalBB(); + } else { + logger.log(Level.INFO, "Expanding persisted index..."); + ((MappedByteBuffer) indexBuffer).force(); + indexFile.setLength(sizeInBytes + (PAGE_SIZE * SIZE_FACTOR)); + indexChannel = indexFile.getChannel(); + sizeInBytes = indexChannel.size(); + indexBuffer = indexChannel.map( + FileChannel.MapMode.READ_WRITE, 0, sizeInBytes); + } + + // Re-hash the index + // + collisions = 0; + bucketsUsed = 0; + oldBuffer.position(INDEX_ENTRY_SIZE_BYTES); + int buckets = (oldBuffer.capacity() / INDEX_ENTRY_SIZE_BYTES); + for (int i = 1; i <= buckets; i++) { + byte occupied = oldBuffer.get(); + if (occupied > 0) { + int keyHash = oldBuffer.getInt(); + byte[] fixedKeyBytes = null; + if (KEY_SIZE > 0) { + fixedKeyBytes = new byte[KEY_SIZE]; + oldBuffer.get(fixedKeyBytes); + } + Long location = oldBuffer.getLong(); + putInternal(fixedKeyBytes, keyHash, occupied, location); + } else { + // Bucket unoccupied, move to the next one + oldBuffer.position(i * INDEX_ENTRY_SIZE_BYTES); + } + } + + totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); + bucketsFree = totalBuckets - bucketsUsed; + logger.log(Level.INFO, "Done!"); + + return true; + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + protected int findBucket(Integer hashcode, int offset, boolean mustFind) { + boolean found = false; + byte occupied = 1; + while (occupied > 0 && !found) { + int keyHash = indexBuffer.getInt(); + if (keyHash == hashcode) { + if (KEY_SIZE > 0) { + indexBuffer.position( + offset + 1 + Integer.BYTES + KEY_SIZE); + } + found = true; + break; + } else { + // Check for rollover past the end of the table + offset += INDEX_ENTRY_SIZE_BYTES; + if (offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) { + // Wrap to the beginning, skipping the first slot + // since it's reserved for the first record pointer + offset = INDEX_ENTRY_SIZE_BYTES; + } + + // Skip to the next bucket + indexBuffer.position(offset); + occupied = indexBuffer.get(); + } + } + + // return if the key was found in the index + if (!found && mustFind) { + return -1; + } + return offset; + } + + protected boolean putInternal(byte[] fixedKeyBytes, Integer hashcode, + byte keyLength, Long value) { + int offset = getHashBucket(hashcode); + indexBuffer.position(offset); + indexBuffer.mark(); + byte occupied = indexBuffer.get(); + if (occupied == 0) { + // found a free slot, go back to the beginning of it + indexBuffer.reset(); + } else { + collisions++; + + // When there's a collision, walk the table until a + // free slot is found + offset = findBucket(hashcode, offset, false); + + // found a free slot, seek to it + indexBuffer.position(offset); + } + + // Write the data + // + indexBuffer.put(keyLength); + indexBuffer.putInt(hashcode); // hashcode is faster for resolving collisions than comparing strings + if (KEY_SIZE > 0 && fixedKeyBytes != null && fixedKeyBytes.length > 0) { + // Make sure
we copy *at most* KEY_SIZE bytes for the key + indexBuffer.put(fixedKeyBytes, + 0, Math.min(KEY_SIZE, fixedKeyBytes.length)); + } + indexBuffer.putLong(value); // indexed record location + + bucketsUsed++; + + return true; + } + + @Override + public boolean put(String key, Long value) { + // + // Entry: + // 1 (byte) key length + // 4 (int) key hashcode + // 0 (bytes) key size + // 8 (long) record location + // + try { + // Check load to see if Index needs to be enlarged + // + if (getLoad() > LOAD_THRESHOLD) { + enlargeIndex(); + } + + byte keylen = (byte) key.length(); + byte[] fixedKeyBytes = null; + if (KEY_SIZE > 0) { + fixedKeyBytes = new byte[KEY_SIZE]; + System.arraycopy(key.getBytes(), + 0, fixedKeyBytes, + 0, Math.min(KEY_SIZE, keylen)); + } + + return putInternal(fixedKeyBytes, key.hashCode(), keylen, value); + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + protected int getHashBucketOffset(String key) { + int offset = -1; + + try { + offset = getHashBucket(key.hashCode()); + indexBuffer.position(offset); + byte occupied = indexBuffer.get(); + if (occupied > 0) { + offset = findBucket(key.hashCode(), offset, true); + } + } catch (Exception e) { + e.printStackTrace(); + } + + return offset; + } + + @Override + public Long get(String key) { + int offset = getHashBucketOffset(key); + if (offset == -1) { + // key not found + return -1L; + } + + // Return the location of the data record + return indexBuffer.getLong(); + } + + @Override + public void remove(String key) { + int offset = getHashBucketOffset(key); + if (offset == -1) { + // key not found + return; + } + + offset = findBucket(key.hashCode(), offset, true); + if (offset != -1) { + // Simply zero out the occupied slot, but need to rewind first + int currPos = indexBuffer.position(); + currPos -= (Integer.BYTES + 1); + indexBuffer.position(currPos); + indexBuffer.put((byte) 0); + } + } + + @Override + public int getCollisions() { + return collisions; + } + + @Override + public void outputStats() { + System.out.println("Index " + journalPath + " Stats:"); + System.out.println(" -size: " + size()); + System.out.println(" -load: " + getLoad()); + System.out.println(" -entries: " + entries()); + System.out.println(" -capacity: " + capacity()); + System.out.println(" -available: " + available()); + System.out.println(" -collisions: " + getCollisions()); + + } + + public long size() { + return sizeInBytes; + } + + public int entries() { + return bucketsUsed; + } + + public int capacity() { + return totalBuckets; + } + + public int available() { + return capacity() - entries(); + } + + public int getLoad() { + int used = entries(); + int capac = capacity(); + float f = (float) used / (float) capac; + int load = (int) (f * 100); + return load; // percentage + } +} diff --git a/common/src/main/java/bisq/common/persistence/db/HashBase.java b/common/src/main/java/bisq/common/persistence/db/HashBase.java new file mode 100644 index 0000000000..d656c17bd9 --- /dev/null +++ b/common/src/main/java/bisq/common/persistence/db/HashBase.java @@ -0,0 +1,20 @@ +package bisq.common.persistence.db; + +/** + * @author ebruno + */ +public interface HashBase { + public boolean put(String k, Long v); + + public Long get(String k); + + public void remove(String k); + + public int getCollisions(); + + public int getLoad(); + + public void outputStats(); + + public void reset(); +} diff --git a/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java b/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java new 
file mode 100644 index 0000000000..9105dae688 --- /dev/null +++ b/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java @@ -0,0 +1,380 @@ +package bisq.common.persistence.db; + +import bisq.common.Timer; +import bisq.common.UserThread; +import bisq.common.util.Utilities; + +import java.nio.charset.StandardCharsets; + +import java.io.File; + +import java.util.HashMap; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +/** + * + * @author ebruno + */ +@Slf4j +public class NoHeapDB { + protected final static int MEGABYTE = 1024 * 1024; + protected final static int DEFAULT_STORE_SIZE = MEGABYTE * 100; + private static Thread thread; + static long c; + static long ts; + static String key; + + public static void main(String[] args) throws Exception { + NoHeapDB db = new NoHeapDB(); + /* db.createStore( + "MyTestDataStore", + DataStore.Storage.IN_MEMORY, //or DataStore.Storage.PERSISTED + 256); // in MB*/ + + db.createStore( + "MyTestDataStore", + DataStore.Storage.PERSISTED, //or DataStore.Storage.PERSISTED + 50, 5); // in MB + + DataStore store = db.getStore("MyTestDataStore"); + String sep = "_"; + String result = store.getString(2 + sep + 100); + log.error("result {}", result); + String name = "MyTestDataStore"; + + byte[] array = new byte[10000000]; + new Random().nextBytes(array); + String random = new String(array, StandardCharsets.UTF_8); + log.error(Utilities.readableFileSize(array.length)); + log.error(Utilities.readableFileSize(random.getBytes(StandardCharsets.UTF_8).length)); + + c = store.getRecordCount(); + key = c + sep; + ts = System.currentTimeMillis(); + String res1 = store.getString(key); + // log.error("read took {} ms. {}", System.currentTimeMillis() - ts, res1); + Timer timer = UserThread.runPeriodically(() -> { + + ts = System.currentTimeMillis(); + key = c + sep; + String val1 = random + c; + store.putString(key, val1); + //log.error("write took {} ms", System.currentTimeMillis() - ts); + + ts = System.currentTimeMillis(); + String res = store.getString(key); + // log.error("read took {} ms. res={}, val1={}, match {}", System.currentTimeMillis() - ts, res, val1, res.equals(val1)); + // log.error("read took {} ms. 
match {}", System.currentTimeMillis() - ts, res.equals(val1)); + c++; + log.error("getFilesize {} getRecordCount {}", Utilities.readableFileSize(store.getFilesize()), store.getRecordCount()); + System.gc(); + if (store.getFilesize() > 1800000000) { + log.error("too large"); + System.exit(0); + } + // 400 000 + /* long ts = System.currentTimeMillis(); + int size = 10000000; + for (int i = 0; i < size; i++) { + String val = String.valueOf(c * i); + String key = c + sep + i; + store.putString(key, val); //400 000 + // log.error("write key/val {}/{}", key, val); + } + + log.error("write took {} ms", System.currentTimeMillis() - ts); + ts = System.currentTimeMillis(); + for (int i = 0; i < size; i++) { + String key = c + sep + i; + String val = store.getString(key); + //log.error("read key/val {}/{}", key, val); + } + log.error("read took {} ms", System.currentTimeMillis() - ts); + c++;*/ + }, 100, TimeUnit.MILLISECONDS); + + + thread = new Thread(() -> { + while (true) { + } + }); + thread.start(); + UserThread.runAfter(() -> { + timer.stop(); + thread.interrupt(); + }, 500); + } + + HashMap stores = new HashMap<>(); + + String homeDirectory = + System.getProperty("user.home") + + File.separator + "JavaOffHeap"; + + public NoHeapDB() { + } + + public NoHeapDB(String homeDirectory) { + this.homeDirectory = homeDirectory; + } + + public boolean createStore(String name) throws Exception { + return createStore(name, + DataStore.Storage.IN_MEMORY, + 100, 10); + } + + public boolean createStore(String name, + DataStore.Storage storageType) + throws Exception { + return createStore(name, + storageType, + 100, 10); + } + + public boolean createStore(String name, + DataStore.Storage storageType, + int size, + int indexFileSize) throws Exception { + if (size > Integer.MAX_VALUE) { + throw new Exception("Database size exceeds " + Integer.MAX_VALUE); + } + + NoHeapDBStore nohdb = new NoHeapDBStore(homeDirectory, + name, + storageType, + size * MEGABYTE, + indexFileSize * MEGABYTE, + true); + + stores.put(name, nohdb); + + return true; + } + + public DataStore getStore(String storeName) { + return this.stores.get(storeName); + } + + public boolean deleteStore(String storeName) { + DataStore store = this.stores.get(storeName); + if (store != null) { + // Delete the store here + store.delete(); + if (stores.remove(storeName) != null) { + return true; + } + } + return false; + } + + public boolean putString(String storeName, String key, String value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putString(key, value); + } + + return false; + } + + public boolean putInteger(String storeName, String key, Integer value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putInteger(key, value); + } + + return false; + } + + public boolean putShort(String storeName, String key, Short value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putShort(key, value); + } + + return false; + } + + public boolean putLong(String storeName, String key, Long value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putLong(key, value); + } + + return false; + } + + public boolean putFloat(String storeName, String key, Float value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putFloat(key, value); + } + + return false; + } + + public boolean putDouble(String storeName, String key, Double value) { + DataStore store = 
this.stores.get(storeName); + if (store != null) { + return store.putDouble(key, value); + } + + return false; + } + + public boolean putChar(String storeName, String key, char value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putChar(key, value); + } + + return false; + } + + public boolean putObject(String storeName, String key, Object value) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.putObject(key, value); + } + + return false; + } + + public String getString(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getString(key); + } + + return null; + } + + public Integer getInteger(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getInteger(key); + } + + return null; + } + + public Short getShort(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getShort(key); + } + + return null; + } + + public Long getLong(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getLong(key); + } + + return null; + } + + public Float getFloat(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getFloat(key); + } + + return null; + } + + public Double getDouble(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getDouble(key); + } + + return null; + } + + public char getChar(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getChar(key); + } + + return (char) 0; + } + + public Object getObject(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.getObject(key); + } + + return null; + } + + public boolean remove(String storeName, String key) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.remove(key); + } + + return false; + } + + public Object iterateStart(String storeName) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.iterateStart(); + } + + return null; + } + + public Object iterateNext(String storeName) { + DataStore store = this.stores.get(storeName); + if (store != null) { + return store.iterateNext(); + } + + return null; + } + + + public int getCollisions(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + return niop.getCollisions(); + } + + public int getIndexLoad(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + return niop.getIndexLoad(); + } + + public long getObjectRetrievalTime(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + return niop.getObjectRetrievalTime(); + } + + public long getObjectStorageTime(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + return niop.getObjectStorageTime(); + } + + public void outputStats(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + niop.outputStats(); + } + + public long getRecordCount(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); + return niop.getRecordCount(); + } + + public long getEmptyCount(String storeName) { + NoHeapDBStore niop = (NoHeapDBStore) 
stores.get(storeName); + return niop.getEmptyCount(); + } +} diff --git a/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java b/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java new file mode 100644 index 0000000000..7e2d473e5c --- /dev/null +++ b/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java @@ -0,0 +1,1051 @@ +package bisq.common.persistence.db; + +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.RandomAccessFile; +import java.io.Serializable; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class NoHeapDBStore implements DataStore { + protected static final int MEGABYTE = 1024 * 1024; + protected static final int JOURNAL_SIZE_FACTOR = 10; + protected static final int DEFAULT_JOURNAL_SIZE = MEGABYTE * JOURNAL_SIZE_FACTOR; + protected static final int DEFAULT_INDEX_SIZE = DEFAULT_JOURNAL_SIZE / 4; + + private static final Logger logger = Logger.getLogger("NoHeapDBStore"); + public static String JOURNAL_VERSION = "JAVAOFFHEAPVERSION_1"; + + boolean debugLogging = false; + + // The journal is where persisted data is stored. + // Intend to move the bulk of the internals out of this class + // and hide implementation within its own class + // + protected RandomAccessFile journal = null; + protected FileChannel channel = null; + protected int numBuffers = 1; + protected ByteBuffer buffer = null; + + protected int bufferSize = DEFAULT_JOURNAL_SIZE; + protected int recordCount; + protected long currentEnd = 0; + + // Keep an index of all active entries in the storage file + // + protected HashBase index = null; + + // Keep an index of LinkedList, where the index key is the size + // of the empty record. 
The LinkedList contains pointers (offsets) to + // the empty records + // + public TreeMap<Integer, LinkedList<Long>> emptyIdx = + new TreeMap<Integer, LinkedList<Long>>(); + + public static byte[] clearBytes = null; + + public static class Header implements Serializable { + // Boolean - Active record indicator + // Byte - Message type (0=Empty, 1=Bytes, 2=String) + // Integer - Size of record's payload (not header) + + byte active; // 1 byte + byte type; // 1 byte + int size; // 4 bytes + public static final int HEADER_SIZE = Integer.BYTES + 2; + } + + class JournalLocationData { + long offset; + int newEmptyRecordSize; + } + + protected String journalFolder = ""; + protected String journalName = ""; + protected boolean inMemory = true; + private int indexFileSize = DEFAULT_INDEX_SIZE; + protected boolean reuseExisting = true; + + // Used when iterating through the index + protected long iterateNext = 0; + + // Performance tracking data + private boolean trackPerformance = false; + private int persistCount = 0; + private long worstPersistTime = 0; + private int deleteCount = 0; + private long worstDeleteTime = 0; + private long objectGetTime = 0; + private long objectPutTime = 0; + + /////////////////////////////////////////////////////////////////////////// + + private NoHeapDBStore() { + } + + // TODO: Need to add optional expected capacity + public NoHeapDBStore(String folder, String name) { + this(folder, name, Storage.IN_MEMORY, DEFAULT_JOURNAL_SIZE, DEFAULT_INDEX_SIZE, true); + } + + public NoHeapDBStore(String folder, String name, Storage type) { + this(folder, name, type, DEFAULT_JOURNAL_SIZE, DEFAULT_INDEX_SIZE, true); + } + + public NoHeapDBStore(String folder, String name, Storage type, int sizeInBytes) { + this(folder, name, type, sizeInBytes, DEFAULT_INDEX_SIZE, true); + } + + public NoHeapDBStore(String folder, + String name, + Storage type, + int sizeInBytes, + int indexFileSize, + boolean reuseExisting) { + this.indexFileSize = indexFileSize; + this.reuseExisting = reuseExisting; + this.journalFolder = folder; + this.journalName = name; + this.inMemory = (type == Storage.IN_MEMORY); + this.bufferSize = sizeInBytes; + + String journalPath = createJournalFolderName(journalFolder, journalName); + + createMessageJournal(journalPath, inMemory, reuseExisting); + createIndexJournal(journalPath, inMemory, reuseExisting); + } + + protected final boolean createMessageJournal(String journalPath, boolean inMemory, boolean reuseExisting) { + if (inMemory) { + // In-memory ByteBuffer Journal + return createMessageJournalBB(); + } else { + // Persisted File MappedByteBuffer Journal + return createMessageJournalMBB(journalPath, reuseExisting); + } + } + + protected final boolean createMessageJournalBB() { + try { + buffer = ByteBuffer.allocateDirect(bufferSize); + currentEnd = buffer.position(); + return true; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return false; + } + + protected final String createJournalFolderName(String folder, String name) { + StringBuffer filename = new StringBuffer(journalFolder); + filename.append(File.separator); + filename.append(journalName); // queue or server + filename.append("journal"); + return filename.toString(); + } + + protected final boolean createMessageJournalMBB(String journalPath, boolean reuseExisting) { + try { + // First create the directory + File filePath = new File(journalFolder); + boolean created = filePath.mkdir(); + if (!created) { + // It may have failed because the directory already existed + if (!filePath.exists()) { + logger.severe("Directory
creation failed: " + journalFolder); + return false; + } + } + + // If the journal file already exists, rename it unless we're + // supposed to reuse the existing file and its contents + boolean fileExists = false; + try { + File file = new File(journalPath); + fileExists = file.exists(); + if (fileExists && !reuseExisting) { + File newFile = new File(journalPath + "_prev"); + logger.info("Moving journal " + journalPath + " to " + newFile.getName()); + file.renameTo(newFile); + } + } catch (Exception e) { + } + + //System.out.println("*** Creating journal: " + filename.toString()); + journal = new RandomAccessFile(journalPath, "rw"); + if (fileExists && reuseExisting) { + // Existing file, so use its existing length + bufferSize = (int) journal.length(); + } else { + // New file, set its length + journal.setLength(bufferSize); + } + + channel = journal.getChannel(); + buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize); + + if (fileExists && reuseExisting) { + // Iterate through the existing records and find its current end + currentEnd = scanJournal(); + + if (debugLogging) { + logger.info("Initializized journal '" + journalName + + "', existing filename=" + journalPath); + } + } else { + // + // Write some journal header data + // + writeJournalHeader(journal); + + currentEnd = journal.getFilePointer(); + + if (debugLogging) { + logger.info("Created journal '" + journalName + + "', filename=" + journalPath); + } + } + + return true; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return false; + } + + // + // Iterate through the contents of the journal and create + // an index for the empty records + // + private long scanJournal() { + for (LinkedList val : emptyIdx.values()) { + val.clear(); + } + emptyIdx.clear(); + + int recordSize = 0; + + try { + long filesize = journal.length(); + String version = journal.readUTF(); + String name = journal.readUTF(); + Long createTime = journal.readLong(); + currentEnd = journal.getFilePointer(); + ByteBuffer bb = buffer; + bb.position((int) currentEnd); + + // Iterate through the records in the file with the intent to + // record the empty slots (for future reuse) and the end of the + // saved record + // + while (currentEnd < (filesize - Header.HEADER_SIZE)) { + // Begin reading the next record header + // + + // Active Record? 
+ boolean active = true; + if (bb.get() == INACTIVE_RECORD) { + active = false; + } + + // Read record type + byte type = bb.get(); + if (type == 0) { + bb.position((int) currentEnd); + break; // end of data records in file + } + + // Get the data length (comes after the record header) + int datalen = bb.getInt(); + recordSize = Header.HEADER_SIZE + datalen; + + if (!active) { + // Record the inactive record location for reuse + storeEmptyRecord(currentEnd, datalen); + } + + // skip past the data to the beginning of the next record + currentEnd += recordSize; + bb.position((int) currentEnd); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + try { + StringBuffer sb = new StringBuffer("Persist data: "); + sb.append(" Journal: " + journalName); + sb.append(", length: " + channel.size()); + sb.append(", currentEnd: " + currentEnd); + sb.append(", recordSize: " + recordSize); + logger.info(sb.toString()); + } catch (Exception ee) { + } + } + + return currentEnd; + } + + private void writeJournalHeader(RandomAccessFile journal) throws IOException { + // write the journal version number to the file + journal.writeUTF(NoHeapDBStore.JOURNAL_VERSION); + + // write the journal name to the file + journal.writeUTF(journalName); + + // identify the journal as belonging to this server's run + // (to avoid it from being recovered by the recovery thread) + journal.writeLong(System.currentTimeMillis()); + } + + protected boolean createIndexJournal(String journalPath, boolean inMemory, boolean reuseExisting) { + try { + index = new FixedHash(indexFileSize, journalPath, inMemory, reuseExisting); + return true; + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + private JournalLocationData getStorageLocation(int recordLength) { + JournalLocationData location = new JournalLocationData(); + location.offset = -1; // where to write new record + location.newEmptyRecordSize = -1; // Left over portion of empty space + + // Check if the deleted record list is empty + if (emptyIdx == null || emptyIdx.isEmpty()) { + return location; + } + + try { + // Determine if there's an empty location to insert this new record + // There are a few criteria. The empty location must either be + // an exact fit for the new record with its header (replacing the + // existing empty record's header, or if the empty record is larger, + // it must be large enough for the new record's header and data, + // and another header to mark the empty record so the file can + // be traversed at a later time. In other words, the journal + // consists of sequential records, back-to-back, with no gap in + // between, otherwise it cannot be traversed from front to back + // without adding a substantial amount of indexing within the file. + // Therefore, even deleted records still exist within the journal, + // they are simply marked as deleted. But the record size is still + // part of the record so it can be skipped over when read back in + + // First locate an appropriate location. It must match exactly + // or be equal in size to the record to write and a minimal record + // which is just a header (5 bytes). So, HEADER + DATA + HEADER, + // or (data length + 10 bytes). + + // Is there an exact match? 
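// Worked example of the reuse rule below: a 100-byte empty slot can hold
// a new payload of exactly 100 bytes, or one of at most
// 100 - Header.HEADER_SIZE - 1 = 93 bytes, since the leftover space must
// itself fit a header plus at least one byte of data. (Note the comment
// above says a header is 5 bytes, but HEADER_SIZE is actually 6.)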
LinkedList<Long> records = emptyIdx.get(recordLength); + if (records != null && !records.isEmpty()) { + location.offset = records.remove(); + + // No need to append an empty record, just return offset + location.newEmptyRecordSize = -1; + return location; + } + + // Can't modify the empty record list while iterating so + // create a list of objects to remove (they are actually entries + // of a size value) + // + ArrayList<Integer> toRemove = new ArrayList<>(); + + // No exact size match, find one just large enough + for (Integer size : this.emptyIdx.keySet()) { + // If we're to split this record, make sure there's enough + // room for the new record and another empty record with + // a header and at least one byte of data + // + if (size >= recordLength + Header.HEADER_SIZE + 1) { + records = emptyIdx.get(size); + if (records == null || records.size() == 0) { + // This was the last empty record of this size + // so delete the entry in the index and continue + // searching for a larger empty region (if any) + toRemove.add(size); + continue; + } + + location.offset = records.remove(); + + // We need to append an empty record after the new record + // taking the size of the header into account + location.newEmptyRecordSize = + (size - recordLength - Header.HEADER_SIZE); + + int newOffset = + (int) location.offset + recordLength + Header.HEADER_SIZE; + + // Store the new empty record's offset + storeEmptyRecord(newOffset, location.newEmptyRecordSize); + break; + } + } + + // Remove any index records marked to delete + // + for (Integer offset : toRemove) { + emptyIdx.remove(offset); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return location; + } + + private JournalLocationData setNewRecordLocation(int datalen) { + int recordSize = Header.HEADER_SIZE + datalen; + + try { + // Always persist messages at the end of the journal unless + // there's an empty position large enough within the journal + // + JournalLocationData location = getStorageLocation(datalen); + if (location.offset == -1) { + // None found, need to add record to the end of the journal + // Seek there now only if we're not already there + long currentPos = buffer.position(); + if (currentPos != currentEnd) { + currentPos = buffer.position((int) currentEnd).position(); + } + + // Check to see if we need to grow the journal file + + long journalLen; + if (!inMemory) { + journalLen = channel.size(); + } else { + journalLen = buffer.capacity(); + } + + if ((currentPos + recordSize) >= journalLen) { + // Need to grow the buffer/file by another page + currentPos = expandJournal(journalLen, currentPos); + } + + location.offset = currentEnd; + + // Increment currentEnd by the size of the record appended + currentEnd += recordSize; + } else { + // Seek to the returned insertion point + buffer.position((int) location.offset); + } + + return location; + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + + return null; + } + + protected long expandJournal(long journalLen, long currentPos) throws IOException { + if (debugLogging) { + logger.info("Expanding journal size"); + } + + if (inMemory) { + long newLength = + journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR); + System.out.print("Expanding ByteBuffer size to " + newLength + "..."); + ByteBuffer newBuffer = ByteBuffer.allocateDirect((int) newLength); + if (buffer.hasArray()) { + byte[] array = buffer.array(); + newBuffer.put(array); + } else { + buffer.position(0); + newBuffer.put(buffer); + } + buffer = newBuffer; + journalLen =
buffer.capacity(); + } else { + System.out.print("Expanding RandomAccessFile journal size to " + (journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR)) + "..."); + ((MappedByteBuffer) buffer).force(); + journal.setLength(journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR)); + channel = journal.getChannel(); + journalLen = channel.size(); + buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, journalLen); + } + System.out.println("done"); + + // Since we re-mapped the file, double-check the position + currentPos = buffer.position(); + if (currentPos != currentEnd) { + buffer.position((int) currentEnd); + } + + return currentPos; + } + + protected void storeEmptyRecord(long offset, int length) { + // Store the empty record in an index. Look to see if there + // are other records of the same size (in a LinkedList). If + // so, add this one to the end of the linked list + // + LinkedList<Long> emptyRecs = emptyIdx.get(length); + if (emptyRecs == null) { + // There are no other records of this size. Add an entry + // in the hash table for this new linked list of records + emptyRecs = new LinkedList<Long>(); + emptyIdx.put(length, emptyRecs); + } + + // Add the pointer (file offset) to the new empty record + emptyRecs.add(offset); + } + + public int getIndexLoad() { + return this.index.getLoad(); + } + + /////////////////////////////////////////////////////////////////////////// + // Persistence interface + // + + // Empties the journal file and resets indexes + @Override + public void delete() { + try { + // Clear the existing indexes first + // + index.reset(); + emptyIdx.clear(); + + if (inMemory) { + buffer.clear(); + buffer.limit(0); + buffer = ByteBuffer.allocateDirect(0); + } else { + // Reset the file pointer and length + journal.seek(0); + channel.truncate(0); + channel.close(); + journal.close(); + File f = new File(createJournalFolderName(journalFolder, journalName)); + f.delete(); + } + } catch (EOFException eof) { + } catch (IOException io) { + } + } + + @Override + public synchronized long getRecordCount() { + return recordCount; + } + + @Override + public synchronized long getEmptyCount() { + return emptyIdx.size(); + } + + @Override + public String getName() { + return journalName; + } + + @Override + public String getFolder() { + return journalFolder; + } + + @Override + public synchronized long getFilesize() { + try { + return channel.size(); + } catch (Exception e) { + return 0; + } + } + + @Override + public boolean putLong(String key, Long val) { + return putVal(key, val, LONG_RECORD_TYPE); + } + + @Override + public boolean putInteger(String key, Integer val) { + return putVal(key, val, INT_RECORD_TYPE); + } + + @Override + public boolean putShort(String key, Short val) { + return putVal(key, val, SHORT_RECORD_TYPE); + } + + @Override + public boolean putChar(String key, char val) { + return putVal(key, val, CHAR_RECORD_TYPE); + } + + @Override + public boolean putFloat(String key, Float val) { + return putVal(key, val, FLOAT_RECORD_TYPE); + } + + @Override + public boolean putDouble(String key, Double val) { + return putVal(key, val, DOUBLE_RECORD_TYPE); + } + + @Override + public boolean putString(String key, String val) { + return putVal(key, val, TEXT_RECORD_TYPE); + } + + @Override + public boolean putObject(String key, Object obj) { + try (ByteArrayOutputStream bstream = new ByteArrayOutputStream(); + ObjectOutputStream ostream = new ObjectOutputStream(bstream)) { + + // Grab the payload and determine the record size + // + ostream.writeObject(obj); + byte[] bytes = bstream.toByteArray();
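// Note: ObjectOutputStream buffers its output, so an ostream.flush()
// before toByteArray() would be the safer idiom here; the close() below
// happens only after the bytes have already been captured.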
+ ostream.close(); + + return putVal(key, bytes, BYTEARRAY_RECORD_TYPE); + } catch (Exception e) { + logger.severe("Exception: " + e.toString()); + } + + return false; + } + + @Override + public boolean putBytes(String key, byte[] bytes) { + return putVal(key, bytes, BYTEARRAY_RECORD_TYPE); + } + + @Override + public Long getLong(String key) { + return (Long) getValue(key, LONG_RECORD_TYPE); + } + + @Override + public Integer getInteger(String key) { + return (Integer) getValue(key, INT_RECORD_TYPE); + } + + @Override + public Short getShort(String key) { + return (Short) getValue(key, SHORT_RECORD_TYPE); + } + + @Override + public Float getFloat(String key) { + return (Float) getValue(key, FLOAT_RECORD_TYPE); + } + + @Override + public Double getDouble(String key) { + return (Double) getValue(key, DOUBLE_RECORD_TYPE); + } + + @Override + public char getChar(String key) { + Object obj = getValue(key, CHAR_RECORD_TYPE); + if (obj != null) { + return (char) obj; + } + return (char) 0; + } + + @Override + public String getString(String key) { + return (String) getValue(key, TEXT_RECORD_TYPE); + } + + @Override + public Object getObject(String key) { + Object object = null; + + Object obj = this.getValue(key, BYTEARRAY_RECORD_TYPE); + if (obj == null) { + return null; + } + + byte[] bytes = (byte[]) obj; + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ostream = new ObjectInputStream(bis)) { + object = ostream.readObject(); + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return object; + } + + @Override + public byte[] getBytes(String key) { + return (byte[]) this.getValue(key, BYTEARRAY_RECORD_TYPE); + } + + @Override + public boolean remove(String key) { + Long offset = (long) -1; + int datalength = -1; + + try { + synchronized (this) { + //bufferSize = buffer.capacity(); + + // Locate the message in the journal + offset = getRecordOffset(key); + + if (offset == -1) { + return false; + } + + // read the header (to get record length) then set it as inactive + buffer.position(offset.intValue()); + buffer.put(INACTIVE_RECORD); + buffer.put(EMPTY_RECORD_TYPE); + datalength = buffer.getInt(); + + // Store the empty record location and size for later reuse + storeEmptyRecord(offset, datalength); + + // Remove from the journal index + index.remove(key); + } + + return true; + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + + logger.severe("deleteMessage data, offset=" + offset + ", length=" + datalength + ", bufferSize=" + bufferSize); + try { + logger.severe("current journal data, filePointer=" + journal.getFilePointer() + + ", filesize=" + journal.length()); + } catch (Exception e1) { + } + } + + return false; + } + + ////////////////////////////////////////////////////////////////////////// + + protected boolean putVal(String key, Object val, byte type) { + // Each message is written to a file with the following + // record structure: + // + // HEADER: + // Boolean - Active record indicator + // Byte - Message type (0=Empty, 1=Bytes, 2=String) + // Integer - Size of record's payload (not header) + // + // DATA: + // Byte array or data value - The message payload + // + try { + long start = System.currentTimeMillis(); + synchronized (this) { + int datalen; + + switch (type) { + case LONG_RECORD_TYPE: + datalen = Long.BYTES; + break; + case INT_RECORD_TYPE: + datalen = Integer.BYTES; + break; + case DOUBLE_RECORD_TYPE: + datalen = Double.BYTES; + break; + case FLOAT_RECORD_TYPE: + datalen = Float.BYTES; + 
break; + case SHORT_RECORD_TYPE: + datalen = Short.BYTES; + break; + case CHAR_RECORD_TYPE: + datalen = 2; // 16-bit Unicode character + break; + case TEXT_RECORD_TYPE: + datalen = ((String) val).getBytes().length; + break; + case BYTEARRAY_RECORD_TYPE: + datalen = ((byte[]) val).length; + break; + default: + return false; + } + + JournalLocationData location = setNewRecordLocation(datalen); + + // First write the header + // + buffer.put(ACTIVE_RECORD); + buffer.put(type); + buffer.putInt(datalen); + + // Write record value + // + switch (type) { + case LONG_RECORD_TYPE: + buffer.putLong((Long) val); + break; + case INT_RECORD_TYPE: + buffer.putInt((Integer) val); + break; + case DOUBLE_RECORD_TYPE: + buffer.putDouble((Double) val); + break; + case FLOAT_RECORD_TYPE: + buffer.putFloat((Float) val); + break; + case SHORT_RECORD_TYPE: + buffer.putShort((Short) val); + break; + case CHAR_RECORD_TYPE: + buffer.putChar((char) val); + break; + case TEXT_RECORD_TYPE: + buffer.put(((String) val).getBytes()); + break; + case BYTEARRAY_RECORD_TYPE: + buffer.put((byte[]) val); + break; + } + + // Next, see if we need to append an empty record if we inserted + // this new record at an empty location + if (location.newEmptyRecordSize != -1) { + // Write the header and data for the new record, as well + // as header indicating an empty record + buffer.put(INACTIVE_RECORD); // inactive record + buffer.put(EMPTY_RECORD_TYPE); // save message type EMPTY + buffer.putInt(location.newEmptyRecordSize); + + if (buffer.position() > currentEnd) { + currentEnd = buffer.position(); + } + } + + indexRecord(key, location.offset); + + recordCount++; + + long end = System.currentTimeMillis(); + this.objectPutTime += (end - start); + + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + protected Object getValue(String key, byte type) { + Long offset = getRecordOffset(key); + if (offset != null && offset > -1) { + return getValue(offset, type); + } + return null; + } + + protected Object getValue(Long offset, byte type) { + Object val = null; + try { + if (offset != null && offset > -1) { + long start = System.currentTimeMillis(); + + // Jump to this record's offset within the journal file + buffer.position(offset.intValue()); + + // First, read in the header + byte active = buffer.get(); + if (active != 1) { + return null; + } + + byte typeStored = buffer.get(); + if (type != typeStored) { + return null; + } + + int dataLength = buffer.getInt(); + + byte[] bytes; + switch (type) { + case LONG_RECORD_TYPE: + val = buffer.getLong(); + break; + case INT_RECORD_TYPE: + val = buffer.getInt(); + break; + case DOUBLE_RECORD_TYPE: + val = buffer.getDouble(); + break; + case FLOAT_RECORD_TYPE: + val = buffer.getFloat(); + break; + case SHORT_RECORD_TYPE: + val = buffer.getShort(); + break; + case CHAR_RECORD_TYPE: + val = buffer.getChar(); + break; + case BYTEARRAY_RECORD_TYPE: + bytes = new byte[dataLength]; + buffer.get(bytes); + val = bytes; + break; + case TEXT_RECORD_TYPE: + bytes = new byte[dataLength]; + buffer.get(bytes); + val = new String(bytes); + break; + } + + long end = System.currentTimeMillis(); + this.objectGetTime += (end - start); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return val; + } + + @Override + public Object iterateStart() { + try { + long current = 0; + + // Get past all the file header data + // + if (journal != null) { + journal.seek(0); + long filesize = journal.length(); + String version = journal.readUTF(); + 
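// (name and create time follow; all three header values are read only
// to skip the journal header written by writeJournalHeader() - what
// matters is the resulting file pointer)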
String name = journal.readUTF(); + Long createTime = journal.readLong(); + current = journal.getFilePointer(); + } + + // Return the first active record found + return getNextRecord(current); + } catch (Exception e) { + e.printStackTrace(); + } + + return null; + } + + @Override + public Object iterateNext() { + return getNextRecord(iterateNext); + } + + protected Object getNextRecord(long current) { + int recordSize = 0; + try { + ByteBuffer bb = buffer; + if (bb.position() != current) { + bb.position((int) current); + } + + // Iterate through the records in the file with the intent to + // record the empty slots (for future reuse) and the end of the + // saved record + // + boolean found = false; + byte type = DataStore.EMPTY_RECORD_TYPE; + while (!found && current < (bufferSize - Header.HEADER_SIZE)) { + // Begin reading the next record header + // + + // Active Record? + boolean active = true; + if (bb.get() == INACTIVE_RECORD) { + active = false; + } + + // Read record type + type = bb.get(); + if (type == 0) { + bb.position((int) currentEnd); + break; // end of data records in file + } + + // Get the data length (comes after the record header) + int datalen = bb.getInt(); + recordSize = Header.HEADER_SIZE + datalen; + + if (active) { + // Found the next active record + found = true; + + // Store the location to the start of the next record + iterateNext = current + recordSize; + } else { + // skip past the data to the beginning of the next record + current += recordSize; + bb.position((int) current); + } + } + + if (found) { + // Return the record + return getValue(current, type); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "Exception", e); + } + + return null; + } + + protected Long getRecordOffset(String key) { + Long location = index.get(key); + return location; + } + + protected boolean indexRecord(String key, long recordLocation) { + return index.put(key, recordLocation); + } + + public int getCollisions() { + return index.getCollisions(); + } + + public long getObjectRetrievalTime() { + return objectGetTime; + } + + public long getObjectStorageTime() { + return objectPutTime; + } + + public void outputStats() { + System.out.println("Data Store:"); + System.out.println(" -size: " + buffer.capacity()); + index.outputStats(); + } +} diff --git a/common/src/main/java/bisq/common/util/GcUtil.java b/common/src/main/java/bisq/common/util/GcUtil.java index 65227f2d91..5f89bd4f61 100644 --- a/common/src/main/java/bisq/common/util/GcUtil.java +++ b/common/src/main/java/bisq/common/util/GcUtil.java @@ -18,7 +18,6 @@ package bisq.common.util; import bisq.common.UserThread; -import bisq.common.app.DevEnv; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -29,6 +28,8 @@ public class GcUtil { private static boolean DISABLE_GC_CALLS = false; private static int TRIGGER_MEM = 1000; private static int TRIGGER_MAX_MEM = 3000; + private static int totalInvocations; + private static long totalGCTime; public static void autoReleaseMemory() { if (DISABLE_GC_CALLS) @@ -59,20 +60,25 @@ public class GcUtil { long preGcMemory = Runtime.getRuntime().totalMemory(); if (preGcMemory > trigger * 1024 * 1024) { System.gc(); + totalInvocations++; long postGcMemory = Runtime.getRuntime().totalMemory(); - log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms.", + long duration = System.currentTimeMillis() - ts; + totalGCTime += duration; + log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms. 
Total GC invocations: {} / Total GC time {} sec", Utilities.readableFileSize(preGcMemory - postGcMemory), Utilities.readableFileSize(preGcMemory), Utilities.readableFileSize(postGcMemory), - System.currentTimeMillis() - ts); - if (DevEnv.isDevMode()) { + duration, + totalInvocations, + totalGCTime / 1000d); + /* if (DevEnv.isDevMode()) { try { // To see from where we got called throw new RuntimeException("Dummy Exception for print stacktrace at maybeReleaseMemory"); } catch (Throwable t) { t.printStackTrace(); } - } + }*/ } } } diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 966cfb545b..a7c6f0730e 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -35,6 +35,7 @@ import bisq.network.p2p.network.Connection; import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.util.MathUtils; import com.google.inject.Inject; @@ -222,8 +223,11 @@ public class LiteNode extends BsqNode { runDelayedBatchProcessing(new ArrayList<>(blockList), () -> { - log.info("Parsing {} blocks took {} seconds.", blockList.size(), - (System.currentTimeMillis() - ts) / 1000d); + double duration = System.currentTimeMillis() - ts; + log.info("Parsing {} blocks took {} seconds ({} min.) / {} ms on average per block", blockList.size(), + MathUtils.roundDouble(duration / 1000d, 2), + MathUtils.roundDouble(duration / 1000d / 60, 2), + MathUtils.roundDouble(duration / blockList.size(), 2)); // We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid. // We deal with that case at the setupWalletBestBlockListener method above. if (walletsSetup.isDownloadComplete() && diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateService.java b/core/src/main/java/bisq/core/dao/state/DaoStateService.java index 83fa5f932a..ee2620cdcc 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateService.java @@ -149,8 +149,8 @@ public class DaoStateService implements DaoSetupService { return DaoState.getClone(daoState); } - public protobuf.DaoState getCloneAsProto() { - return DaoState.getCloneAsProto(daoState); + public protobuf.DaoState getBsqStateCloneExcludingBlocks() { + return DaoState.getBsqStateCloneExcludingBlocks(daoState); } public byte[] getSerializedStateForHashChain() { @@ -321,12 +321,16 @@ public class DaoStateService implements DaoSetupService { return getBlockAtHeight(height).map(Block::getTime).orElse(0L); } + public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) { + return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE); + } + public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) { // We limit requests to numMaxBlocks blocks, to avoid performance issues and too // large network data in case a node requests too far back in history.
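// The sort below is flagged "todo not needed" because blocks are appended
// in ascending height order, so getBlocks() should already be height-sorted.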
return getBlocks().stream() .filter(block -> block.getHeight() >= fromBlockHeight) - .sorted(Comparator.comparing(Block::getHeight)) + .sorted(Comparator.comparing(Block::getHeight)) //todo not needed .limit(numMaxBlocks) .collect(Collectors.toList()); } diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java index 179c396da8..4780176aad 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java @@ -37,6 +37,7 @@ import java.io.File; import java.io.IOException; import java.util.LinkedList; +import java.util.List; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -61,9 +62,10 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene private final Preferences preferences; private final File storageDir; - private protobuf.DaoState snapshotCandidate; - private int snapshotCandidateHeight; - private LinkedList daoStateHashChainSnapshotCandidate = new LinkedList<>(); + private protobuf.DaoState daoStateCandidate; + private LinkedList hashChainCandidate = new LinkedList<>(); + private List blocksCandidate; + private int snapshotHeight; private int chainHeightOfLastApplySnapshot; @Setter @Nullable @@ -141,15 +143,20 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene // This also comes with the improvement that the user does not need to load the past blocks back to the last // snapshot height. Though it also comes with the small risk that in case of a reorg the user needs to do // a resync if the dao state was affected by that reorg. + //todo long ts = System.currentTimeMillis(); // We do not keep a copy of the clone as we use it immediately for persistence. GcUtil.maybeReleaseMemory(); - log.info("Create snapshot at height {}", daoStateService.getChainHeight()); - daoStateStorageService.requestPersistence(daoStateService.getCloneAsProto(), - new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()), + int chainHeight = daoStateService.getChainHeight(); + log.info("Create snapshot at height {}", chainHeight); + // We do not keep the data in our fields to enable gc as soon as it is released in the store + daoStateStorageService.requestPersistence(getDaoStateForSnapshot(), + getBlocksForSnapshot(), + getHashChainForSnapshot(), () -> { + GcUtil.maybeReleaseMemory(); log.info("Persisted daoState after parsing completed at height {}. Took {} ms", - daoStateService.getChainHeight(), System.currentTimeMillis() - ts); + chainHeight, System.currentTimeMillis() - ts); }); GcUtil.maybeReleaseMemory(); }); @@ -167,8 +174,8 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene // Either we don't have a snapshot candidate yet, or if we have one the height at that snapshot candidate must be // different to our current height.
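(Aside: the getBlocksForSnapshot() supplier passed to requestPersistence above only hands over blocks above the already persisted bucket height; its definition appears further down in this patch. With hypothetical heights, the saving looks like this.)

```java
public class SnapshotDeltaSketch {
    public static void main(String[] args) {
        // Hypothetical heights, only for illustration:
        int chainHeightOfPersistedBlocks = 700_000; // last height covered by full on-disk buckets
        int chainTip = 700_123;                     // current chain height
        int fromBlockHeight = chainHeightOfPersistedBlocks + 1;
        int blocksInSnapshot = chainTip - fromBlockHeight + 1;
        // Only 123 blocks (700001..700123) travel with the snapshot instead of the whole chain.
        System.out.println(fromBlockHeight + ".." + chainTip + " -> " + blocksInSnapshot + " blocks");
    }
}
```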
- boolean noSnapshotCandidateOrDifferentHeight = snapshotCandidate == null || - snapshotCandidateHeight != chainHeight; + boolean noSnapshotCandidateOrDifferentHeight = daoStateCandidate == null || + snapshotHeight != chainHeight; if (isSnapshotHeight(chainHeight) && !daoStateService.getBlocks().isEmpty() && isValidHeight(daoStateService.getBlockHeightOfLastBlock()) && @@ -191,7 +198,7 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene return; } - if (snapshotCandidate != null) { + if (daoStateCandidate != null) { persist(); } else { createSnapshot(); @@ -202,17 +209,12 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene private void persist() { long ts = System.currentTimeMillis(); readyForPersisting = false; - daoStateStorageService.requestPersistence(snapshotCandidate, - daoStateHashChainSnapshotCandidate, + daoStateStorageService.requestPersistence(daoStateCandidate, + blocksCandidate, + hashChainCandidate, () -> { - log.info("Serializing snapshotCandidate for writing to Disc at chainHeight {} took {} ms.\n" + - "snapshotCandidateHeight={};\n" + - "daoStateHashChainSnapshotCandidate.height={}", - daoStateService.getChainHeight(), - System.currentTimeMillis() - ts, - snapshotCandidateHeight, - daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ? - daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A"); + log.info("Serializing daoStateCandidate for writing to disk at chainHeight {} took {} ms.", + snapshotHeight, System.currentTimeMillis() - ts); createSnapshot(); readyForPersisting = true; @@ -226,18 +228,13 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene // done from the write thread (mapped back to user thread). // As we want to avoid maintaining 2 clones we prefer that strategy. If we did the clone // after the persist call we would keep an additional copy in memory. - snapshotCandidate = daoStateService.getCloneAsProto(); - snapshotCandidateHeight = daoStateService.getChainHeight(); - daoStateHashChainSnapshotCandidate = new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()); + daoStateCandidate = getDaoStateForSnapshot(); + blocksCandidate = getBlocksForSnapshot(); + hashChainCandidate = getHashChainForSnapshot(); + snapshotHeight = daoStateService.getChainHeight(); GcUtil.maybeReleaseMemory(); - log.info("Cloned new snapshotCandidate at chainHeight {} took {} ms.\n" + - "snapshotCandidateHeight={};\n" + - "daoStateHashChainSnapshotCandidate.height={}", - daoStateService.getChainHeight(), System.currentTimeMillis() - ts, - snapshotCandidateHeight, - daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ?
- daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A"); + log.info("Cloning the new daoStateCandidate at height {} took {} ms.", snapshotHeight, System.currentTimeMillis() - ts); } public void applySnapshot(boolean fromReorg) { @@ -247,13 +244,12 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene int chainHeightOfPersisted = persistedBsqState.getChainHeight(); if (!persistedBsqState.getBlocks().isEmpty()) { int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight(); - log.debug("applySnapshot from persistedBsqState daoState with height of last block {}", heightOfLastBlock); if (isValidHeight(heightOfLastBlock)) { if (chainHeightOfLastApplySnapshot != chainHeightOfPersisted) { chainHeightOfLastApplySnapshot = chainHeightOfPersisted; daoStateService.applySnapshot(persistedBsqState); daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain); - daoStateStorageService.pruneStore(); + daoStateStorageService.releaseMemory(); } else { // The reorg might have been caused by the previous parsing which might contain a range of // blocks. @@ -311,4 +307,17 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene private boolean isSnapshotHeight(int height) { return isSnapshotHeight(genesisTxInfo.getGenesisBlockHeight(), height, SNAPSHOT_GRID); } + + private protobuf.DaoState getDaoStateForSnapshot() { + return daoStateService.getBsqStateCloneExcludingBlocks(); + } + + private List getBlocksForSnapshot() { + int fromBlockHeight = daoStateStorageService.getChainHeightOfPersistedBlocks() + 1; + return daoStateService.getBlocksFromBlockHeight(fromBlockHeight); + } + + private LinkedList getHashChainForSnapshot() { + return new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()); + } } diff --git a/core/src/main/java/bisq/core/dao/state/model/DaoState.java b/core/src/main/java/bisq/core/dao/state/model/DaoState.java index 51c8b80adb..bf0d04200b 100644 --- a/core/src/main/java/bisq/core/dao/state/model/DaoState.java +++ b/core/src/main/java/bisq/core/dao/state/model/DaoState.java @@ -71,8 +71,8 @@ public class DaoState implements PersistablePayload { return DaoState.fromProto(daoState.getBsqStateBuilder().build()); } - public static protobuf.DaoState getCloneAsProto(DaoState daoState) { - return daoState.getBsqStateBuilder().build(); + public static protobuf.DaoState getBsqStateCloneExcludingBlocks(DaoState daoState) { + return daoState.getBsqStateBuilderExcludingBlocks().build(); } @@ -210,6 +210,10 @@ public class DaoState implements PersistablePayload { LinkedList blocks = proto.getBlocksList().stream() .map(Block::fromProto) .collect(Collectors.toCollection(LinkedList::new)); + return fromProto(proto, blocks); + } + + public static DaoState fromProto(protobuf.DaoState proto, LinkedList blocks) { LinkedList cycles = proto.getCyclesList().stream() .map(Cycle::fromProto).collect(Collectors.toCollection(LinkedList::new)); TreeMap unspentTxOutputMap = new TreeMap<>(proto.getUnspentTxOutputMapMap().entrySet().stream() diff --git a/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java b/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java new file mode 100644 index 0000000000..ddb3f661df --- /dev/null +++ b/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java @@ -0,0 +1,205 @@ +/* + * This file is part of Bisq.
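(Aside, before the new BlocksPersistence class below: it groups blocks into fixed buckets of BUCKET_SIZE = 1000 and names each file after its height range. A small sketch of that boundary arithmetic; the constant and the naming scheme are taken from the patch, the rest is illustrative.)

```java
public class BucketBoundsSketch {
    static final int BUCKET_SIZE = 1000; // same constant as in BlocksPersistence

    public static void main(String[] args) {
        int height = 700_000; // hypothetical block height
        int bucketIndex = height / BUCKET_SIZE;                  // 700
        boolean closesBucket = height % BUCKET_SIZE == 0;        // true: this height fills its bucket
        int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; // 699001
        int last = bucketIndex * BUCKET_SIZE;                    // 700000
        // Matches the file naming used by the patch, e.g. "BsqBlocks_699001-700000".
        System.out.println("BsqBlocks_" + first + "-" + last + " closed=" + closesBucket);
    }
}
```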
+ * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.state.storage; + +import bisq.common.file.FileUtil; +import bisq.common.proto.persistable.PersistenceProtoResolver; + +import protobuf.BaseBlock; + +import java.nio.file.Path; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + +@Slf4j +public class BlocksPersistence { + // Bucket size benchmarks: 10000 -> writing 130014 blocks took 1658 msec; + // 1000 -> writing 130014 blocks took 1685 msec (mapping blocks from DaoStateStore took 2250 ms) + public static final int BUCKET_SIZE = 1000; // results in about 1 MB files and about 1 new file per week + + private final File storageDir; + private final String fileName; + private final PersistenceProtoResolver persistenceProtoResolver; + private Path usedTempFilePath; + + public BlocksPersistence(File storageDir, String fileName, PersistenceProtoResolver persistenceProtoResolver) { + this.storageDir = storageDir; + this.fileName = fileName; + this.persistenceProtoResolver = persistenceProtoResolver; + + /* if (!storageDir.exists()) { + storageDir.mkdir(); + }*/ + } + + public void writeBlocks(List protobufBlocks) { + long ts = System.currentTimeMillis(); + if (!storageDir.exists()) { + storageDir.mkdir(); + } + List temp = new ArrayList<>(); + int bucketIndex = 0; + for (BaseBlock block : protobufBlocks) { + temp.add(block); + + int height = block.getHeight(); + bucketIndex = height / BUCKET_SIZE; + int remainder = height % BUCKET_SIZE; + boolean isLastBucketItem = remainder == 0; + if (isLastBucketItem) { + int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; + int last = bucketIndex * BUCKET_SIZE; + File storageFile = new File(storageDir, fileName + "_" + first + "-" + last); + // log.error("addAll height={} items={}", height, temp.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList())); + writeToDisk(storageFile, new BsqBlockStore(temp), null); + temp = new ArrayList<>(); + } + } + if (!temp.isEmpty()) { + bucketIndex++; + int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; + int last = bucketIndex * BUCKET_SIZE; + File storageFile = new File(storageDir, fileName + "_" + first + "-" + last); + // log.error("items={}", temp.stream().map(e -> e.getHeight()).collect(Collectors.toList())); + writeToDisk(storageFile, new BsqBlockStore(temp), null); + + } + log.error("Write {} blocks to disk took {} msec", protobufBlocks.size(), System.currentTimeMillis() - ts); + } + + public void removeBlocksDirectory() { + if (storageDir.exists()) { + try { + FileUtil.deleteDirectory(storageDir); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public List readBlocks(int from, int to) { + if (!storageDir.exists()) { + storageDir.mkdir(); + } + + // log.error("getBlocks {}-{}", from, to); + long ts =
System.currentTimeMillis(); + // from = Math.max(571747, from); + List buckets = new ArrayList<>(); + int start = from / BUCKET_SIZE + 1; + int end = to / BUCKET_SIZE + 1; + for (int bucketIndex = start; bucketIndex <= end; bucketIndex++) { + List bucket = readBucket(bucketIndex); + // log.error("read bucketIndex {}, items={}", bucketIndex, bucket.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList())); + buckets.addAll(bucket); + } + log.error("Reading {} blocks took {} msec", buckets.size(), System.currentTimeMillis() - ts); + // System.exit(0); + return buckets; + } + + + private List readBucket(int bucketIndex) { + int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; + int last = bucketIndex * BUCKET_SIZE; + + /* int first = bucketIndex * BUCKET_SIZE + 1; + int last = first + BUCKET_SIZE - 1;*/ + String child = fileName + "_" + first + "-" + last; + // log.error("getBlocksOfBucket {}", child); + File storageFile = new File(storageDir, child); + if (!storageFile.exists()) { + // log.error("storageFile not existing {}", storageFile.getName()); + return new ArrayList<>(); + } + + try (FileInputStream fileInputStream = new FileInputStream(storageFile)) { + protobuf.PersistableEnvelope proto = protobuf.PersistableEnvelope.parseDelimitedFrom(fileInputStream); + BsqBlockStore bsqBlockStore = (BsqBlockStore) persistenceProtoResolver.fromProto(proto); + return bsqBlockStore.getBlocksAsProto(); + } catch (Throwable t) { + log.error("Reading {} failed with {}.", fileName, t.getMessage()); + return new ArrayList<>(); + } + } + + private void writeToDisk(File storageFile, + BsqBlockStore bsqBlockStore, + @Nullable Runnable completeHandler) { + long ts = System.currentTimeMillis(); + File tempFile = null; + FileOutputStream fileOutputStream = null; + try { + tempFile = usedTempFilePath != null + ? FileUtil.createNewFile(usedTempFilePath) + : File.createTempFile("temp_" + fileName, null, storageDir); + + // Don't use a new temp file path each time, as that causes the delete-on-exit hook to leak memory: + tempFile.deleteOnExit(); + + fileOutputStream = new FileOutputStream(tempFile); + bsqBlockStore.toProtoMessage().writeDelimitedTo(fileOutputStream); + + // Attempt to force the bits to hit the disk. In reality the OS or hard disk itself may still decide + // to not write through to physical media for at least a few seconds, but this is the best we can do. + fileOutputStream.flush(); + fileOutputStream.getFD().sync(); + + // Close resources before replacing file with temp file because otherwise it causes problems on Windows + // when renaming the temp file + fileOutputStream.close(); + + FileUtil.renameFile(tempFile, storageFile); + usedTempFilePath = tempFile.toPath(); + } catch (Throwable t) { + // If an error occurred, don't attempt to reuse this path again, in case temp file cleanup fails. + usedTempFilePath = null; + log.error("Error at saveToFile, storageFile={}", fileName, t); + } finally { + if (tempFile != null && tempFile.exists()) { + log.warn("Temp file still exists after failed save. We will delete it now. storageFile={}", fileName); + if (!tempFile.delete()) { + log.error("Cannot delete temp file."); + } + } + + try { + if (fileOutputStream != null) { + fileOutputStream.close(); + } + } catch (IOException e) { + e.printStackTrace(); + log.error("Cannot close resources."
+ e.getMessage()); + } + // log.info("Writing the serialized {} completed in {} msec", fileName, System.currentTimeMillis() - ts); + if (completeHandler != null) { + completeHandler.run(); + } + } + } +} diff --git a/core/src/main/java/bisq/core/dao/state/storage/BsqBlockStore.java b/core/src/main/java/bisq/core/dao/state/storage/BsqBlockStore.java new file mode 100644 index 0000000000..868674c313 --- /dev/null +++ b/core/src/main/java/bisq/core/dao/state/storage/BsqBlockStore.java @@ -0,0 +1,52 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.state.storage; + +import bisq.common.proto.persistable.PersistableEnvelope; + +import protobuf.BaseBlock; + +import com.google.protobuf.Message; + +import java.util.List; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Wrapper for list of blocks + */ +@Slf4j +public class BsqBlockStore implements PersistableEnvelope { + @Getter + private final List blocksAsProto; + + public BsqBlockStore(List blocksAsProto) { + this.blocksAsProto = blocksAsProto; + } + + public Message toProtoMessage() { + return protobuf.PersistableEnvelope.newBuilder() + .setBsqBlockStore(protobuf.BsqBlockStore.newBuilder().addAllBlocks(blocksAsProto)) + .build(); + } + + public static BsqBlockStore fromProto(protobuf.BsqBlockStore proto) { + return new BsqBlockStore(proto.getBlocksList()); + } +} diff --git a/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java b/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java new file mode 100644 index 0000000000..b51f6b6eee --- /dev/null +++ b/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java @@ -0,0 +1,173 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . 
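(Aside: writeToDisk in BlocksPersistence above follows the usual write-to-temp-file, flush, fsync, then rename recipe, so a crash never leaves a half-written bucket behind. A stripped-down, self-contained sketch of that pattern; paths and helper names are invented, and the patch itself uses FileUtil helpers and reuses temp file paths.)

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative only, not the Bisq implementation.
public class AtomicWriteSketch {
    static void writeAtomically(Path target, byte[] payload) throws IOException {
        Path tempFile = Files.createTempFile(target.getParent(), "temp_", null);
        try (FileOutputStream out = new FileOutputStream(tempFile.toFile())) {
            out.write(payload);
            out.flush();        // flush stream buffers
            out.getFD().sync(); // best effort to push the bits to physical media
        }
        // Replace the target only after the data is durable, so readers never see a torn file.
        Files.move(tempFile, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```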
+ */ + +package bisq.core.dao.state.storage; + +import bisq.core.dao.state.GenesisTxInfo; +import bisq.core.dao.state.model.blockchain.Block; + +import bisq.common.config.Config; +import bisq.common.proto.persistable.PersistenceProtoResolver; + +import protobuf.BaseBlock; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; + +import org.apache.commons.io.FileUtils; + +import java.net.URL; + +import java.io.File; + +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Singleton +public class BsqBlocksStorageService { + public final static String NAME = "BsqBlocks"; + private final int genesisBlockHeight; + private final File storageDir; + private final BlocksPersistence blocksPersistence; + @Getter + private int chainHeightOfPersistedBlocks; + + @Inject + public BsqBlocksStorageService(GenesisTxInfo genesisTxInfo, + PersistenceProtoResolver persistenceProtoResolver, + @Named(Config.STORAGE_DIR) File dbStorageDir) { + genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight(); + storageDir = new File(dbStorageDir.getAbsolutePath() + File.separator + NAME); + blocksPersistence = new BlocksPersistence(storageDir, NAME, persistenceProtoResolver); + } + + public void persistBlocks(List blocks) { + long ts = System.currentTimeMillis(); + List protobufBlocks = blocks.stream() + .map(Block::toProtoMessage) + .collect(Collectors.toList()); + blocksPersistence.writeBlocks(protobufBlocks); + + if (!blocks.isEmpty()) { + chainHeightOfPersistedBlocks = Math.max(chainHeightOfPersistedBlocks, + getHeightOfLastFullBucket(blocks)); + } + log.error("Persist (serialize+write) {} blocks took {} ms", + blocks.size(), + System.currentTimeMillis() - ts); + } + + public LinkedList readBlocks(int chainHeight) { + long ts = System.currentTimeMillis(); + LinkedList blocks = new LinkedList<>(); + blocksPersistence.readBlocks(genesisBlockHeight, chainHeight).stream() + .map(Block::fromProto) + .forEach(blocks::add); + log.error("Reading {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts); + if (!blocks.isEmpty()) { + chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks); + } + return blocks; + } + + public LinkedList swapBlocks(List protobufBlocks) { + long ts = System.currentTimeMillis(); + log.error("We have {} blocks in the daoStateAsProto", protobufBlocks.size()); + + + blocksPersistence.writeBlocks(protobufBlocks); + LinkedList blocks = new LinkedList<>(); + protobufBlocks.forEach(protobufBlock -> blocks.add(Block.fromProto(protobufBlock))); + if (!blocks.isEmpty()) { + chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks); + } + + log.error("Mapping blocks (write+deserialization) from DaoStateStore took {} ms", System.currentTimeMillis() - ts); + return blocks; + } + + + void copyFromResources(String postFix) { + long ts = System.currentTimeMillis(); + try { + String dirName = BsqBlocksStorageService.NAME; + String resourceDir = dirName + postFix; + + if (storageDir.exists()) { + log.info("No resource directory was copied. {} exists already.", dirName); + return; + } + + URL dirUrl = getClass().getClassLoader().getResource(resourceDir); + if (dirUrl == null) { + log.info("Directory {} in resources does not exist.", resourceDir); + return; + } + File dir = new File(dirUrl.toURI()); + String[] fileNames = dir.list(); + if (fileNames == null) { + log.info("No files in directory. 
{}", dir.getAbsolutePath()); + return; + } + if (!storageDir.exists()) { + storageDir.mkdir(); + } + for (String fileName : fileNames) { + URL url = getClass().getClassLoader().getResource(resourceDir + File.separator + fileName); + File resourceFile = new File(url.toURI()); + File destinationFile = new File(storageDir, fileName); + FileUtils.copyFile(resourceFile, destinationFile); + } + log.error("Copying {} resource files took {} ms", fileNames.length, System.currentTimeMillis() - ts); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + // todo + private int getHeightOfLastFullBucket(List blocks) { + int i = blocks.get(blocks.size() - 1).getHeight() / BlocksPersistence.BUCKET_SIZE; + int i1 = i * BlocksPersistence.BUCKET_SIZE; + // log.error("getHeightOfLastFullBucket {}", i * BlocksPersistence.BUCKET_SIZE); + return i1; + } + + public void removeBlocksDirectory() { + blocksPersistence.removeBlocksDirectory(); + } + + // We recreate the directory so that we don't fill the blocks after restart from resources + // In copyFromResources we only check for the directory not the files inside. + public void removeBlocksInDirectory() { + blocksPersistence.removeBlocksDirectory(); + if (!storageDir.exists()) { + storageDir.mkdir(); + } + /* List blocks = new ArrayList<>(); + // height, long time, String hash, String previousBlockHash + for (int i = genesisBlockHeight; i <= chainHeightOfPersistedBlocks; i++) { + blocks.add(new Block(i, 0, "", "").toProtoMessage()); + } + blocksPersistence.addAll(blocks);*/ + } +} diff --git a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java index 083d1f62ef..7e38801fa5 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java +++ b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java @@ -17,13 +17,14 @@ package bisq.core.dao.state.storage; -import bisq.core.dao.monitoring.DaoStateMonitoringService; import bisq.core.dao.monitoring.model.DaoStateHash; import bisq.core.dao.state.model.DaoState; +import bisq.core.dao.state.model.blockchain.Block; import bisq.network.p2p.storage.persistence.ResourceDataStoreService; import bisq.network.p2p.storage.persistence.StoreService; +import bisq.common.UserThread; import bisq.common.config.Config; import bisq.common.file.FileUtil; import bisq.common.persistence.PersistenceManager; @@ -36,6 +37,7 @@ import java.io.File; import java.io.IOException; import java.util.LinkedList; +import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -46,7 +48,9 @@ import lombok.extern.slf4j.Slf4j; public class DaoStateStorageService extends StoreService { private static final String FILE_NAME = "DaoStateStore"; - private final DaoStateMonitoringService daoStateMonitoringService; + private final BsqBlocksStorageService bsqBlocksStorageService; + private final File storageDir; + private final LinkedList blocks = new LinkedList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -55,11 +59,12 @@ public class DaoStateStorageService extends StoreService { @Inject public DaoStateStorageService(ResourceDataStoreService resourceDataStoreService, - DaoStateMonitoringService daoStateMonitoringService, + BsqBlocksStorageService bsqBlocksStorageService, @Named(Config.STORAGE_DIR) File storageDir, PersistenceManager persistenceManager) { super(storageDir, persistenceManager); - this.daoStateMonitoringService = daoStateMonitoringService; + 
this.bsqBlocksStorageService = bsqBlocksStorageService; + this.storageDir = storageDir; resourceDataStoreService.addService(this); } @@ -74,7 +79,12 @@ public class DaoStateStorageService extends StoreService { return FILE_NAME; } + public int getChainHeightOfPersistedBlocks() { + return bsqBlocksStorageService.getChainHeightOfPersistedBlocks(); + } + public void requestPersistence(protobuf.DaoState daoStateAsProto, + List blocks, LinkedList daoStateHashChain, Runnable completeHandler) { if (daoStateAsProto == null) { @@ -82,52 +92,105 @@ public class DaoStateStorageService extends StoreService { return; } - store.setDaoStateAsProto(daoStateAsProto); - store.setDaoStateHashChain(daoStateHashChain); - - // We let the persistence run in a thread to avoid the slow protobuf serialisation to happen on the user - // thread. We also call it immediately to get notified about the completion event. new Thread(() -> { - Thread.currentThread().setName("Serialize and write DaoState"); + Thread.currentThread().setName("Write-blocks-and-DaoState"); + bsqBlocksStorageService.persistBlocks(blocks); + + store.setDaoStateAsProto(daoStateAsProto); + store.setDaoStateHashChain(daoStateHashChain); + long ts = System.currentTimeMillis(); persistenceManager.persistNow(() -> { // After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in // memory there until the next persist call. - pruneStore(); - completeHandler.run(); + log.error("Persist daoState took {} ms", System.currentTimeMillis() - ts); + store.releaseMemory(); + GcUtil.maybeReleaseMemory(); + UserThread.execute(completeHandler); }); }).start(); } - public void pruneStore() { - store.setDaoStateAsProto(null); - store.setDaoStateHashChain(null); - GcUtil.maybeReleaseMemory(); + @Override + protected void readFromResources(String postFix, Runnable completeHandler) { + new Thread(() -> { + Thread.currentThread().setName("copyBsqBlocksFromResources"); + bsqBlocksStorageService.copyFromResources(postFix); + + // We read daoState and blocks and keep them in fields in the store. + super.readFromResources(postFix, () -> { + // We got mapped back to the user thread so we need to create a new thread again as we don't want to + // execute on the user thread + new Thread(() -> { + Thread.currentThread().setName("Read-BsqBlocksStore"); + protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto(); + if (daoStateAsProto != null) { + LinkedList list; + if (daoStateAsProto.getBlocksList().isEmpty()) { + list = bsqBlocksStorageService.readBlocks(daoStateAsProto.getChainHeight()); + } else { + list = bsqBlocksStorageService.swapBlocks(daoStateAsProto.getBlocksList()); + } + blocks.clear(); + blocks.addAll(list); + } + UserThread.execute(completeHandler); + }).start(); + }); + }).start(); } public DaoState getPersistedBsqState() { protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto(); if (daoStateAsProto != null) { - return DaoState.fromProto(daoStateAsProto); - } else { - return new DaoState(); + long ts = System.currentTimeMillis(); + DaoState daoState = DaoState.fromProto(daoStateAsProto, blocks); + log.error("Deserializing DaoState with {} blocks took {} ms", + daoState.getBlocks().size(), System.currentTimeMillis() - ts); + return daoState; } + + return new DaoState(); } public LinkedList getPersistedDaoStateHashChain() { return store.getDaoStateHashChain(); } + public void releaseMemory() { + blocks.clear(); + store.releaseMemory(); + GcUtil.maybeReleaseMemory(); + } + public void resyncDaoStateFromGenesis(Runnable
resultHandler) { - store.setDaoStateAsProto(DaoState.getCloneAsProto(new DaoState())); + String backupDirName = "out_of_sync_dao_data"; + try { + removeAndBackupDaoConsensusFiles(storageDir, backupDirName); + } catch (Throwable t) { + log.error(t.toString()); + } + + store.setDaoStateAsProto(DaoState.getBsqStateCloneExcludingBlocks(new DaoState())); store.setDaoStateHashChain(new LinkedList<>()); persistenceManager.persistNow(resultHandler); + bsqBlocksStorageService.removeBlocksInDirectory(); } public void resyncDaoStateFromResources(File storageDir) throws IOException { - // We delete all DAO consensus payload data and remove the daoState so it will rebuild from latest + // We delete all DAO consensus data and remove the daoState so it will rebuild from latest resource files. - long currentTime = System.currentTimeMillis(); String backupDirName = "out_of_sync_dao_data"; + removeAndBackupDaoConsensusFiles(storageDir, backupDirName); + + String newFileName = "DaoStateStore_" + System.currentTimeMillis(); + FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName); + + bsqBlocksStorageService.removeBlocksDirectory(); + } + + private void removeAndBackupDaoConsensusFiles(File storageDir, String backupDirName) throws IOException { + // We delete all DAO consensus data. Some will be rebuilt from resources. + long currentTime = System.currentTimeMillis(); String newFileName = "BlindVoteStore_" + currentTime; FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BlindVoteStore"), newFileName, backupDirName); @@ -140,9 +203,6 @@ public class DaoStateStorageService extends StoreService { newFileName = "UnconfirmedBsqChangeOutputList_" + currentTime; FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "UnconfirmedBsqChangeOutputList"), newFileName, backupDirName); - - newFileName = "DaoStateStore_" + currentTime; - FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName); } @@ -152,7 +212,7 @@ public class DaoStateStorageService extends StoreService { @Override protected DaoStateStore createStore() { - return new DaoStateStore(null, new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain())); + return new DaoStateStore(null, new LinkedList<>()); } @Override diff --git a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java index 884d8012f9..1a041ed4d6 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java +++ b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java @@ -76,4 +76,10 @@ public class DaoStateStore implements PersistableEnvelope { .collect(Collectors.toList())); return new DaoStateStore(proto.getDaoState(), daoStateHashList); } + + + public void releaseMemory() { + daoStateAsProto = null; + daoStateHashChain = null; + } } diff --git a/core/src/main/java/bisq/core/proto/persistable/CorePersistenceProtoResolver.java b/core/src/main/java/bisq/core/proto/persistable/CorePersistenceProtoResolver.java index b0aa031257..bb63cbb065 100644 --- a/core/src/main/java/bisq/core/proto/persistable/CorePersistenceProtoResolver.java +++ b/core/src/main/java/bisq/core/proto/persistable/CorePersistenceProtoResolver.java @@ -30,6 +30,7 @@ import bisq.core.dao.governance.proposal.MyProposalList; import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStore; import bisq.core.dao.governance.proposal.storage.temp.TempProposalStore; import
bisq.core.dao.state.model.governance.BallotList; +import bisq.core.dao.state.storage.BsqBlockStore; import bisq.core.dao.state.storage.DaoStateStore; import bisq.core.dao.state.unconfirmed.UnconfirmedBsqChangeOutputList; import bisq.core.payment.PaymentAccountList; @@ -138,6 +139,8 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P return IgnoredMailboxMap.fromProto(proto.getIgnoredMailboxMap()); case REMOVED_PAYLOADS_MAP: return RemovedPayloadsMap.fromProto(proto.getRemovedPayloadsMap()); + case BSQ_BLOCK_STORE: + return BsqBlockStore.fromProto(proto.getBsqBlockStore()); default: throw new ProtobufferRuntimeException("Unknown proto message case(PB.PersistableEnvelope). " + "messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString()); diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 9d4394a303..a728e495fb 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -1469,6 +1469,7 @@ message PersistableEnvelope { MailboxMessageList mailbox_message_list = 32; IgnoredMailboxMap ignored_mailbox_map = 33; RemovedPayloadsMap removed_payloads_map = 34; + BsqBlockStore bsq_block_store = 35; } } @@ -1996,6 +1997,10 @@ message BaseBlock { } } +message BsqBlockStore { + repeated BaseBlock blocks = 1; +} + message RawBlock { // Because of the way how PB implements inheritance we need to use the super class as type repeated BaseTx raw_txs = 1;
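(Aside: assuming the classes protoc generates from these pb.proto additions, a bucket of blocks round-trips through the new envelope field like this; the delimited framing matches what BlocksPersistence writes per bucket file.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Sketch against the generated protobuf classes; the height value is hypothetical.
public class BsqBlockStoreRoundTrip {
    public static void main(String[] args) throws Exception {
        protobuf.BaseBlock block = protobuf.BaseBlock.newBuilder()
                .setHeight(700_001)
                .build();
        protobuf.PersistableEnvelope envelope = protobuf.PersistableEnvelope.newBuilder()
                .setBsqBlockStore(protobuf.BsqBlockStore.newBuilder().addBlocks(block))
                .build();

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        envelope.writeDelimitedTo(out); // same delimited framing as a bucket file on disk

        protobuf.PersistableEnvelope read = protobuf.PersistableEnvelope.parseDelimitedFrom(
                new ByteArrayInputStream(out.toByteArray()));
        System.out.println(read.getBsqBlockStore().getBlocksCount()); // prints 1
    }
}
```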