From 62b3e51b22b36ce364377171553a50461ca71fa8 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Sun, 31 Oct 2021 19:42:23 +0100 Subject: [PATCH] Cleanups, change log levels --- build.gradle | 6 - .../bisq/common/persistence/db/DataStore.java | 78 -- .../bisq/common/persistence/db/FixedHash.java | 418 ------- .../bisq/common/persistence/db/HashBase.java | 20 - .../bisq/common/persistence/db/NoHeapDB.java | 380 ------ .../common/persistence/db/NoHeapDBStore.java | 1051 ----------------- .../main/java/bisq/common/util/GcUtil.java | 4 +- .../dao/state/DaoStateSnapshotService.java | 1 - .../dao/state/storage/BlocksPersistence.java | 26 +- .../storage/BsqBlocksStorageService.java | 27 +- .../state/storage/DaoStateStorageService.java | 10 +- .../core/dao/state/storage/DaoStateStore.java | 1 - 12 files changed, 17 insertions(+), 2005 deletions(-) delete mode 100644 common/src/main/java/bisq/common/persistence/db/DataStore.java delete mode 100644 common/src/main/java/bisq/common/persistence/db/FixedHash.java delete mode 100644 common/src/main/java/bisq/common/persistence/db/HashBase.java delete mode 100644 common/src/main/java/bisq/common/persistence/db/NoHeapDB.java delete mode 100644 common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java diff --git a/build.gradle b/build.gradle index c54ea3eb7a..7e467e2923 100644 --- a/build.gradle +++ b/build.gradle @@ -63,7 +63,6 @@ configure(subprojects) { kotlinVersion = '1.3.41' knowmXchangeVersion = '4.4.2' langVersion = '3.11' - leveldbVersion = '1.2' logbackVersion = '1.1.11' loggingVersion = '1.2' lombokVersion = '1.18.12' @@ -265,10 +264,6 @@ configure(project(':common')) { exclude(module: 'okhttp') exclude(module: 'okio') } - implementation("io.github.pcmind:leveldb:$leveldbVersion") { - exclude(module: 'guava') - } - runtimeOnly("io.grpc:grpc-netty-shaded:$grpcVersion") { exclude(module: 'guava') exclude(module: 'animal-sniffer-annotations') @@ -303,7 +298,6 @@ configure(project(':p2p')) { implementation("org.apache.httpcomponents:httpclient:$httpclientVersion") { exclude(module: 'commons-codec') } - compile "org.fxmisc.easybind:easybind:$easybindVersion" compileOnly "org.projectlombok:lombok:$lombokVersion" annotationProcessor "org.projectlombok:lombok:$lombokVersion" diff --git a/common/src/main/java/bisq/common/persistence/db/DataStore.java b/common/src/main/java/bisq/common/persistence/db/DataStore.java deleted file mode 100644 index 6c29074877..0000000000 --- a/common/src/main/java/bisq/common/persistence/db/DataStore.java +++ /dev/null @@ -1,78 +0,0 @@ -package bisq.common.persistence.db; - -public interface DataStore { - public enum Storage { - IN_MEMORY, - PERSISTED - } - - public static final byte INACTIVE_RECORD = 0; - public static final byte ACTIVE_RECORD = 1; - - public static final byte EMPTY_RECORD_TYPE = -1; - public static final byte OBJ_RECORD_TYPE = 1; - public static final byte TEXT_RECORD_TYPE = 2; - public static final byte LONG_RECORD_TYPE = 3; - public static final byte INT_RECORD_TYPE = 4; - public static final byte DOUBLE_RECORD_TYPE = 5; - public static final byte FLOAT_RECORD_TYPE = 6; - public static final byte SHORT_RECORD_TYPE = 7; - public static final byte CHAR_RECORD_TYPE = 8; - public static final byte BYTEARRAY_RECORD_TYPE = 9; - - - // Get Journal stats - public long getRecordCount(); - - public long getEmptyCount(); - - public String getName(); - - public String getFolder(); - - public long getFilesize(); - - public boolean putInteger(String key, Integer val); - - public Integer getInteger(String key); - - public boolean putShort(String key, Short val); - - public Short getShort(String key); - - public boolean putLong(String key, Long val); - - public Long getLong(String key); - - public boolean putFloat(String key, Float val); - - public Float getFloat(String key); - - public boolean putDouble(String key, Double val); - - public Double getDouble(String key); - - public boolean putString(String key, String val); - - public String getString(String key); - - public boolean putObject(String key, Object msg); - - public Object getObject(String key); // key is the object ID - - boolean putBytes(String key, byte[] bytes); - - byte[] getBytes(String key); - - public boolean putChar(String key, char val); - - public char getChar(String key); - - public boolean remove(String key); // ID is the object hashs - - public Object iterateStart(); - - public Object iterateNext(); - - public void delete(); -} diff --git a/common/src/main/java/bisq/common/persistence/db/FixedHash.java b/common/src/main/java/bisq/common/persistence/db/FixedHash.java deleted file mode 100644 index d29d67879b..0000000000 --- a/common/src/main/java/bisq/common/persistence/db/FixedHash.java +++ /dev/null @@ -1,418 +0,0 @@ -package bisq.common.persistence.db; - -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; - -import java.io.File; -import java.io.RandomAccessFile; - -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * @author ebruno - */ -public class FixedHash implements HashBase { - - public enum Storage { - IN_MEMORY, - PERSISTED - } - - protected static final Logger logger = Logger.getLogger("FixedHash"); - protected boolean debugLogging = false; - - protected static int LOAD_THRESHOLD = 70; - protected static final int PAGE_SIZE = 1024 * 1024; - protected static final int SIZE_FACTOR = 2; - protected static final int DEFAULT_INDEX_JOURNAL_SIZE = SIZE_FACTOR * PAGE_SIZE; - protected static final int KEY_SIZE = 0; - - // - // 1 (byte) key length - // 4 (int) key hashcode - // 0 (bytes) key size - // 8 (long) record location - // - protected static final int INDEX_ENTRY_SIZE_BYTES = - 1 + Integer.BYTES + KEY_SIZE + Long.BYTES; - - protected long sizeInBytes = DEFAULT_INDEX_JOURNAL_SIZE; - protected int previousOffset = 0; // the last record inserted into the index - protected int bucketsFree = 0; - protected int bucketsUsed = 0; - protected int totalBuckets = 0; - protected int collisions = 0; - - protected String journalPath = ""; - protected boolean inMemory = true; - - protected RandomAccessFile indexFile = null; - protected FileChannel indexChannel = null; - protected ByteBuffer indexBuffer = null; - protected byte keyLength = 16; - protected long indexCurrentEnd = 0; - - protected int indexRecordReadCount = 1; - - // Used when iterating through the index - protected int iterateNext = 0; - - /////////////////////////////////////////////////////////////////////////// - - public FixedHash(String journalPath, boolean inMemory, boolean reuseExisting) { - this(DEFAULT_INDEX_JOURNAL_SIZE, journalPath, inMemory, reuseExisting); - } - - public FixedHash(int size, String journalPath, boolean inMemory, boolean reuseExisting) { - boolean success = false; - sizeInBytes = size; - this.inMemory = inMemory; - this.journalPath = journalPath; - - if (inMemory) { - success = createIndexJournalBB(); - } else { - success = createIndexJournalMBB(reuseExisting); - } - - if (success) { - totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); - bucketsFree = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); - bucketsUsed = 0; - } - } - - protected boolean createIndexJournalBB() { - try { - indexBuffer = ByteBuffer.allocateDirect((int) sizeInBytes); - indexCurrentEnd = indexBuffer.position(); - return true; - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return false; - } - - protected boolean createIndexJournalMBB(boolean reuseExisting) { - try { - journalPath += "Index"; - - // If the journal file already exists, rename it unless we're - // supposed to reuse the existing file and its contents - boolean fileExists = false; - try { - File file = new File(journalPath); - fileExists = file.exists(); - if (fileExists && !reuseExisting) { - File newFile = new File(journalPath + "_prev"); - logger.info("Moving journal " + journalPath + " to " + newFile.getName()); - file.renameTo(newFile); - } - } catch (Exception e) { - } - - indexFile = new RandomAccessFile(journalPath, "rw"); - if (fileExists && reuseExisting) { - // Existing file, so use its existing length - sizeInBytes = indexFile.length(); - } else { - // New file, set its length - indexFile.setLength(sizeInBytes); - } - - indexChannel = indexFile.getChannel(); - indexBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, sizeInBytes); - indexCurrentEnd = indexBuffer.position(); - - return true; - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return false; - } - - @Override - public void reset() { - try { - indexBuffer.clear(); - indexBuffer.limit(0); - - if (inMemory) { - indexBuffer = ByteBuffer.allocateDirect(0); - } else { - indexChannel.truncate(0); - indexChannel.close(); - indexFile.close(); - File f = new File(journalPath); - f.delete(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - protected int getHashBucket(int hash) { - return _getHashBucket(hash); - } - - protected int getHashBucket(String key) { - int hash = key.hashCode(); - return _getHashBucket(hash); - } - - protected int _getHashBucket(int hash) { - hash = hash ^ (hash >>> 16); - int buckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); - int bucket = Math.max(1, Math.abs(hash) % (buckets - 1)); - return bucket * INDEX_ENTRY_SIZE_BYTES; - } - - protected boolean enlargeIndex() { - try { - // Hold a reference to the original buffer to copy its contents - ByteBuffer oldBuffer = indexBuffer; - - if (inMemory) { - logger.log(Level.INFO, "Expanding in-memory index..."); - sizeInBytes += (PAGE_SIZE * SIZE_FACTOR); - createIndexJournalBB(); - } else { - logger.log(Level.INFO, "Expanding persisted index..."); - ((MappedByteBuffer) indexBuffer).force(); - indexFile.setLength(sizeInBytes + (PAGE_SIZE * SIZE_FACTOR)); - indexChannel = indexFile.getChannel(); - sizeInBytes = indexChannel.size(); - indexBuffer = indexChannel.map( - FileChannel.MapMode.READ_WRITE, 0, sizeInBytes); - } - - // Re-hash the index - // - collisions = 0; - bucketsUsed = 0; - oldBuffer.position(INDEX_ENTRY_SIZE_BYTES); - int buckets = (oldBuffer.capacity() / INDEX_ENTRY_SIZE_BYTES); - for (int i = 1; i <= buckets; i++) { - byte occupied = oldBuffer.get(); - if (occupied > 0) { - int keyHash = oldBuffer.getInt(); - byte[] fixedKeyBytes = null; - if (KEY_SIZE > 0) { - fixedKeyBytes = new byte[KEY_SIZE]; - oldBuffer.get(fixedKeyBytes); - } - Long location = oldBuffer.getLong(); - putInternal(fixedKeyBytes, keyHash, occupied, location); - } else { - // Bucket unocuppied, move to the next one - oldBuffer.position(i * INDEX_ENTRY_SIZE_BYTES); - } - } - - totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES); - bucketsFree = totalBuckets - bucketsUsed; - logger.log(Level.INFO, "Done!"); - - return true; - } catch (Exception e) { - e.printStackTrace(); - } - - return false; - } - - protected int findBucket(Integer hashcode, int offset, boolean mustFind) { - boolean found = false; - byte occupied = 1; - while (occupied > 0 && !found) { - int keyHash = indexBuffer.getInt(); - if (keyHash == hashcode) { - if (KEY_SIZE > 0) { - indexBuffer.position( - offset + 1 + Integer.BYTES + KEY_SIZE); - } - found = true; - break; - } else { - // Check for rollover past the end of the table - offset += INDEX_ENTRY_SIZE_BYTES; - if (offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) { - // Wrap to the beginning, skipping the first slot - // since it's reserved for the first record pointer - offset = INDEX_ENTRY_SIZE_BYTES; - } - - // Skip to the next bucket - indexBuffer.position(offset); - occupied = indexBuffer.get(); - } - } - - // return if the key was found in the index - if (!found && mustFind) { - return -1; - } - return offset; - } - - protected boolean putInternal(byte[] fixedKeyBytes, Integer hashcode, - byte keyLength, Long value) { - int offset = getHashBucket(hashcode); - indexBuffer.position(offset); - indexBuffer.mark(); - byte occupied = indexBuffer.get(); - if (occupied == 0) { - // found a free slot, go back to the beginning of it - indexBuffer.reset(); - } else { - collisions++; - - // When there's a collision, walk the table until a - // free slot is found - offset = findBucket(hashcode, offset, false); - - // found a free slot, seek to it - indexBuffer.position(offset); - } - - // Write the data - // - indexBuffer.put(keyLength); - indexBuffer.putInt(hashcode); // hashcode is faster for resolving collisions then comparing strings - if (KEY_SIZE > 0 && fixedKeyBytes != null && fixedKeyBytes.length > 0) { - // Make sure we copy *at most* KEY_SIZE bytes for the key - indexBuffer.put(fixedKeyBytes, - 0, Math.min(KEY_SIZE, fixedKeyBytes.length)); - } - indexBuffer.putLong(value); // indexed record location - - bucketsUsed++; - - return true; - } - - @Override - public boolean put(String key, Long value) { - // - // Entry: - // 1 (byte) key length - // 4 (int) key hashcode - // 0 (bytes) key size - // 8 (long) record location - // - try { - // Check load to see if Index needs to be enlarged - // - if (getLoad() > LOAD_THRESHOLD) { - enlargeIndex(); - } - - byte keylen = (byte) key.length(); - byte[] fixedKeyBytes = null; - if (KEY_SIZE > 0) { - fixedKeyBytes = new byte[KEY_SIZE]; - System.arraycopy(key.getBytes(), - 0, fixedKeyBytes, - 0, Math.min(KEY_SIZE, keylen)); - } - - return putInternal(fixedKeyBytes, key.hashCode(), keylen, value); - } catch (Exception e) { - e.printStackTrace(); - } - - return false; - } - - protected int getHashBucketOffset(String key) { - int offset = -1; - - try { - offset = getHashBucket(key.hashCode()); - indexBuffer.position(offset); - byte occupied = indexBuffer.get(); - if (occupied > 0) { - offset = findBucket(key.hashCode(), offset, true); - } - } catch (Exception e) { - e.printStackTrace(); - } - - return offset; - } - - @Override - public Long get(String key) { - int offset = getHashBucketOffset(key); - if (offset == -1) { - // key not found - return -1L; - } - - // Return the location of the data record - return indexBuffer.getLong(); - } - - @Override - public void remove(String key) { - int offset = getHashBucketOffset(key); - if (offset == -1) { - // key not found - return; - } - - offset = findBucket(key.hashCode(), offset, true); - if (offset != -1) { - // Simply zero out the occupied slot, but need to rewind first - int currPos = indexBuffer.position(); - currPos -= (Integer.BYTES + 1); - indexBuffer.position(currPos); - indexBuffer.put((byte) 0); - } - } - - @Override - public int getCollisions() { - return collisions; - } - - @Override - public void outputStats() { - System.out.println("Index " + journalPath + " Stats:"); - System.out.println(" -size: " + size()); - System.out.println(" -load: " + getLoad()); - System.out.println(" -entries: " + entries()); - System.out.println(" -capacity: " + capacity()); - System.out.println(" -available: " + available()); - System.out.println(" -collisions: " + getCollisions()); - - } - - public long size() { - return sizeInBytes; - } - - public int entries() { - return bucketsUsed; - } - - public int capacity() { - return totalBuckets; - } - - public int available() { - return capacity() - entries(); - } - - public int getLoad() { - int used = entries(); - int capac = capacity(); - float f = (float) used / (float) capac; - int load = (int) (f * 100); - return load; // percentage - } -} diff --git a/common/src/main/java/bisq/common/persistence/db/HashBase.java b/common/src/main/java/bisq/common/persistence/db/HashBase.java deleted file mode 100644 index d656c17bd9..0000000000 --- a/common/src/main/java/bisq/common/persistence/db/HashBase.java +++ /dev/null @@ -1,20 +0,0 @@ -package bisq.common.persistence.db; - -/** - * @author ebruno - */ -public interface HashBase { - public boolean put(String k, Long v); - - public Long get(String k); - - public void remove(String k); - - public int getCollisions(); - - public int getLoad(); - - public void outputStats(); - - public void reset(); -} diff --git a/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java b/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java deleted file mode 100644 index 9105dae688..0000000000 --- a/common/src/main/java/bisq/common/persistence/db/NoHeapDB.java +++ /dev/null @@ -1,380 +0,0 @@ -package bisq.common.persistence.db; - -import bisq.common.Timer; -import bisq.common.UserThread; -import bisq.common.util.Utilities; - -import java.nio.charset.StandardCharsets; - -import java.io.File; - -import java.util.HashMap; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import lombok.extern.slf4j.Slf4j; - -/** - * - * @author ebruno - */ -@Slf4j -public class NoHeapDB { - protected final static int MEGABYTE = 1024 * 1024; - protected final static int DEFAULT_STORE_SIZE = MEGABYTE * 100; - private static Thread thread; - static long c; - static long ts; - static String key; - - public static void main(String[] args) throws Exception { - NoHeapDB db = new NoHeapDB(); - /* db.createStore( - "MyTestDataStore", - DataStore.Storage.IN_MEMORY, //or DataStore.Storage.PERSISTED - 256); // in MB*/ - - db.createStore( - "MyTestDataStore", - DataStore.Storage.PERSISTED, //or DataStore.Storage.PERSISTED - 50, 5); // in MB - - DataStore store = db.getStore("MyTestDataStore"); - String sep = "_"; - String result = store.getString(2 + sep + 100); - log.error("result {}", result); - String name = "MyTestDataStore"; - - byte[] array = new byte[10000000]; - new Random().nextBytes(array); - String random = new String(array, StandardCharsets.UTF_8); - log.error(Utilities.readableFileSize(array.length)); - log.error(Utilities.readableFileSize(random.getBytes(StandardCharsets.UTF_8).length)); - - c = store.getRecordCount(); - key = c + sep; - ts = System.currentTimeMillis(); - String res1 = store.getString(key); - // log.error("read took {} ms. {}", System.currentTimeMillis() - ts, res1); - Timer timer = UserThread.runPeriodically(() -> { - - ts = System.currentTimeMillis(); - key = c + sep; - String val1 = random + c; - store.putString(key, val1); - //log.error("write took {} ms", System.currentTimeMillis() - ts); - - ts = System.currentTimeMillis(); - String res = store.getString(key); - // log.error("read took {} ms. res={}, val1={}, match {}", System.currentTimeMillis() - ts, res, val1, res.equals(val1)); - // log.error("read took {} ms. match {}", System.currentTimeMillis() - ts, res.equals(val1)); - c++; - log.error("getFilesize {} getRecordCount {}", Utilities.readableFileSize(store.getFilesize()), store.getRecordCount()); - System.gc(); - if (store.getFilesize() > 1800000000) { - log.error("too large"); - System.exit(0); - } - // 400 000 - /* long ts = System.currentTimeMillis(); - int size = 10000000; - for (int i = 0; i < size; i++) { - String val = String.valueOf(c * i); - String key = c + sep + i; - store.putString(key, val); //400 000 - // log.error("write key/val {}/{}", key, val); - } - - log.error("write took {} ms", System.currentTimeMillis() - ts); - ts = System.currentTimeMillis(); - for (int i = 0; i < size; i++) { - String key = c + sep + i; - String val = store.getString(key); - //log.error("read key/val {}/{}", key, val); - } - log.error("read took {} ms", System.currentTimeMillis() - ts); - c++;*/ - }, 100, TimeUnit.MILLISECONDS); - - - thread = new Thread(() -> { - while (true) { - } - }); - thread.start(); - UserThread.runAfter(() -> { - timer.stop(); - thread.interrupt(); - }, 500); - } - - HashMap stores = new HashMap<>(); - - String homeDirectory = - System.getProperty("user.home") + - File.separator + "JavaOffHeap"; - - public NoHeapDB() { - } - - public NoHeapDB(String homeDirectory) { - this.homeDirectory = homeDirectory; - } - - public boolean createStore(String name) throws Exception { - return createStore(name, - DataStore.Storage.IN_MEMORY, - 100, 10); - } - - public boolean createStore(String name, - DataStore.Storage storageType) - throws Exception { - return createStore(name, - storageType, - 100, 10); - } - - public boolean createStore(String name, - DataStore.Storage storageType, - int size, - int indexFileSize) throws Exception { - if (size > Integer.MAX_VALUE) { - throw new Exception("Database size exceeds " + Integer.MAX_VALUE); - } - - NoHeapDBStore nohdb = new NoHeapDBStore(homeDirectory, - name, - storageType, - size * MEGABYTE, - indexFileSize * MEGABYTE, - true); - - stores.put(name, nohdb); - - return true; - } - - public DataStore getStore(String storeName) { - return this.stores.get(storeName); - } - - public boolean deleteStore(String storeName) { - DataStore store = this.stores.get(storeName); - if (store != null) { - // Delete the store here - store.delete(); - if (stores.remove(storeName) != null) { - return true; - } - } - return false; - } - - public boolean putString(String storeName, String key, String value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putString(key, value); - } - - return false; - } - - public boolean putInteger(String storeName, String key, Integer value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putInteger(key, value); - } - - return false; - } - - public boolean putShort(String storeName, String key, Short value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putShort(key, value); - } - - return false; - } - - public boolean putLong(String storeName, String key, Long value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putLong(key, value); - } - - return false; - } - - public boolean putFloat(String storeName, String key, Float value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putFloat(key, value); - } - - return false; - } - - public boolean putDouble(String storeName, String key, Double value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putDouble(key, value); - } - - return false; - } - - public boolean putChar(String storeName, String key, char value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putChar(key, value); - } - - return false; - } - - public boolean putObject(String storeName, String key, Object value) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.putObject(key, value); - } - - return false; - } - - public String getString(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getString(key); - } - - return null; - } - - public Integer getInteger(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getInteger(key); - } - - return null; - } - - public Short getShort(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getShort(key); - } - - return null; - } - - public Long getLong(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getLong(key); - } - - return null; - } - - public Float getFloat(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getFloat(key); - } - - return null; - } - - public Double getDouble(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getDouble(key); - } - - return null; - } - - public char getChar(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getChar(key); - } - - return (char) 0; - } - - public Object getObject(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.getObject(key); - } - - return null; - } - - public boolean remove(String storeName, String key) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.remove(key); - } - - return false; - } - - public Object iterateStart(String storeName) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.iterateStart(); - } - - return null; - } - - public Object iterateNext(String storeName) { - DataStore store = this.stores.get(storeName); - if (store != null) { - return store.iterateNext(); - } - - return null; - } - - - public int getCollisions(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getCollisions(); - } - - public int getIndexLoad(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getIndexLoad(); - } - - public long getObjectRetrievalTime(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getObjectRetrievalTime(); - } - - public long getObjectStorageTime(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getObjectStorageTime(); - } - - public void outputStats(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - niop.outputStats(); - } - - public long getRecordCount(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getRecordCount(); - } - - public long getEmptyCount(String storeName) { - NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName); - return niop.getEmptyCount(); - } -} diff --git a/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java b/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java deleted file mode 100644 index 7e2d473e5c..0000000000 --- a/common/src/main/java/bisq/common/persistence/db/NoHeapDBStore.java +++ /dev/null @@ -1,1051 +0,0 @@ -package bisq.common.persistence.db; - -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.io.Serializable; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.TreeMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class NoHeapDBStore implements DataStore { - protected static final int MEGABYTE = 1024 * 1024; - protected static final int JOURNAL_SIZE_FACTOR = 10; - protected static final int DEFAULT_JOURNAL_SIZE = MEGABYTE * JOURNAL_SIZE_FACTOR; - protected static final int DEFAULT_INDEX_SIZE = DEFAULT_JOURNAL_SIZE / 4; - - private static final Logger logger = Logger.getLogger("NoHeapDBStore"); - public static String JOURNAL_VERSION = "JAVAOFFHEAPVERSION_1"; - - boolean debugLogging = false; - - // The journal is where persisted data is stored. - // Intend to move the bulk of the internals out of this class - // and hide implementation within its own class - // - protected RandomAccessFile journal = null; - protected FileChannel channel = null; - protected int numBuffers = 1; - protected ByteBuffer buffer = null; - - protected int bufferSize = DEFAULT_JOURNAL_SIZE; - protected int recordCount; - protected long currentEnd = 0; - - // Keep an index of all active entries in the storage file - // - protected HashBase index = null; - - // Keep an index of LinkedList, where the index key is the size - // of the empty record. The LinkedList contains pointers (offsets) to - // the empty records - // - public TreeMap> emptyIdx = - new TreeMap>(); - - public static byte[] clearBytes = null; - - public static class Header implements Serializable { - // Boolean - Active record indicator - // Byte - Message type (0=Empty, 1=Bytes, 2=String) - // Integer - Size of record's payload (not header) - - byte active; // 1 byte - byte type; // 1 byte - int size; // 4 bytes - public static final int HEADER_SIZE = Integer.BYTES + 2; - } - - class JournalLocationData { - long offset; - int newEmptyRecordSize; - } - - protected String journalFolder = ""; - protected String journalName = ""; - protected boolean inMemory = true; - private int indexFileSize = DEFAULT_INDEX_SIZE; - protected boolean reuseExisting = true; - - // Used when iterating through the index - protected long iterateNext = 0; - - // Performance tracking data - private boolean trackPerformance = false; - private int persistCount = 0; - private long worstPersistTime = 0; - private int deleteCount = 0; - private long worstDeleteTime = 0; - private long objectGetTime = 0; - private long objectPutTime = 0; - - /////////////////////////////////////////////////////////////////////////// - - private NoHeapDBStore() { - } - - // TODO: Need to add optional expected capacity - public NoHeapDBStore(String folder, String name) { - this(folder, name, Storage.IN_MEMORY, DEFAULT_JOURNAL_SIZE, DEFAULT_INDEX_SIZE, true); - } - - public NoHeapDBStore(String folder, String name, Storage type) { - this(folder, name, type, DEFAULT_JOURNAL_SIZE, DEFAULT_INDEX_SIZE, true); - } - - public NoHeapDBStore(String folder, String name, Storage type, int sizeInBytes) { - this(folder, name, type, sizeInBytes, DEFAULT_INDEX_SIZE, true); - } - - public NoHeapDBStore(String folder, - String name, - Storage type, - int sizeInBytes, - int indexFileSize, - boolean reuseExisting) { - this.indexFileSize = indexFileSize; - this.reuseExisting = reuseExisting; - this.journalFolder = folder; - this.journalName = name; - this.inMemory = (type == Storage.IN_MEMORY); - this.bufferSize = sizeInBytes; - - String journalPath = createJournalFolderName(journalFolder, journalName); - - createMessageJournal(journalPath, inMemory, reuseExisting); - createIndexJournal(journalPath, inMemory, reuseExisting); - } - - protected final boolean createMessageJournal(String journalPath, boolean inMemory, boolean reuseExisting) { - if (inMemory) { - // In-memory ByteBuffer Journal - return createMessageJournalBB(); - } else { - // Persisted File MappedByteBuffer Journal - return createMessageJournalMBB(journalPath, reuseExisting); - } - } - - protected final boolean createMessageJournalBB() { - try { - buffer = ByteBuffer.allocateDirect(bufferSize); - currentEnd = buffer.position(); - return true; - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return false; - } - - protected final String createJournalFolderName(String folder, String name) { - StringBuffer filename = new StringBuffer(journalFolder); - filename.append(File.separator); - filename.append(journalName); // queue or server - filename.append("journal"); - return filename.toString(); - } - - protected final boolean createMessageJournalMBB(String journalPath, boolean reuseExisting) { - try { - // First create the directory - File filePath = new File(journalFolder); - boolean created = filePath.mkdir(); - if (!created) { - // It may have failed because the directory already existed - if (!filePath.exists()) { - logger.severe("Directory creation failed: " + journalFolder); - return false; - } - } - - // If the journal file already exists, rename it unless we're - // supposed to reuse the existing file and its contents - boolean fileExists = false; - try { - File file = new File(journalPath); - fileExists = file.exists(); - if (fileExists && !reuseExisting) { - File newFile = new File(journalPath + "_prev"); - logger.info("Moving journal " + journalPath + " to " + newFile.getName()); - file.renameTo(newFile); - } - } catch (Exception e) { - } - - //System.out.println("*** Creating journal: " + filename.toString()); - journal = new RandomAccessFile(journalPath, "rw"); - if (fileExists && reuseExisting) { - // Existing file, so use its existing length - bufferSize = (int) journal.length(); - } else { - // New file, set its length - journal.setLength(bufferSize); - } - - channel = journal.getChannel(); - buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bufferSize); - - if (fileExists && reuseExisting) { - // Iterate through the existing records and find its current end - currentEnd = scanJournal(); - - if (debugLogging) { - logger.info("Initializized journal '" + journalName + - "', existing filename=" + journalPath); - } - } else { - // - // Write some journal header data - // - writeJournalHeader(journal); - - currentEnd = journal.getFilePointer(); - - if (debugLogging) { - logger.info("Created journal '" + journalName + - "', filename=" + journalPath); - } - } - - return true; - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return false; - } - - // - // Iterate through the contents of the journal and create - // an index for the empty records - // - private long scanJournal() { - for (LinkedList val : emptyIdx.values()) { - val.clear(); - } - emptyIdx.clear(); - - int recordSize = 0; - - try { - long filesize = journal.length(); - String version = journal.readUTF(); - String name = journal.readUTF(); - Long createTime = journal.readLong(); - currentEnd = journal.getFilePointer(); - ByteBuffer bb = buffer; - bb.position((int) currentEnd); - - // Iterate through the records in the file with the intent to - // record the empty slots (for future reuse) and the end of the - // saved record - // - while (currentEnd < (filesize - Header.HEADER_SIZE)) { - // Begin reading the next record header - // - - // Active Record? - boolean active = true; - if (bb.get() == INACTIVE_RECORD) { - active = false; - } - - // Read record type - byte type = bb.get(); - if (type == 0) { - bb.position((int) currentEnd); - break; // end of data records in file - } - - // Get the data length (comes after the record header) - int datalen = bb.getInt(); - recordSize = Header.HEADER_SIZE + datalen; - - if (!active) { - // Record the inactive record location for reuse - storeEmptyRecord(currentEnd, datalen); - } - - // skip past the data to the beginning of the next record - currentEnd += recordSize; - bb.position((int) currentEnd); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - try { - StringBuffer sb = new StringBuffer("Persist data: "); - sb.append(" Journal: " + journalName); - sb.append(", length: " + channel.size()); - sb.append(", currentEnd: " + currentEnd); - sb.append(", recordSize: " + recordSize); - logger.info(sb.toString()); - } catch (Exception ee) { - } - } - - return currentEnd; - } - - private void writeJournalHeader(RandomAccessFile journal) throws IOException { - // write the journal version number to the file - journal.writeUTF(NoHeapDBStore.JOURNAL_VERSION); - - // write the journal name to the file - journal.writeUTF(journalName); - - // identify the journal as belonging to this server's run - // (to avoid it from being recovered by the recovery thread) - journal.writeLong(System.currentTimeMillis()); - } - - protected boolean createIndexJournal(String journalPath, boolean inMemory, boolean reuseExisting) { - try { - index = new FixedHash(indexFileSize, journalPath, inMemory, reuseExisting); - return true; - } catch (Exception e) { - e.printStackTrace(); - } - - return false; - } - - private JournalLocationData getStorageLocation(int recordLength) { - JournalLocationData location = new JournalLocationData(); - location.offset = -1; // where to write new record - location.newEmptyRecordSize = -1; // Left over portion of empty space - - // Check if the deleted record list is empty - if (emptyIdx == null || emptyIdx.isEmpty()) { - return location; - } - - try { - // Determine if there's an empty location to insert this new record - // There are a few criteria. The empty location must either be - // an exact fit for the new record with its header (replacing the - // existing empty record's header, or if the empty record is larger, - // it must be large enough for the new record's header and data, - // and another header to mark the empty record so the file can - // be traversed at a later time. In other words, the journal - // consists of sequential records, back-to-back, with no gap in - // between, otherwise it cannot be traversed from front to back - // without adding a substantial amount of indexing within the file. - // Therefore, even deleted records still exist within the journal, - // they are simply marked as deleted. But the record size is still - // part of the record so it can be skipped over when read back in - - // First locate an appropriate location. It must match exactly - // or be equal in size to the record to write and a minimal record - // which is just a header (5 bytes). So, HEADER + DATA + HEADER, - // or (data length + 10 bytes). - - // Is there an exact match? - LinkedList records = emptyIdx.get(recordLength); - if (records != null && !records.isEmpty()) { - location.offset = records.remove(); - - // No need to append an empty record, just return offset - location.newEmptyRecordSize = -1; - return location; - } - - // Can't modify the empty record list while iterating so - // create a list of objects to remove (they are actually entries - // of a size value) - // - ArrayList toRemove = new ArrayList<>(); - - // No exact size match, find one just large enough - for (Integer size : this.emptyIdx.keySet()) { - // If we're to split this record, make sure there's enough - // room for the new record and another emtpy record with - // a header and at least one byte of data - // - if (size >= recordLength + Header.HEADER_SIZE + 1) { - records = emptyIdx.get(size); - if (records == null || records.size() == 0) { - // This was the last empty record of this size - // so delete the entry in the index and continue - // searching for a larger empty region (if any) - toRemove.add(size); - continue; - } - - location.offset = records.remove(); - - // We need to append an empty record after the new record - // taking the size of the header into account - location.newEmptyRecordSize = - (size - recordLength - Header.HEADER_SIZE); - - int newOffset = - (int) location.offset + recordLength + Header.HEADER_SIZE; - - // Store the new empty record's offset - storeEmptyRecord(newOffset, location.newEmptyRecordSize); - break; - } - } - - // Remove any index records marked to delete - // - for (Integer offset : toRemove) { - emptyIdx.remove(offset); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return location; - } - - private JournalLocationData setNewRecordLocation(int datalen) { - int recordSize = Header.HEADER_SIZE + datalen; - - try { - // Always persist messages at the end of the journal unless - // there's an empty position large enough within the journal - // - JournalLocationData location = getStorageLocation(datalen); - if (location.offset == -1) { - // None found, need to add record to the end of the journal - // Seek there now only if we're not already there - long currentPos = buffer.position(); - if (currentPos != currentEnd) { - currentPos = buffer.position((int) currentEnd).position(); - } - - // Check to see if we need to grow the journal file - - long journalLen; - if (!inMemory) { - journalLen = channel.size(); - } else { - journalLen = buffer.capacity(); - } - - if ((currentPos + recordSize) >= journalLen) { - // Need to grow the buffer/file by another page - currentPos = expandJournal(journalLen, currentPos); - } - - location.offset = currentEnd; - - // Increment currentEnd by the size of the record appended - currentEnd += recordSize; - } else { - // Seek to the returned insertion point - buffer.position((int) location.offset); - } - - return location; - } catch (Exception e) { - e.printStackTrace(); - System.exit(-1); - } - - return null; - } - - protected long expandJournal(long journalLen, long currentPos) throws IOException { - if (debugLogging) { - logger.info("Expanding journal size"); - } - - if (inMemory) { - long newLength = - journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR); - System.out.print("Expanding ByteBuffer size to " + newLength + "..."); - ByteBuffer newBuffer = ByteBuffer.allocateDirect((int) newLength); - if (buffer.hasArray()) { - byte[] array = buffer.array(); - newBuffer.put(array); - } else { - buffer.position(0); - newBuffer.put(buffer); - } - buffer = newBuffer; - journalLen = buffer.capacity(); - } else { - System.out.print("Expanding RandomAccessFile journal size to " + (journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR)) + "..."); - ((MappedByteBuffer) buffer).force(); - journal.setLength(journalLen + (MEGABYTE * JOURNAL_SIZE_FACTOR)); - channel = journal.getChannel(); - journalLen = channel.size(); - buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, journalLen); - } - System.out.println("done"); - - // Since we re-mapped the file, double-check the position - currentPos = buffer.position(); - if (currentPos != currentEnd) { - buffer.position((int) currentEnd); - } - - return currentPos; - } - - protected void storeEmptyRecord(long offset, int length) { - // Store the empty record in an index. Look to see if there - // are other records of the same size (in a LinkedList). If - // so, add this one to the end of the linked list - // - LinkedList emptyRecs = emptyIdx.get(length); - if (emptyRecs == null) { - // There are no other records of this size. Add an entry - // in the hash table for this new linked list of records - emptyRecs = new LinkedList(); - emptyIdx.put(length, emptyRecs); - } - - // Add the pointer (file offset) to the new empty record - emptyRecs.add(offset); - } - - public int getIndexLoad() { - return this.index.getLoad(); - } - - /////////////////////////////////////////////////////////////////////////// - // Persistence interface - // - - // Empties the journal file and resets indexes - @Override - public void delete() { - try { - // Clear the existing indexes first - // - index.reset(); - emptyIdx.clear(); - - if (inMemory) { - buffer.clear(); - buffer.limit(0); - buffer = ByteBuffer.allocateDirect(0); - } else { - // Reset the file pointer and length - journal.seek(0); - channel.truncate(0); - channel.close(); - journal.close(); - File f = new File(createJournalFolderName(journalFolder, journalName)); - f.delete(); - } - } catch (EOFException eof) { - } catch (IOException io) { - } - } - - @Override - public synchronized long getRecordCount() { - return recordCount; - } - - @Override - public synchronized long getEmptyCount() { - return emptyIdx.size(); - } - - @Override - public String getName() { - return journalName; - } - - @Override - public String getFolder() { - return journalFolder; - } - - @Override - public synchronized long getFilesize() { - try { - return channel.size(); - } catch (Exception e) { - return 0; - } - } - - @Override - public boolean putLong(String key, Long val) { - return putVal(key, val, LONG_RECORD_TYPE); - } - - @Override - public boolean putInteger(String key, Integer val) { - return putVal(key, val, INT_RECORD_TYPE); - } - - @Override - public boolean putShort(String key, Short val) { - return putVal(key, val, SHORT_RECORD_TYPE); - } - - @Override - public boolean putChar(String key, char val) { - return putVal(key, val, CHAR_RECORD_TYPE); - } - - @Override - public boolean putFloat(String key, Float val) { - return putVal(key, val, FLOAT_RECORD_TYPE); - } - - @Override - public boolean putDouble(String key, Double val) { - return putVal(key, val, DOUBLE_RECORD_TYPE); - } - - @Override - public boolean putString(String key, String val) { - return putVal(key, val, TEXT_RECORD_TYPE); - } - - @Override - public boolean putObject(String key, Object obj) { - try (ByteArrayOutputStream bstream = new ByteArrayOutputStream(); - ObjectOutputStream ostream = new ObjectOutputStream(bstream)) { - - // Grab the payload and determine the record size - // - ostream.writeObject(obj); - byte[] bytes = bstream.toByteArray(); - ostream.close(); - - return putVal(key, bytes, BYTEARRAY_RECORD_TYPE); - } catch (Exception e) { - logger.severe("Exception: " + e.toString()); - } - - return false; - } - - @Override - public boolean putBytes(String key, byte[] bytes) { - return putVal(key, bytes, BYTEARRAY_RECORD_TYPE); - } - - @Override - public Long getLong(String key) { - return (Long) getValue(key, LONG_RECORD_TYPE); - } - - @Override - public Integer getInteger(String key) { - return (Integer) getValue(key, INT_RECORD_TYPE); - } - - @Override - public Short getShort(String key) { - return (Short) getValue(key, SHORT_RECORD_TYPE); - } - - @Override - public Float getFloat(String key) { - return (Float) getValue(key, FLOAT_RECORD_TYPE); - } - - @Override - public Double getDouble(String key) { - return (Double) getValue(key, DOUBLE_RECORD_TYPE); - } - - @Override - public char getChar(String key) { - Object obj = getValue(key, CHAR_RECORD_TYPE); - if (obj != null) { - return (char) obj; - } - return (char) 0; - } - - @Override - public String getString(String key) { - return (String) getValue(key, TEXT_RECORD_TYPE); - } - - @Override - public Object getObject(String key) { - Object object = null; - - Object obj = this.getValue(key, BYTEARRAY_RECORD_TYPE); - if (obj == null) { - return null; - } - - byte[] bytes = (byte[]) obj; - try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - ObjectInputStream ostream = new ObjectInputStream(bis)) { - object = ostream.readObject(); - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return object; - } - - @Override - public byte[] getBytes(String key) { - return (byte[]) this.getValue(key, BYTEARRAY_RECORD_TYPE); - } - - @Override - public boolean remove(String key) { - Long offset = (long) -1; - int datalength = -1; - - try { - synchronized (this) { - //bufferSize = buffer.capacity(); - - // Locate the message in the journal - offset = getRecordOffset(key); - - if (offset == -1) { - return false; - } - - // read the header (to get record length) then set it as inactive - buffer.position(offset.intValue()); - buffer.put(INACTIVE_RECORD); - buffer.put(EMPTY_RECORD_TYPE); - datalength = buffer.getInt(); - - // Store the empty record location and size for later reuse - storeEmptyRecord(offset, datalength); - - // Remove from the journal index - index.remove(key); - } - - return true; - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - - logger.severe("deleteMessage data, offset=" + offset + ", length=" + datalength + ", bufferSize=" + bufferSize); - try { - logger.severe("current journal data, filePointer=" + journal.getFilePointer() - + ", filesize=" + journal.length()); - } catch (Exception e1) { - } - } - - return false; - } - - ////////////////////////////////////////////////////////////////////////// - - protected boolean putVal(String key, Object val, byte type) { - // Each message is written to a file with the following - // record structure: - // - // HEADER: - // Boolean - Active record indicator - // Byte - Message type (0=Empty, 1=Bytes, 2=String) - // Integer - Size of record's payload (not header) - // - // DATA: - // Byte array or data value - The message payload - // - try { - long start = System.currentTimeMillis(); - synchronized (this) { - int datalen; - - switch (type) { - case LONG_RECORD_TYPE: - datalen = Long.BYTES; - break; - case INT_RECORD_TYPE: - datalen = Integer.BYTES; - break; - case DOUBLE_RECORD_TYPE: - datalen = Double.BYTES; - break; - case FLOAT_RECORD_TYPE: - datalen = Float.BYTES; - break; - case SHORT_RECORD_TYPE: - datalen = Short.BYTES; - break; - case CHAR_RECORD_TYPE: - datalen = 2; // 16-bit Unicode character - break; - case TEXT_RECORD_TYPE: - datalen = ((String) val).getBytes().length; - break; - case BYTEARRAY_RECORD_TYPE: - datalen = ((byte[]) val).length; - break; - default: - return false; - } - - JournalLocationData location = setNewRecordLocation(datalen); - - // First write the header - // - buffer.put(ACTIVE_RECORD); - buffer.put(type); - buffer.putInt(datalen); - - // Write record value - // - switch (type) { - case LONG_RECORD_TYPE: - buffer.putLong((Long) val); - break; - case INT_RECORD_TYPE: - buffer.putInt((Integer) val); - break; - case DOUBLE_RECORD_TYPE: - buffer.putDouble((Double) val); - break; - case FLOAT_RECORD_TYPE: - buffer.putFloat((Float) val); - break; - case SHORT_RECORD_TYPE: - buffer.putShort((Short) val); - break; - case CHAR_RECORD_TYPE: - buffer.putChar((char) val); - break; - case TEXT_RECORD_TYPE: - buffer.put(((String) val).getBytes()); - break; - case BYTEARRAY_RECORD_TYPE: - buffer.put((byte[]) val); - break; - } - - // Next, see if we need to append an empty record if we inserted - // this new record at an empty location - if (location.newEmptyRecordSize != -1) { - // Write the header and data for the new record, as well - // as header indicating an empty record - buffer.put(INACTIVE_RECORD); // inactive record - buffer.put(EMPTY_RECORD_TYPE); // save message type EMPTY - buffer.putInt(location.newEmptyRecordSize); - - if (buffer.position() > currentEnd) { - currentEnd = buffer.position(); - } - } - - indexRecord(key, location.offset); - - recordCount++; - - long end = System.currentTimeMillis(); - this.objectPutTime += (end - start); - - return true; - } - } catch (Exception e) { - e.printStackTrace(); - } - - return false; - } - - protected Object getValue(String key, byte type) { - Long offset = getRecordOffset(key); - if (offset != null && offset > -1) { - return getValue(offset, type); - } - return null; - } - - protected Object getValue(Long offset, byte type) { - Object val = null; - try { - if (offset != null && offset > -1) { - long start = System.currentTimeMillis(); - - // Jump to this record's offset within the journal file - buffer.position(offset.intValue()); - - // First, read in the header - byte active = buffer.get(); - if (active != 1) { - return null; - } - - byte typeStored = buffer.get(); - if (type != typeStored) { - return null; - } - - int dataLength = buffer.getInt(); - - byte[] bytes; - switch (type) { - case LONG_RECORD_TYPE: - val = buffer.getLong(); - break; - case INT_RECORD_TYPE: - val = buffer.getInt(); - break; - case DOUBLE_RECORD_TYPE: - val = buffer.getDouble(); - break; - case FLOAT_RECORD_TYPE: - val = buffer.getFloat(); - break; - case SHORT_RECORD_TYPE: - val = buffer.getShort(); - break; - case CHAR_RECORD_TYPE: - val = buffer.getChar(); - break; - case BYTEARRAY_RECORD_TYPE: - bytes = new byte[dataLength]; - buffer.get(bytes); - val = bytes; - break; - case TEXT_RECORD_TYPE: - bytes = new byte[dataLength]; - buffer.get(bytes); - val = new String(bytes); - break; - } - - long end = System.currentTimeMillis(); - this.objectGetTime += (end - start); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return val; - } - - @Override - public Object iterateStart() { - try { - long current = 0; - - // Get past all the file header data - // - if (journal != null) { - journal.seek(0); - long filesize = journal.length(); - String version = journal.readUTF(); - String name = journal.readUTF(); - Long createTime = journal.readLong(); - current = journal.getFilePointer(); - } - - // Return the first active record found - return getNextRecord(current); - } catch (Exception e) { - e.printStackTrace(); - } - - return null; - } - - @Override - public Object iterateNext() { - return getNextRecord(iterateNext); - } - - protected Object getNextRecord(long current) { - int recordSize = 0; - try { - ByteBuffer bb = buffer; - if (bb.position() != current) { - bb.position((int) current); - } - - // Iterate through the records in the file with the intent to - // record the empty slots (for future reuse) and the end of the - // saved record - // - boolean found = false; - byte type = DataStore.EMPTY_RECORD_TYPE; - while (!found && current < (bufferSize - Header.HEADER_SIZE)) { - // Begin reading the next record header - // - - // Active Record? - boolean active = true; - if (bb.get() == INACTIVE_RECORD) { - active = false; - } - - // Read record type - type = bb.get(); - if (type == 0) { - bb.position((int) currentEnd); - break; // end of data records in file - } - - // Get the data length (comes after the record header) - int datalen = bb.getInt(); - recordSize = Header.HEADER_SIZE + datalen; - - if (active) { - // Found the next active record - found = true; - - // Store the location to the start of the next record - iterateNext = current + recordSize; - } else { - // skip past the data to the beginning of the next record - current += recordSize; - bb.position((int) current); - } - } - - if (found) { - // Return the record - return getValue(current, type); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception", e); - } - - return null; - } - - protected Long getRecordOffset(String key) { - Long location = index.get(key); - return location; - } - - protected boolean indexRecord(String key, long recordLocation) { - return index.put(key, recordLocation); - } - - public int getCollisions() { - return index.getCollisions(); - } - - public long getObjectRetrievalTime() { - return objectGetTime; - } - - public long getObjectStorageTime() { - return objectPutTime; - } - - public void outputStats() { - System.out.println("Data Store:"); - System.out.println(" -size: " + buffer.capacity()); - index.outputStats(); - } -} diff --git a/common/src/main/java/bisq/common/util/GcUtil.java b/common/src/main/java/bisq/common/util/GcUtil.java index 5f89bd4f61..687c397094 100644 --- a/common/src/main/java/bisq/common/util/GcUtil.java +++ b/common/src/main/java/bisq/common/util/GcUtil.java @@ -26,8 +26,8 @@ import lombok.extern.slf4j.Slf4j; public class GcUtil { @Setter private static boolean DISABLE_GC_CALLS = false; - private static int TRIGGER_MEM = 1000; - private static int TRIGGER_MAX_MEM = 3000; + private static final int TRIGGER_MEM = 1000; + private static final int TRIGGER_MAX_MEM = 3000; private static int totalInvocations; private static long totalGCTime; diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java index 4780176aad..c095891b28 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java @@ -143,7 +143,6 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene // This also comes with the improvement that the user does not need to load the past blocks back to the last // snapshot height. Though it comes also with the small risk that in case of re-orgs the user need to do // a resync in case the dao state would have been affected by that reorg. - //todo long ts = System.currentTimeMillis(); // We do not keep a copy of the clone as we use it immediately for persistence. GcUtil.maybeReleaseMemory(); diff --git a/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java b/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java index ddb3f661df..82826823f7 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java +++ b/core/src/main/java/bisq/core/dao/state/storage/BlocksPersistence.java @@ -38,8 +38,6 @@ import javax.annotation.Nullable; @Slf4j public class BlocksPersistence { - // 10000->Writing 130014 blocks took 1658 msec - // 1000-> Writing 130014 blocks took 1685 msec Mapping blocks from DaoStateStore took 2250 ms public static final int BUCKET_SIZE = 1000; // results in about 1 MB files and about 1 new file per week private final File storageDir; @@ -51,10 +49,6 @@ public class BlocksPersistence { this.storageDir = storageDir; this.fileName = fileName; this.persistenceProtoResolver = persistenceProtoResolver; - - /* if (!storageDir.exists()) { - storageDir.mkdir(); - }*/ } public void writeBlocks(List protobufBlocks) { @@ -75,7 +69,6 @@ public class BlocksPersistence { int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; int last = bucketIndex * BUCKET_SIZE; File storageFile = new File(storageDir, fileName + "_" + first + "-" + last); - // log.error("addAll height={} items={}", height, temp.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList())); writeToDisk(storageFile, new BsqBlockStore(temp), null); temp = new ArrayList<>(); } @@ -85,11 +78,10 @@ public class BlocksPersistence { int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; int last = bucketIndex * BUCKET_SIZE; File storageFile = new File(storageDir, fileName + "_" + first + "-" + last); - // log.error("items={}", temp.stream().map(e -> e.getHeight()).collect(Collectors.toList())); writeToDisk(storageFile, new BsqBlockStore(temp), null); } - log.error("Write {} blocks to disk took {} msec", protobufBlocks.size(), System.currentTimeMillis() - ts); + log.info("Write {} blocks to disk took {} msec", protobufBlocks.size(), System.currentTimeMillis() - ts); } public void removeBlocksDirectory() { @@ -107,19 +99,15 @@ public class BlocksPersistence { storageDir.mkdir(); } - // log.error("getBlocks {}-{}", from, to); long ts = System.currentTimeMillis(); - // from = Math.max(571747, from); List buckets = new ArrayList<>(); int start = from / BUCKET_SIZE + 1; int end = to / BUCKET_SIZE + 1; for (int bucketIndex = start; bucketIndex <= end; bucketIndex++) { List bucket = readBucket(bucketIndex); - // log.error("read bucketIndex {}, items={}", bucketIndex, bucket.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList())); buckets.addAll(bucket); } - log.error("Reading {} blocks took {} msec", buckets.size(), System.currentTimeMillis() - ts); - // System.exit(0); + log.info("Reading {} blocks took {} msec", buckets.size(), System.currentTimeMillis() - ts); return buckets; } @@ -127,23 +115,17 @@ public class BlocksPersistence { private List readBucket(int bucketIndex) { int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1; int last = bucketIndex * BUCKET_SIZE; - - /* int first = bucketIndex * BUCKET_SIZE + 1; - int last = first + BUCKET_SIZE - 1;*/ String child = fileName + "_" + first + "-" + last; - // log.error("getBlocksOfBucket {}", child); File storageFile = new File(storageDir, child); if (!storageFile.exists()) { - // log.error("storageFile not existing {}", storageFile.getName()); return new ArrayList<>(); } - try (FileInputStream fileInputStream = new FileInputStream(storageFile)) { protobuf.PersistableEnvelope proto = protobuf.PersistableEnvelope.parseDelimitedFrom(fileInputStream); BsqBlockStore bsqBlockStore = (BsqBlockStore) persistenceProtoResolver.fromProto(proto); return bsqBlockStore.getBlocksAsProto(); } catch (Throwable t) { - log.error("Reading {} failed with {}.", fileName, t.getMessage()); + log.info("Reading {} failed with {}.", fileName, t.getMessage()); return new ArrayList<>(); } } @@ -151,7 +133,6 @@ public class BlocksPersistence { private void writeToDisk(File storageFile, BsqBlockStore bsqBlockStore, @Nullable Runnable completeHandler) { - long ts = System.currentTimeMillis(); File tempFile = null; FileOutputStream fileOutputStream = null; try { @@ -196,7 +177,6 @@ public class BlocksPersistence { e.printStackTrace(); log.error("Cannot close resources." + e.getMessage()); } - // log.info("Writing the serialized {} completed in {} msec", fileName, System.currentTimeMillis() - ts); if (completeHandler != null) { completeHandler.run(); } diff --git a/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java b/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java index 61a9e3923e..6b6002e0fc 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java +++ b/core/src/main/java/bisq/core/dao/state/storage/BsqBlocksStorageService.java @@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j; @Singleton public class BsqBlocksStorageService { public final static String NAME = "BsqBlocks"; + private final int genesisBlockHeight; private final File storageDir; private final BlocksPersistence blocksPersistence; @@ -72,7 +73,7 @@ public class BsqBlocksStorageService { chainHeightOfPersistedBlocks = Math.max(chainHeightOfPersistedBlocks, getHeightOfLastFullBucket(blocks)); } - log.error("Persist (serialize+write) {} blocks took {} ms", + log.info("Persist (serialize+write) {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts); } @@ -83,18 +84,15 @@ public class BsqBlocksStorageService { List list = blocksPersistence.readBlocks(genesisBlockHeight, chainHeight); list.stream().map(Block::fromProto) .forEach(blocks::add); - log.error("Reading and deserializing {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts); + log.info("Reading and deserializing {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts); if (!blocks.isEmpty()) { chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks); } return blocks; } - public LinkedList swapBlocks(List protobufBlocks) { + public LinkedList migrateBlocks(List protobufBlocks) { long ts = System.currentTimeMillis(); - log.error("We have {} blocks in the daoStateAsProto", protobufBlocks.size()); - - blocksPersistence.writeBlocks(protobufBlocks); LinkedList blocks = new LinkedList<>(); protobufBlocks.forEach(protobufBlock -> blocks.add(Block.fromProto(protobufBlock))); @@ -102,7 +100,7 @@ public class BsqBlocksStorageService { chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks); } - log.error("Mapping blocks (write+deserialization) from DaoStateStore took {} ms", System.currentTimeMillis() - ts); + log.info("Migrating blocks (write+deserialization) from DaoStateStore took {} ms", System.currentTimeMillis() - ts); return blocks; } @@ -138,18 +136,15 @@ public class BsqBlocksStorageService { File destinationFile = new File(storageDir, fileName); FileUtils.copyFile(resourceFile, destinationFile); } - log.error("Copying {} resource files took {} ms", fileNames.length, System.currentTimeMillis() - ts); + log.info("Copying {} resource files took {} ms", fileNames.length, System.currentTimeMillis() - ts); } catch (Throwable e) { e.printStackTrace(); } } - // todo private int getHeightOfLastFullBucket(List blocks) { - int i = blocks.get(blocks.size() - 1).getHeight() / BlocksPersistence.BUCKET_SIZE; - int i1 = i * BlocksPersistence.BUCKET_SIZE; - // log.error("getHeightOfLastFullBucket {}", i * BlocksPersistence.BUCKET_SIZE); - return i1; + int bucketIndex = blocks.get(blocks.size() - 1).getHeight() / BlocksPersistence.BUCKET_SIZE; + return bucketIndex * BlocksPersistence.BUCKET_SIZE; } public void removeBlocksDirectory() { @@ -163,11 +158,5 @@ public class BsqBlocksStorageService { if (!storageDir.exists()) { storageDir.mkdir(); } - /* List blocks = new ArrayList<>(); - // height, long time, String hash, String previousBlockHash - for (int i = genesisBlockHeight; i <= chainHeightOfPersistedBlocks; i++) { - blocks.add(new Block(i, 0, "", "").toProtoMessage()); - } - blocksPersistence.addAll(blocks);*/ } } diff --git a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java index 7e38801fa5..3bd7fb70bc 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java +++ b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStorageService.java @@ -102,7 +102,7 @@ public class DaoStateStorageService extends StoreService { persistenceManager.persistNow(() -> { // After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in // memory there until the next persist call. - log.error("Persist daoState took {} ms", System.currentTimeMillis() - ts); + log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts); store.releaseMemory(); GcUtil.maybeReleaseMemory(); UserThread.execute(completeHandler); @@ -116,7 +116,6 @@ public class DaoStateStorageService extends StoreService { Thread.currentThread().setName("copyBsqBlocksFromResources"); bsqBlocksStorageService.copyFromResources(postFix); - // We read daoState and blocks ane keep them in fields in store and super.readFromResources(postFix, () -> { // We got mapped back to user thread so we need to create a new thread again as we dont want to // execute on user thread @@ -128,7 +127,7 @@ public class DaoStateStorageService extends StoreService { if (daoStateAsProto.getBlocksList().isEmpty()) { list = bsqBlocksStorageService.readBlocks(daoStateAsProto.getChainHeight()); } else { - list = bsqBlocksStorageService.swapBlocks(daoStateAsProto.getBlocksList()); + list = bsqBlocksStorageService.migrateBlocks(daoStateAsProto.getBlocksList()); } blocks.clear(); blocks.addAll(list); @@ -144,11 +143,10 @@ public class DaoStateStorageService extends StoreService { if (daoStateAsProto != null) { long ts = System.currentTimeMillis(); DaoState daoState = DaoState.fromProto(daoStateAsProto, blocks); - log.error("Deserializing DaoState with {} blocks took {} ms", + log.info("Deserializing DaoState with {} blocks took {} ms", daoState.getBlocks().size(), System.currentTimeMillis() - ts); return daoState; } - return new DaoState(); } @@ -189,7 +187,7 @@ public class DaoStateStorageService extends StoreService { } private void removeAndBackupDaoConsensusFiles(File storageDir, String backupDirName) throws IOException { - // We delete all DAO consensus data. Some will be rebuild from resources. + // We delete all DAO related data. Some will be rebuild from resources. long currentTime = System.currentTimeMillis(); String newFileName = "BlindVoteStore_" + currentTime; FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BlindVoteStore"), newFileName, backupDirName); diff --git a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java index 1a041ed4d6..4b48c6f9ac 100644 --- a/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java +++ b/core/src/main/java/bisq/core/dao/state/storage/DaoStateStore.java @@ -77,7 +77,6 @@ public class DaoStateStore implements PersistableEnvelope { return new DaoStateStore(proto.getDaoState(), daoStateHashList); } - public void releaseMemory() { daoStateAsProto = null; daoStateHashChain = null;