Persist bsq blocks as buckets of 1000 blocks

Improve logging

Add BsqBlockStore to protobuf

Remove DaoStateMonitoringService field

Do not persist the blocks in daoState anymore.

This improves persistence performance and reduces memory
requirements for snapshots.
This commit is contained in:
chimp1984 2021-10-28 02:54:52 +02:00
parent 699048634b
commit 7c32587453
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
18 changed files with 2559 additions and 75 deletions

View File

@ -63,6 +63,7 @@ configure(subprojects) {
kotlinVersion = '1.3.41'
knowmXchangeVersion = '4.4.2'
langVersion = '3.11'
leveldbVersion = '1.2'
logbackVersion = '1.1.11'
loggingVersion = '1.2'
lombokVersion = '1.18.12'
@ -238,7 +239,7 @@ configure(project(':common')) {
javafx {
version = "$javafxVersion"
modules = ['javafx.graphics']
modules = ['javafx.graphics']
}
dependencies {
@ -264,6 +265,10 @@ configure(project(':common')) {
exclude(module: 'okhttp')
exclude(module: 'okio')
}
implementation("io.github.pcmind:leveldb:$leveldbVersion") {
exclude(module: 'guava')
}
runtimeOnly("io.grpc:grpc-netty-shaded:$grpcVersion") {
exclude(module: 'guava')
exclude(module: 'animal-sniffer-annotations')
@ -284,7 +289,7 @@ configure(project(':p2p')) {
javafx {
version = "$javafxVersion"
modules = ['javafx.base']
modules = ['javafx.base']
}
dependencies {
@ -298,6 +303,7 @@ configure(project(':p2p')) {
implementation("org.apache.httpcomponents:httpclient:$httpclientVersion") {
exclude(module: 'commons-codec')
}
compile "org.fxmisc.easybind:easybind:$easybindVersion"
compileOnly "org.projectlombok:lombok:$lombokVersion"
annotationProcessor "org.projectlombok:lombok:$lombokVersion"
@ -324,7 +330,7 @@ configure(project(':core')) {
javafx {
version = "$javafxVersion"
modules = ['javafx.base']
modules = ['javafx.base']
}
dependencies {
@ -411,7 +417,7 @@ configure(project(':desktop')) {
javafx {
version = "$javafxVersion"
modules = ['javafx.controls', 'javafx.fxml']
modules = ['javafx.controls', 'javafx.fxml']
}
version = '1.7.5-SNAPSHOT'
@ -459,7 +465,7 @@ configure(project(':monitor')) {
javafx {
version = "$javafxVersion"
modules = ['javafx.base']
modules = ['javafx.base']
}
mainClassName = 'bisq.monitor.Monitor'

View File

@ -0,0 +1,78 @@
package bisq.common.persistence.db;
/**
 * Contract for a simple typed key/value data store, backed either by heap
 * memory or by a persisted journal file. Implementations store one record per
 * key; records carry a state flag (active/inactive) and a type tag describing
 * the payload ({@code *_RECORD_TYPE} constants below).
 *
 * <p>Note: interface members are implicitly {@code public static final}
 * (fields) and {@code public abstract} (methods); redundant modifiers have
 * been dropped.</p>
 */
public interface DataStore {
    /** Backing strategy for a store. */
    enum Storage {
        IN_MEMORY,
        PERSISTED
    }

    // Record state flags.
    byte INACTIVE_RECORD = 0;
    byte ACTIVE_RECORD = 1;

    // Record type tags identifying the payload type of a stored record.
    byte EMPTY_RECORD_TYPE = -1;
    byte OBJ_RECORD_TYPE = 1;
    byte TEXT_RECORD_TYPE = 2;
    byte LONG_RECORD_TYPE = 3;
    byte INT_RECORD_TYPE = 4;
    byte DOUBLE_RECORD_TYPE = 5;
    byte FLOAT_RECORD_TYPE = 6;
    byte SHORT_RECORD_TYPE = 7;
    byte CHAR_RECORD_TYPE = 8;
    byte BYTEARRAY_RECORD_TYPE = 9;

    // Journal statistics.

    /** @return number of active records in the store */
    long getRecordCount();

    /** @return number of empty (inactive/reusable) record slots */
    long getEmptyCount();

    /** @return the store's name */
    String getName();

    /** @return the folder holding the store's journal, if persisted */
    String getFolder();

    /** @return current journal size in bytes */
    long getFilesize();

    // Typed accessors. Each putX returns true on success; each getX returns
    // null (or a type-specific sentinel) when the key is absent.

    boolean putInteger(String key, Integer val);

    Integer getInteger(String key);

    boolean putShort(String key, Short val);

    Short getShort(String key);

    boolean putLong(String key, Long val);

    Long getLong(String key);

    boolean putFloat(String key, Float val);

    Float getFloat(String key);

    boolean putDouble(String key, Double val);

    Double getDouble(String key);

    boolean putString(String key, String val);

    String getString(String key);

    boolean putObject(String key, Object msg);

    Object getObject(String key); // key is the object ID

    boolean putBytes(String key, byte[] bytes);

    byte[] getBytes(String key);

    boolean putChar(String key, char val);

    char getChar(String key);

    boolean remove(String key); // key is the object's ID (hash)

    // Iteration over stored records; iterateStart resets the cursor,
    // iterateNext returns the next record or null when exhausted.

    Object iterateStart();

    Object iterateNext();

    /** Deletes the store and its backing resources. */
    void delete();
}

View File

@ -0,0 +1,418 @@
package bisq.common.persistence.db;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.io.File;
import java.io.RandomAccessFile;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Hash index with fixed-size entries, mapping a key's hashcode to a record
 * location (a {@code long}). The index lives either in a direct
 * {@link ByteBuffer} (in-memory mode) or in a memory-mapped journal file
 * (persisted mode). Collisions are resolved by open addressing: walking
 * forward entry by entry, wrapping past the end of the table.
 *
 * <p>With {@code KEY_SIZE == 0} no key bytes are stored — only the key's
 * hashcode — so two distinct keys with equal hashcodes are indistinguishable
 * to this index. NOTE(review): confirm callers tolerate that limitation.</p>
 *
 * @author ebruno
 */
public class FixedHash implements HashBase {
    // NOTE(review): this enum duplicates DataStore.Storage and appears unused
    // inside this class — confirm before removing.
    public enum Storage {
        IN_MEMORY,
        PERSISTED
    }

    protected static final Logger logger = Logger.getLogger("FixedHash");
    protected boolean debugLogging = false;
    // Load percentage (entries/capacity * 100) above which the index grows.
    protected static int LOAD_THRESHOLD = 70;
    protected static final int PAGE_SIZE = 1024 * 1024;
    protected static final int SIZE_FACTOR = 2;
    protected static final int DEFAULT_INDEX_JOURNAL_SIZE = SIZE_FACTOR * PAGE_SIZE;
    // Number of key bytes stored inline per entry; 0 = hashcode only.
    protected static final int KEY_SIZE = 0;
    //
    // 1 (byte) key length
    // 4 (int) key hashcode
    // 0 (bytes) key size
    // 8 (long) record location
    //
    protected static final int INDEX_ENTRY_SIZE_BYTES =
            1 + Integer.BYTES + KEY_SIZE + Long.BYTES;

    protected long sizeInBytes = DEFAULT_INDEX_JOURNAL_SIZE;
    protected int previousOffset = 0; // the last record inserted into the index
    protected int bucketsFree = 0;
    protected int bucketsUsed = 0;
    protected int totalBuckets = 0;
    protected int collisions = 0;
    protected String journalPath = "";
    protected boolean inMemory = true;
    protected RandomAccessFile indexFile = null;
    protected FileChannel indexChannel = null;
    protected ByteBuffer indexBuffer = null;
    // NOTE(review): this field appears unused; entries store the key length
    // passed to putInternal instead. Confirm before removing.
    protected byte keyLength = 16;
    protected long indexCurrentEnd = 0;
    protected int indexRecordReadCount = 1;

    // Used when iterating through the index
    protected int iterateNext = 0;

    ///////////////////////////////////////////////////////////////////////////

    /**
     * Creates an index of the default journal size.
     *
     * @param journalPath   base path of the journal file (persisted mode only)
     * @param inMemory      true for a heap-backed direct buffer, false for a
     *                      memory-mapped file
     * @param reuseExisting if false, an existing journal file is renamed aside
     */
    public FixedHash(String journalPath, boolean inMemory, boolean reuseExisting) {
        this(DEFAULT_INDEX_JOURNAL_SIZE, journalPath, inMemory, reuseExisting);
    }

    /**
     * Creates an index of the given size in bytes. Bucket counters are only
     * initialized when the backing buffer was created successfully; on failure
     * the instance is left with a null indexBuffer (callers are expected to
     * check — NOTE(review): no explicit error signal is raised here).
     */
    public FixedHash(int size, String journalPath, boolean inMemory, boolean reuseExisting) {
        boolean success = false;
        sizeInBytes = size;
        this.inMemory = inMemory;
        this.journalPath = journalPath;
        if (inMemory) {
            success = createIndexJournalBB();
        } else {
            success = createIndexJournalMBB(reuseExisting);
        }
        if (success) {
            totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES);
            bucketsFree = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES);
            bucketsUsed = 0;
        }
    }

    /**
     * Allocates the in-memory backing buffer (a direct ByteBuffer of
     * {@code sizeInBytes}). Returns true on success, false on any failure.
     */
    protected boolean createIndexJournalBB() {
        try {
            indexBuffer = ByteBuffer.allocateDirect((int) sizeInBytes);
            indexCurrentEnd = indexBuffer.position();
            return true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Exception", e);
        }
        return false;
    }

    /**
     * Creates or reopens the memory-mapped journal file ("&lt;path&gt;Index").
     * If the file exists and {@code reuseExisting} is true, its current length
     * becomes the index size; otherwise an existing file is renamed aside with
     * the "_prev" suffix and a fresh file of {@code sizeInBytes} is created.
     * Returns true on success.
     */
    protected boolean createIndexJournalMBB(boolean reuseExisting) {
        try {
            journalPath += "Index";

            // If the journal file already exists, rename it unless we're
            // supposed to reuse the existing file and its contents
            boolean fileExists = false;
            try {
                File file = new File(journalPath);
                fileExists = file.exists();
                if (fileExists && !reuseExisting) {
                    File newFile = new File(journalPath + "_prev");
                    logger.info("Moving journal " + journalPath + " to " + newFile.getName());
                    file.renameTo(newFile);
                }
            } catch (Exception e) {
                // NOTE(review): exception silently swallowed; a failed rename
                // falls through and the old file is reopened below. Confirm
                // this best-effort behavior is intended.
            }

            indexFile = new RandomAccessFile(journalPath, "rw");
            if (fileExists && reuseExisting) {
                // Existing file, so use its existing length
                sizeInBytes = indexFile.length();
            } else {
                // New file, set its length
                indexFile.setLength(sizeInBytes);
            }
            indexChannel = indexFile.getChannel();
            indexBuffer = indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, sizeInBytes);
            indexCurrentEnd = indexBuffer.position();
            return true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Exception", e);
        }
        return false;
    }

    /**
     * Discards the index contents. In-memory mode replaces the buffer with a
     * zero-capacity one; persisted mode truncates, closes and deletes the
     * journal file. The instance is not reusable for puts afterwards without
     * re-creating the backing storage.
     */
    @Override
    public void reset() {
        try {
            indexBuffer.clear();
            indexBuffer.limit(0);
            if (inMemory) {
                indexBuffer = ByteBuffer.allocateDirect(0);
            } else {
                indexChannel.truncate(0);
                indexChannel.close();
                indexFile.close();
                File f = new File(journalPath);
                f.delete();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Maps a precomputed hashcode to a byte offset of its home bucket.
    protected int getHashBucket(int hash) {
        return _getHashBucket(hash);
    }

    // Maps a key to the byte offset of its home bucket via String.hashCode().
    protected int getHashBucket(String key) {
        int hash = key.hashCode();
        return _getHashBucket(hash);
    }

    /**
     * Computes the home bucket's byte offset for a hashcode. The hash is
     * spread (xor with its high 16 bits), then reduced modulo (buckets - 1);
     * bucket 0 is never returned — it is reserved (see findBucket's wrap
     * comment: slot 0 holds the first record pointer).
     */
    protected int _getHashBucket(int hash) {
        hash = hash ^ (hash >>> 16);
        int buckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES);
        int bucket = Math.max(1, Math.abs(hash) % (buckets - 1));
        return bucket * INDEX_ENTRY_SIZE_BYTES;
    }

    /**
     * Grows the index by PAGE_SIZE * SIZE_FACTOR bytes and re-hashes every
     * occupied entry from the old buffer into the enlarged one. In persisted
     * mode the mapped buffer is forced to disk, the file extended and
     * re-mapped. Returns true on success.
     *
     * NOTE(review): the re-hash loop iterates i = 1..buckets inclusive over
     * the old buffer; verify the final iteration cannot read past the old
     * buffer's capacity.
     */
    protected boolean enlargeIndex() {
        try {
            // Hold a reference to the original buffer to copy its contents
            ByteBuffer oldBuffer = indexBuffer;

            if (inMemory) {
                logger.log(Level.INFO, "Expanding in-memory index...");
                sizeInBytes += (PAGE_SIZE * SIZE_FACTOR);
                createIndexJournalBB();
            } else {
                logger.log(Level.INFO, "Expanding persisted index...");
                ((MappedByteBuffer) indexBuffer).force();
                indexFile.setLength(sizeInBytes + (PAGE_SIZE * SIZE_FACTOR));
                indexChannel = indexFile.getChannel();
                sizeInBytes = indexChannel.size();
                indexBuffer = indexChannel.map(
                        FileChannel.MapMode.READ_WRITE, 0, sizeInBytes);
            }

            // Re-hash the index
            //
            collisions = 0;
            bucketsUsed = 0;
            // Skip bucket 0 (reserved); start reading at the first real bucket.
            oldBuffer.position(INDEX_ENTRY_SIZE_BYTES);
            int buckets = (oldBuffer.capacity() / INDEX_ENTRY_SIZE_BYTES);
            for (int i = 1; i <= buckets; i++) {
                byte occupied = oldBuffer.get();
                if (occupied > 0) {
                    // Occupied entry: read its fields (exactly one entry's
                    // worth of bytes) and re-insert into the new buffer.
                    int keyHash = oldBuffer.getInt();
                    byte[] fixedKeyBytes = null;
                    if (KEY_SIZE > 0) {
                        fixedKeyBytes = new byte[KEY_SIZE];
                        oldBuffer.get(fixedKeyBytes);
                    }
                    Long location = oldBuffer.getLong();
                    putInternal(fixedKeyBytes, keyHash, occupied, location);
                } else {
                    // Bucket unocuppied, move to the next one
                    oldBuffer.position(i * INDEX_ENTRY_SIZE_BYTES);
                }
            }

            totalBuckets = (int) (sizeInBytes / INDEX_ENTRY_SIZE_BYTES);
            bucketsFree = totalBuckets - bucketsUsed;
            logger.log(Level.INFO, "Done!");
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Walks the table from {@code offset} looking for the entry whose stored
     * hashcode equals {@code hashcode} (open addressing, wrapping past the end
     * and skipping reserved slot 0).
     *
     * <p>Precondition: the caller must have positioned {@code indexBuffer}
     * just past the occupied byte of the bucket at {@code offset} (i.e. at its
     * hashcode field).</p>
     *
     * @param mustFind if true, return -1 when the walk ends at an empty slot
     *                 without a hash match; if false, return the offset of the
     *                 first free slot found
     * @return the matching entry's offset, a free slot's offset, or -1
     */
    protected int findBucket(Integer hashcode, int offset, boolean mustFind) {
        boolean found = false;
        byte occupied = 1;
        while (occupied > 0 && !found) {
            int keyHash = indexBuffer.getInt();
            if (keyHash == hashcode) {
                if (KEY_SIZE > 0) {
                    indexBuffer.position(
                            offset + 1 + Integer.BYTES + KEY_SIZE);
                }
                found = true;
                break;
            } else {
                // Check for rollover past the end of the table
                offset += INDEX_ENTRY_SIZE_BYTES;
                if (offset >= (sizeInBytes - INDEX_ENTRY_SIZE_BYTES)) {
                    // Wrap to the beginning, skipping the first slot
                    // since it's reserved for the first record pointer
                    offset = INDEX_ENTRY_SIZE_BYTES;
                }

                // Skip to the next bucket
                indexBuffer.position(offset);
                occupied = indexBuffer.get();
            }
        }

        // return if the key was found in the index
        if (!found && mustFind) {
            return -1;
        }

        return offset;
    }

    /**
     * Writes one index entry (key length, hashcode, optional key bytes, record
     * location) into the home bucket for {@code hashcode}, or — on collision —
     * into the next free slot found by findBucket.
     *
     * NOTE(review): if findBucket stops on an entry with an equal hashcode,
     * the existing entry is overwritten rather than chained; with KEY_SIZE ==
     * 0 that also conflates distinct keys with equal hashcodes.
     */
    protected boolean putInternal(byte[] fixedKeyBytes, Integer hashcode,
                                  byte keyLength, Long value) {
        int offset = getHashBucket(hashcode);
        indexBuffer.position(offset);
        indexBuffer.mark();
        byte occupied = indexBuffer.get();
        if (occupied == 0) {
            // found a free slot, go back to the beginning of it
            indexBuffer.reset();
        } else {
            collisions++;
            // When there's a collision, walk the table until a
            // free slot is found
            offset = findBucket(hashcode, offset, false);

            // found a free slot, seek to it
            indexBuffer.position(offset);
        }

        // Write the data
        //
        indexBuffer.put(keyLength);
        indexBuffer.putInt(hashcode); // hashcode is faster for resolving collisions then comparing strings
        if (KEY_SIZE > 0 && fixedKeyBytes != null && fixedKeyBytes.length > 0) {
            // Make sure we copy *at most* KEY_SIZE bytes for the key
            indexBuffer.put(fixedKeyBytes,
                    0, Math.min(KEY_SIZE, fixedKeyBytes.length));
        }
        indexBuffer.putLong(value); // indexed record location

        bucketsUsed++;

        return true;
    }

    /**
     * Inserts (or overwrites) the record location for {@code key}, enlarging
     * the index first when the load factor exceeds LOAD_THRESHOLD percent.
     *
     * NOTE(review): key.length() is narrowed to a byte, so lengths over 127
     * wrap; with KEY_SIZE == 0 the stored length is informational only.
     */
    @Override
    public boolean put(String key, Long value) {
        //
        // Entry:
        // 1 (byte) key length
        // 4 (int) key hashcode
        // 0 (bytes) key size
        // 8 (long) record location
        //
        try {
            // Check load to see if Index needs to be enlarged
            //
            if (getLoad() > LOAD_THRESHOLD) {
                enlargeIndex();
            }

            byte keylen = (byte) key.length();
            byte[] fixedKeyBytes = null;
            if (KEY_SIZE > 0) {
                fixedKeyBytes = new byte[KEY_SIZE];
                System.arraycopy(key.getBytes(),
                        0, fixedKeyBytes,
                        0, Math.min(KEY_SIZE, keylen));
            }

            return putInternal(fixedKeyBytes, key.hashCode(), keylen, value);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return false;
    }

    /**
     * Locates the entry for {@code key}. On success the buffer is positioned
     * at the entry's record-location field; returns -1 if the key's home
     * bucket chain does not contain it.
     */
    protected int getHashBucketOffset(String key) {
        int offset = -1;

        try {
            offset = getHashBucket(key.hashCode());
            indexBuffer.position(offset);
            byte occupied = indexBuffer.get();
            if (occupied > 0) {
                offset = findBucket(key.hashCode(), offset, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return offset;
    }

    /**
     * Returns the record location stored for {@code key}, or -1L (sentinel,
     * not null) when the key is absent.
     */
    @Override
    public Long get(String key) {
        int offset = getHashBucketOffset(key);
        if (offset == -1) {
            // key not found
            return -1L;
        }

        // Return the location of the data record
        return indexBuffer.getLong();
    }

    /**
     * Removes the entry for {@code key} by zeroing its occupied byte; the slot
     * becomes reusable. No-op when the key is absent.
     *
     * NOTE(review): findBucket is called a second time here after
     * getHashBucketOffset already walked to the entry, but the buffer position
     * at that point is past the entry's hashcode — verify the second walk is
     * positioned correctly.
     */
    @Override
    public void remove(String key) {
        int offset = getHashBucketOffset(key);
        if (offset == -1) {
            // key not found
            return;
        }

        offset = findBucket(key.hashCode(), offset, true);
        if (offset != -1) {
            // Simply zero out the occupied slot, but need to rewind first
            int currPos = indexBuffer.position();
            currPos -= (Integer.BYTES + 1);
            indexBuffer.position(currPos);
            indexBuffer.put((byte) 0);
        }
    }

    /** @return number of collisions observed since creation or last re-hash */
    @Override
    public int getCollisions() {
        return collisions;
    }

    /** Prints index statistics to stdout (size, load, entries, capacity...). */
    @Override
    public void outputStats() {
        System.out.println("Index " + journalPath + " Stats:");
        System.out.println(" -size: " + size());
        System.out.println(" -load: " + getLoad());
        System.out.println(" -entries: " + entries());
        System.out.println(" -capacity: " + capacity());
        System.out.println(" -available: " + available());
        System.out.println(" -collisions: " + getCollisions());
    }

    /** @return index size in bytes */
    public long size() {
        return sizeInBytes;
    }

    /** @return number of occupied buckets */
    public int entries() {
        return bucketsUsed;
    }

    /** @return total number of buckets */
    public int capacity() {
        return totalBuckets;
    }

    /** @return number of free buckets */
    public int available() {
        return capacity() - entries();
    }

    /** @return current load as an integer percentage (entries / capacity) */
    public int getLoad() {
        int used = entries();
        int capac = capacity();
        float f = (float) used / (float) capac;
        int load = (int) (f * 100);
        return load; // percentage
    }
}

View File

@ -0,0 +1,20 @@
package bisq.common.persistence.db;
/**
 * Minimal contract for a hash index mapping String keys to {@code Long}
 * record locations (e.g. byte offsets in a journal).
 *
 * <p>Interface methods are implicitly {@code public abstract}; redundant
 * modifiers have been dropped.</p>
 *
 * @author ebruno
 */
public interface HashBase {
    /**
     * Stores the record location for the given key.
     *
     * @return true on success
     */
    boolean put(String k, Long v);

    /**
     * @return the stored record location, or an implementation-defined
     *         sentinel (e.g. -1L) when the key is absent
     */
    Long get(String k);

    /** Removes the entry for the given key; no-op when absent. */
    void remove(String k);

    /** @return number of hash collisions observed */
    int getCollisions();

    /** @return current load as an integer percentage */
    int getLoad();

    /** Prints index statistics (implementation-defined destination). */
    void outputStats();

    /** Discards all entries and releases/recreates backing storage. */
    void reset();
}

View File

@ -0,0 +1,380 @@
package bisq.common.persistence.db;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.Utilities;
import java.nio.charset.StandardCharsets;
import java.io.File;
import java.util.HashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
*
* @author ebruno
*/
@Slf4j
public class NoHeapDB {
protected final static int MEGABYTE = 1024 * 1024;
protected final static int DEFAULT_STORE_SIZE = MEGABYTE * 100;
private static Thread thread;
static long c;
static long ts;
static String key;
public static void main(String[] args) throws Exception {
NoHeapDB db = new NoHeapDB();
/* db.createStore(
"MyTestDataStore",
DataStore.Storage.IN_MEMORY, //or DataStore.Storage.PERSISTED
256); // in MB*/
db.createStore(
"MyTestDataStore",
DataStore.Storage.PERSISTED, //or DataStore.Storage.PERSISTED
50, 5); // in MB
DataStore store = db.getStore("MyTestDataStore");
String sep = "_";
String result = store.getString(2 + sep + 100);
log.error("result {}", result);
String name = "MyTestDataStore";
byte[] array = new byte[10000000];
new Random().nextBytes(array);
String random = new String(array, StandardCharsets.UTF_8);
log.error(Utilities.readableFileSize(array.length));
log.error(Utilities.readableFileSize(random.getBytes(StandardCharsets.UTF_8).length));
c = store.getRecordCount();
key = c + sep;
ts = System.currentTimeMillis();
String res1 = store.getString(key);
// log.error("read took {} ms. {}", System.currentTimeMillis() - ts, res1);
Timer timer = UserThread.runPeriodically(() -> {
ts = System.currentTimeMillis();
key = c + sep;
String val1 = random + c;
store.putString(key, val1);
//log.error("write took {} ms", System.currentTimeMillis() - ts);
ts = System.currentTimeMillis();
String res = store.getString(key);
// log.error("read took {} ms. res={}, val1={}, match {}", System.currentTimeMillis() - ts, res, val1, res.equals(val1));
// log.error("read took {} ms. match {}", System.currentTimeMillis() - ts, res.equals(val1));
c++;
log.error("getFilesize {} getRecordCount {}", Utilities.readableFileSize(store.getFilesize()), store.getRecordCount());
System.gc();
if (store.getFilesize() > 1800000000) {
log.error("too large");
System.exit(0);
}
// 400 000
/* long ts = System.currentTimeMillis();
int size = 10000000;
for (int i = 0; i < size; i++) {
String val = String.valueOf(c * i);
String key = c + sep + i;
store.putString(key, val); //400 000
// log.error("write key/val {}/{}", key, val);
}
log.error("write took {} ms", System.currentTimeMillis() - ts);
ts = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
String key = c + sep + i;
String val = store.getString(key);
//log.error("read key/val {}/{}", key, val);
}
log.error("read took {} ms", System.currentTimeMillis() - ts);
c++;*/
}, 100, TimeUnit.MILLISECONDS);
thread = new Thread(() -> {
while (true) {
}
});
thread.start();
UserThread.runAfter(() -> {
timer.stop();
thread.interrupt();
}, 500);
}
HashMap<String, DataStore> stores = new HashMap<>();
String homeDirectory =
System.getProperty("user.home") +
File.separator + "JavaOffHeap";
public NoHeapDB() {
}
public NoHeapDB(String homeDirectory) {
this.homeDirectory = homeDirectory;
}
public boolean createStore(String name) throws Exception {
return createStore(name,
DataStore.Storage.IN_MEMORY,
100, 10);
}
public boolean createStore(String name,
DataStore.Storage storageType)
throws Exception {
return createStore(name,
storageType,
100, 10);
}
public boolean createStore(String name,
DataStore.Storage storageType,
int size,
int indexFileSize) throws Exception {
if (size > Integer.MAX_VALUE) {
throw new Exception("Database size exceeds " + Integer.MAX_VALUE);
}
NoHeapDBStore nohdb = new NoHeapDBStore(homeDirectory,
name,
storageType,
size * MEGABYTE,
indexFileSize * MEGABYTE,
true);
stores.put(name, nohdb);
return true;
}
public DataStore getStore(String storeName) {
return this.stores.get(storeName);
}
public boolean deleteStore(String storeName) {
DataStore store = this.stores.get(storeName);
if (store != null) {
// Delete the store here
store.delete();
if (stores.remove(storeName) != null) {
return true;
}
}
return false;
}
public boolean putString(String storeName, String key, String value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putString(key, value);
}
return false;
}
public boolean putInteger(String storeName, String key, Integer value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putInteger(key, value);
}
return false;
}
public boolean putShort(String storeName, String key, Short value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putShort(key, value);
}
return false;
}
public boolean putLong(String storeName, String key, Long value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putLong(key, value);
}
return false;
}
public boolean putFloat(String storeName, String key, Float value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putFloat(key, value);
}
return false;
}
public boolean putDouble(String storeName, String key, Double value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putDouble(key, value);
}
return false;
}
public boolean putChar(String storeName, String key, char value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putChar(key, value);
}
return false;
}
public boolean putObject(String storeName, String key, Object value) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.putObject(key, value);
}
return false;
}
public String getString(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getString(key);
}
return null;
}
public Integer getInteger(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getInteger(key);
}
return null;
}
public Short getShort(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getShort(key);
}
return null;
}
public Long getLong(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getLong(key);
}
return null;
}
public Float getFloat(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getFloat(key);
}
return null;
}
public Double getDouble(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getDouble(key);
}
return null;
}
public char getChar(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getChar(key);
}
return (char) 0;
}
public Object getObject(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.getObject(key);
}
return null;
}
public boolean remove(String storeName, String key) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.remove(key);
}
return false;
}
public Object iterateStart(String storeName) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.iterateStart();
}
return null;
}
public Object iterateNext(String storeName) {
DataStore store = this.stores.get(storeName);
if (store != null) {
return store.iterateNext();
}
return null;
}
public int getCollisions(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getCollisions();
}
public int getIndexLoad(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getIndexLoad();
}
public long getObjectRetrievalTime(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getObjectRetrievalTime();
}
public long getObjectStorageTime(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getObjectStorageTime();
}
public void outputStats(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
niop.outputStats();
}
public long getRecordCount(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getRecordCount();
}
public long getEmptyCount(String storeName) {
NoHeapDBStore niop = (NoHeapDBStore) stores.get(storeName);
return niop.getEmptyCount();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,6 @@
package bisq.common.util;
import bisq.common.UserThread;
import bisq.common.app.DevEnv;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -29,6 +28,8 @@ public class GcUtil {
private static boolean DISABLE_GC_CALLS = false;
private static int TRIGGER_MEM = 1000;
private static int TRIGGER_MAX_MEM = 3000;
private static int totalInvocations;
private static long totalGCTime;
public static void autoReleaseMemory() {
if (DISABLE_GC_CALLS)
@ -59,20 +60,25 @@ public class GcUtil {
long preGcMemory = Runtime.getRuntime().totalMemory();
if (preGcMemory > trigger * 1024 * 1024) {
System.gc();
totalInvocations++;
long postGcMemory = Runtime.getRuntime().totalMemory();
log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms.",
long duration = System.currentTimeMillis() - ts;
totalGCTime += duration;
log.info("GC reduced memory by {}. Total memory before/after: {}/{}. Took {} ms. Total GC invocations: {} / Total GC time {} sec",
Utilities.readableFileSize(preGcMemory - postGcMemory),
Utilities.readableFileSize(preGcMemory),
Utilities.readableFileSize(postGcMemory),
System.currentTimeMillis() - ts);
if (DevEnv.isDevMode()) {
duration,
totalInvocations,
totalGCTime / 1000d);
/* if (DevEnv.isDevMode()) {
try {
// To see from where we got called
throw new RuntimeException("Dummy Exception for print stacktrace at maybeReleaseMemory");
} catch (Throwable t) {
t.printStackTrace();
}
}
}*/
}
}
}

View File

@ -35,6 +35,7 @@ import bisq.network.p2p.network.Connection;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.MathUtils;
import com.google.inject.Inject;
@ -222,8 +223,11 @@ public class LiteNode extends BsqNode {
runDelayedBatchProcessing(new ArrayList<>(blockList),
() -> {
log.info("Parsing {} blocks took {} seconds.", blockList.size(),
(System.currentTimeMillis() - ts) / 1000d);
double duration = System.currentTimeMillis() - ts;
log.info("Parsing {} blocks took {} seconds ({} min.) / {} ms in average / block", blockList.size(),
MathUtils.roundDouble(duration / 1000d, 2),
MathUtils.roundDouble(duration / 1000d / 60, 2),
MathUtils.roundDouble(duration / blockList.size(), 2));
// We only request again if wallet is synced, otherwise we would get repeated calls we want to avoid.
// We deal with that case at the setupWalletBestBlockListener method above.
if (walletsSetup.isDownloadComplete() &&

View File

@ -149,8 +149,8 @@ public class DaoStateService implements DaoSetupService {
return DaoState.getClone(daoState);
}
public protobuf.DaoState getCloneAsProto() {
return DaoState.getCloneAsProto(daoState);
public protobuf.DaoState getBsqStateCloneExcludingBlocks() {
return DaoState.getBsqStateCloneExcludingBlocks(daoState);
}
public byte[] getSerializedStateForHashChain() {
@ -321,12 +321,16 @@ public class DaoStateService implements DaoSetupService {
return getBlockAtHeight(height).map(Block::getTime).orElse(0L);
}
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight) {
return getBlocksFromBlockHeight(fromBlockHeight, Integer.MAX_VALUE);
}
public List<Block> getBlocksFromBlockHeight(int fromBlockHeight, int numMaxBlocks) {
// We limit requests to numMaxBlocks blocks, to avoid performance issues and too
// large network data in case a node requests too far back in history.
return getBlocks().stream()
.filter(block -> block.getHeight() >= fromBlockHeight)
.sorted(Comparator.comparing(Block::getHeight))
.sorted(Comparator.comparing(Block::getHeight)) //todo not needed
.limit(numMaxBlocks)
.collect(Collectors.toList());
}

View File

@ -37,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -61,9 +62,10 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private final Preferences preferences;
private final File storageDir;
private protobuf.DaoState snapshotCandidate;
private int snapshotCandidateHeight;
private LinkedList<DaoStateHash> daoStateHashChainSnapshotCandidate = new LinkedList<>();
private protobuf.DaoState daoStateCandidate;
private LinkedList<DaoStateHash> hashChainCandidate = new LinkedList<>();
private List<Block> blocksCandidate;
private int snapshotHeight;
private int chainHeightOfLastApplySnapshot;
@Setter
@Nullable
@ -141,15 +143,20 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
// This also comes with the improvement that the user does not need to load the past blocks back to the last
// snapshot height. Though it comes also with the small risk that in case of re-orgs the user need to do
// a resync in case the dao state would have been affected by that reorg.
//todo
long ts = System.currentTimeMillis();
// We do not keep a copy of the clone as we use it immediately for persistence.
GcUtil.maybeReleaseMemory();
log.info("Create snapshot at height {}", daoStateService.getChainHeight());
daoStateStorageService.requestPersistence(daoStateService.getCloneAsProto(),
new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()),
int chainHeight = daoStateService.getChainHeight();
log.info("Create snapshot at height {}", chainHeight);
// We do not keep the data in our fields to enable gc as soon its released in the store
daoStateStorageService.requestPersistence(getDaoStateForSnapshot(),
getBlocksForSnapshot(),
getHashChainForSnapshot(),
() -> {
GcUtil.maybeReleaseMemory();
log.info("Persisted daoState after parsing completed at height {}. Took {} ms",
daoStateService.getChainHeight(), System.currentTimeMillis() - ts);
chainHeight, System.currentTimeMillis() - ts);
});
GcUtil.maybeReleaseMemory();
});
@ -167,8 +174,8 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
// Either we don't have a snapshot candidate yet, or if we have one the height at that snapshot candidate must be
// different to our current height.
boolean noSnapshotCandidateOrDifferentHeight = snapshotCandidate == null ||
snapshotCandidateHeight != chainHeight;
boolean noSnapshotCandidateOrDifferentHeight = daoStateCandidate == null ||
snapshotHeight != chainHeight;
if (isSnapshotHeight(chainHeight) &&
!daoStateService.getBlocks().isEmpty() &&
isValidHeight(daoStateService.getBlockHeightOfLastBlock()) &&
@ -191,7 +198,7 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
return;
}
if (snapshotCandidate != null) {
if (daoStateCandidate != null) {
persist();
} else {
createSnapshot();
@ -202,17 +209,12 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private void persist() {
long ts = System.currentTimeMillis();
readyForPersisting = false;
daoStateStorageService.requestPersistence(snapshotCandidate,
daoStateHashChainSnapshotCandidate,
daoStateStorageService.requestPersistence(daoStateCandidate,
blocksCandidate,
hashChainCandidate,
() -> {
log.info("Serializing snapshotCandidate for writing to Disc at chainHeight {} took {} ms.\n" +
"snapshotCandidateHeight={};\n" +
"daoStateHashChainSnapshotCandidate.height={}",
daoStateService.getChainHeight(),
System.currentTimeMillis() - ts,
snapshotCandidateHeight,
daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ?
daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A");
log.info("Serializing daoStateCandidate for writing to Disc at chainHeight {} took {} ms.",
snapshotHeight, System.currentTimeMillis() - ts);
createSnapshot();
readyForPersisting = true;
@ -226,18 +228,13 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
// done from the write thread (mapped back to user thread).
// As we want to prevent to maintain 2 clones we prefer that strategy. If we would do the clone
// after the persist call we would keep an additional copy in memory.
snapshotCandidate = daoStateService.getCloneAsProto();
snapshotCandidateHeight = daoStateService.getChainHeight();
daoStateHashChainSnapshotCandidate = new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
daoStateCandidate = getDaoStateForSnapshot();
blocksCandidate = getBlocksForSnapshot();
hashChainCandidate = getHashChainForSnapshot();
snapshotHeight = daoStateService.getChainHeight();
GcUtil.maybeReleaseMemory();
log.info("Cloned new snapshotCandidate at chainHeight {} took {} ms.\n" +
"snapshotCandidateHeight={};\n" +
"daoStateHashChainSnapshotCandidate.height={}",
daoStateService.getChainHeight(), System.currentTimeMillis() - ts,
snapshotCandidateHeight,
daoStateHashChainSnapshotCandidate != null && !daoStateHashChainSnapshotCandidate.isEmpty() ?
daoStateHashChainSnapshotCandidate.getLast().getHeight() : "N/A");
log.info("Cloned new daoStateCandidate at height {} took {} ms.", snapshotHeight, System.currentTimeMillis() - ts);
}
public void applySnapshot(boolean fromReorg) {
@ -247,13 +244,12 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
int chainHeightOfPersisted = persistedBsqState.getChainHeight();
if (!persistedBsqState.getBlocks().isEmpty()) {
int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight();
log.debug("applySnapshot from persistedBsqState daoState with height of last block {}", heightOfLastBlock);
if (isValidHeight(heightOfLastBlock)) {
if (chainHeightOfLastApplySnapshot != chainHeightOfPersisted) {
chainHeightOfLastApplySnapshot = chainHeightOfPersisted;
daoStateService.applySnapshot(persistedBsqState);
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
daoStateStorageService.pruneStore();
daoStateStorageService.releaseMemory();
} else {
// The reorg might have been caused by the previous parsing which might contains a range of
// blocks.
@ -311,4 +307,17 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private boolean isSnapshotHeight(int height) {
return isSnapshotHeight(genesisTxInfo.getGenesisBlockHeight(), height, SNAPSHOT_GRID);
}
// Clone of the current DaoState as protobuf, excluding the blocks list
// (blocks are persisted separately, in buckets of BlocksPersistence.BUCKET_SIZE).
private protobuf.DaoState getDaoStateForSnapshot() {
    return daoStateService.getBsqStateCloneExcludingBlocks();
}
// Only the blocks above the height already covered by persisted bucket files
// need to be written with the next snapshot.
private List<Block> getBlocksForSnapshot() {
    int fromBlockHeight = daoStateStorageService.getChainHeightOfPersistedBlocks() + 1;
    return daoStateService.getBlocksFromBlockHeight(fromBlockHeight);
}
// Defensive copy of the monitoring service's hash chain so later mutations
// do not affect the snapshot candidate.
private LinkedList<DaoStateHash> getHashChainForSnapshot() {
    return new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain());
}
}

View File

@ -71,8 +71,8 @@ public class DaoState implements PersistablePayload {
return DaoState.fromProto(daoState.getBsqStateBuilder().build());
}
public static protobuf.DaoState getCloneAsProto(DaoState daoState) {
return daoState.getBsqStateBuilder().build();
public static protobuf.DaoState getBsqStateCloneExcludingBlocks(DaoState daoState) {
return daoState.getBsqStateBuilderExcludingBlocks().build();
}
@ -210,6 +210,10 @@ public class DaoState implements PersistablePayload {
LinkedList<Block> blocks = proto.getBlocksList().stream()
.map(Block::fromProto)
.collect(Collectors.toCollection(LinkedList::new));
return fromProto(proto, blocks);
}
public static DaoState fromProto(protobuf.DaoState proto, LinkedList<Block> blocks) {
LinkedList<Cycle> cycles = proto.getCyclesList().stream()
.map(Cycle::fromProto).collect(Collectors.toCollection(LinkedList::new));
TreeMap<TxOutputKey, TxOutput> unspentTxOutputMap = new TreeMap<>(proto.getUnspentTxOutputMapMap().entrySet().stream()

View File

@ -0,0 +1,205 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.state.storage;
import bisq.common.file.FileUtil;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import protobuf.BaseBlock;
import java.nio.file.Path;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j
public class BlocksPersistence {
// 10000->Writing 130014 blocks took 1658 msec
// 1000-> Writing 130014 blocks took 1685 msec Mapping blocks from DaoStateStore took 2250 ms
public static final int BUCKET_SIZE = 1000; // results in about 1 MB files and about 1 new file per week
private final File storageDir;
private final String fileName;
private final PersistenceProtoResolver persistenceProtoResolver;
private Path usedTempFilePath;
public BlocksPersistence(File storageDir, String fileName, PersistenceProtoResolver persistenceProtoResolver) {
this.storageDir = storageDir;
this.fileName = fileName;
this.persistenceProtoResolver = persistenceProtoResolver;
/* if (!storageDir.exists()) {
storageDir.mkdir();
}*/
}
public void writeBlocks(List<BaseBlock> protobufBlocks) {
long ts = System.currentTimeMillis();
if (!storageDir.exists()) {
storageDir.mkdir();
}
List<BaseBlock> temp = new ArrayList<>();
int bucketIndex = 0;
for (BaseBlock block : protobufBlocks) {
temp.add(block);
int height = block.getHeight();
bucketIndex = height / BUCKET_SIZE;
int remainder = height % BUCKET_SIZE;
boolean isLastBucketItem = remainder == 0;
if (isLastBucketItem) {
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
int last = bucketIndex * BUCKET_SIZE;
File storageFile = new File(storageDir, fileName + "_" + first + "-" + last);
// log.error("addAll height={} items={}", height, temp.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList()));
writeToDisk(storageFile, new BsqBlockStore(temp), null);
temp = new ArrayList<>();
}
}
if (!temp.isEmpty()) {
bucketIndex++;
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
int last = bucketIndex * BUCKET_SIZE;
File storageFile = new File(storageDir, fileName + "_" + first + "-" + last);
// log.error("items={}", temp.stream().map(e -> e.getHeight()).collect(Collectors.toList()));
writeToDisk(storageFile, new BsqBlockStore(temp), null);
}
log.error("Write {} blocks to disk took {} msec", protobufBlocks.size(), System.currentTimeMillis() - ts);
}
public void removeBlocksDirectory() {
if (storageDir.exists()) {
try {
FileUtil.deleteDirectory(storageDir);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public List<BaseBlock> readBlocks(int from, int to) {
if (!storageDir.exists()) {
storageDir.mkdir();
}
// log.error("getBlocks {}-{}", from, to);
long ts = System.currentTimeMillis();
// from = Math.max(571747, from);
List<BaseBlock> buckets = new ArrayList<>();
int start = from / BUCKET_SIZE + 1;
int end = to / BUCKET_SIZE + 1;
for (int bucketIndex = start; bucketIndex <= end; bucketIndex++) {
List<BaseBlock> bucket = readBucket(bucketIndex);
// log.error("read bucketIndex {}, items={}", bucketIndex, bucket.stream().map(e -> e.getHeight() + ", ").collect(Collectors.toList()));
buckets.addAll(bucket);
}
log.error("Reading {} blocks took {} msec", buckets.size(), System.currentTimeMillis() - ts);
// System.exit(0);
return buckets;
}
private List<BaseBlock> readBucket(int bucketIndex) {
int first = bucketIndex * BUCKET_SIZE - BUCKET_SIZE + 1;
int last = bucketIndex * BUCKET_SIZE;
/* int first = bucketIndex * BUCKET_SIZE + 1;
int last = first + BUCKET_SIZE - 1;*/
String child = fileName + "_" + first + "-" + last;
// log.error("getBlocksOfBucket {}", child);
File storageFile = new File(storageDir, child);
if (!storageFile.exists()) {
// log.error("storageFile not existing {}", storageFile.getName());
return new ArrayList<>();
}
try (FileInputStream fileInputStream = new FileInputStream(storageFile)) {
protobuf.PersistableEnvelope proto = protobuf.PersistableEnvelope.parseDelimitedFrom(fileInputStream);
BsqBlockStore bsqBlockStore = (BsqBlockStore) persistenceProtoResolver.fromProto(proto);
return bsqBlockStore.getBlocksAsProto();
} catch (Throwable t) {
log.error("Reading {} failed with {}.", fileName, t.getMessage());
return new ArrayList<>();
}
}
private void writeToDisk(File storageFile,
BsqBlockStore bsqBlockStore,
@Nullable Runnable completeHandler) {
long ts = System.currentTimeMillis();
File tempFile = null;
FileOutputStream fileOutputStream = null;
try {
tempFile = usedTempFilePath != null
? FileUtil.createNewFile(usedTempFilePath)
: File.createTempFile("temp_" + fileName, null, storageDir);
// Don't use a new temp file path each time, as that causes the delete-on-exit hook to leak memory:
tempFile.deleteOnExit();
fileOutputStream = new FileOutputStream(tempFile);
bsqBlockStore.toProtoMessage().writeDelimitedTo(fileOutputStream);
// Attempt to force the bits to hit the disk. In reality the OS or hard disk itself may still decide
// to not write through to physical media for at least a few seconds, but this is the best we can do.
fileOutputStream.flush();
fileOutputStream.getFD().sync();
// Close resources before replacing file with temp file because otherwise it causes problems on windows
// when rename temp file
fileOutputStream.close();
FileUtil.renameFile(tempFile, storageFile);
usedTempFilePath = tempFile.toPath();
} catch (Throwable t) {
// If an error occurred, don't attempt to reuse this path again, in case temp file cleanup fails.
usedTempFilePath = null;
log.error("Error at saveToFile, storageFile={}", fileName, t);
} finally {
if (tempFile != null && tempFile.exists()) {
log.warn("Temp file still exists after failed save. We will delete it now. storageFile={}", fileName);
if (!tempFile.delete()) {
log.error("Cannot delete temp file.");
}
}
try {
if (fileOutputStream != null) {
fileOutputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
log.error("Cannot close resources." + e.getMessage());
}
// log.info("Writing the serialized {} completed in {} msec", fileName, System.currentTimeMillis() - ts);
if (completeHandler != null) {
completeHandler.run();
}
}
}
}

View File

@ -0,0 +1,52 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.state.storage;
import bisq.common.proto.persistable.PersistableEnvelope;
import protobuf.BaseBlock;
import com.google.protobuf.Message;
import java.util.List;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* Wrapper for list of blocks
*/
@Slf4j
public class BsqBlockStore implements PersistableEnvelope {
@Getter
private final List<BaseBlock> blocksAsProto;
public BsqBlockStore(List<protobuf.BaseBlock> blocksAsProto) {
this.blocksAsProto = blocksAsProto;
}
public Message toProtoMessage() {
return protobuf.PersistableEnvelope.newBuilder()
.setBsqBlockStore(protobuf.BsqBlockStore.newBuilder().addAllBlocks(blocksAsProto))
.build();
}
public static BsqBlockStore fromProto(protobuf.BsqBlockStore proto) {
return new BsqBlockStore(proto.getBlocksList());
}
}

View File

@ -0,0 +1,173 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.state.storage;
import bisq.core.dao.state.GenesisTxInfo;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.common.config.Config;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import protobuf.BaseBlock;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import java.net.URL;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Singleton
public class BsqBlocksStorageService {
public final static String NAME = "BsqBlocks";
private final int genesisBlockHeight;
private final File storageDir;
private final BlocksPersistence blocksPersistence;
@Getter
private int chainHeightOfPersistedBlocks;
@Inject
public BsqBlocksStorageService(GenesisTxInfo genesisTxInfo,
PersistenceProtoResolver persistenceProtoResolver,
@Named(Config.STORAGE_DIR) File dbStorageDir) {
genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight();
storageDir = new File(dbStorageDir.getAbsolutePath() + File.separator + NAME);
blocksPersistence = new BlocksPersistence(storageDir, NAME, persistenceProtoResolver);
}
public void persistBlocks(List<Block> blocks) {
long ts = System.currentTimeMillis();
List<BaseBlock> protobufBlocks = blocks.stream()
.map(Block::toProtoMessage)
.collect(Collectors.toList());
blocksPersistence.writeBlocks(protobufBlocks);
if (!blocks.isEmpty()) {
chainHeightOfPersistedBlocks = Math.max(chainHeightOfPersistedBlocks,
getHeightOfLastFullBucket(blocks));
}
log.error("Persist (serialize+write) {} blocks took {} ms",
blocks.size(),
System.currentTimeMillis() - ts);
}
public LinkedList<Block> readBlocks(int chainHeight) {
long ts = System.currentTimeMillis();
LinkedList<Block> blocks = new LinkedList<>();
blocksPersistence.readBlocks(genesisBlockHeight, chainHeight).stream()
.map(Block::fromProto)
.forEach(blocks::add);
log.error("Reading {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
if (!blocks.isEmpty()) {
chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks);
}
return blocks;
}
public LinkedList<Block> swapBlocks(List<protobuf.BaseBlock> protobufBlocks) {
long ts = System.currentTimeMillis();
log.error("We have {} blocks in the daoStateAsProto", protobufBlocks.size());
blocksPersistence.writeBlocks(protobufBlocks);
LinkedList<Block> blocks = new LinkedList<>();
protobufBlocks.forEach(protobufBlock -> blocks.add(Block.fromProto(protobufBlock)));
if (!blocks.isEmpty()) {
chainHeightOfPersistedBlocks = getHeightOfLastFullBucket(blocks);
}
log.error("Mapping blocks (write+deserialization) from DaoStateStore took {} ms", System.currentTimeMillis() - ts);
return blocks;
}
void copyFromResources(String postFix) {
long ts = System.currentTimeMillis();
try {
String dirName = BsqBlocksStorageService.NAME;
String resourceDir = dirName + postFix;
if (storageDir.exists()) {
log.info("No resource directory was copied. {} exists already.", dirName);
return;
}
URL dirUrl = getClass().getClassLoader().getResource(resourceDir);
if (dirUrl == null) {
log.info("Directory {} in resources does not exist.", resourceDir);
return;
}
File dir = new File(dirUrl.toURI());
String[] fileNames = dir.list();
if (fileNames == null) {
log.info("No files in directory. {}", dir.getAbsolutePath());
return;
}
if (!storageDir.exists()) {
storageDir.mkdir();
}
for (String fileName : fileNames) {
URL url = getClass().getClassLoader().getResource(resourceDir + File.separator + fileName);
File resourceFile = new File(url.toURI());
File destinationFile = new File(storageDir, fileName);
FileUtils.copyFile(resourceFile, destinationFile);
}
log.error("Copying {} resource files took {} ms", fileNames.length, System.currentTimeMillis() - ts);
} catch (Throwable e) {
e.printStackTrace();
}
}
// todo
private int getHeightOfLastFullBucket(List<Block> blocks) {
int i = blocks.get(blocks.size() - 1).getHeight() / BlocksPersistence.BUCKET_SIZE;
int i1 = i * BlocksPersistence.BUCKET_SIZE;
// log.error("getHeightOfLastFullBucket {}", i * BlocksPersistence.BUCKET_SIZE);
return i1;
}
public void removeBlocksDirectory() {
blocksPersistence.removeBlocksDirectory();
}
// We recreate the directory so that we don't fill the blocks after restart from resources
// In copyFromResources we only check for the directory not the files inside.
public void removeBlocksInDirectory() {
blocksPersistence.removeBlocksDirectory();
if (!storageDir.exists()) {
storageDir.mkdir();
}
/* List<protobuf.BaseBlock> blocks = new ArrayList<>();
// height, long time, String hash, String previousBlockHash
for (int i = genesisBlockHeight; i <= chainHeightOfPersistedBlocks; i++) {
blocks.add(new Block(i, 0, "", "").toProtoMessage());
}
blocksPersistence.addAll(blocks);*/
}
}

View File

@ -17,13 +17,14 @@
package bisq.core.dao.state.storage;
import bisq.core.dao.monitoring.DaoStateMonitoringService;
import bisq.core.dao.monitoring.model.DaoStateHash;
import bisq.core.dao.state.model.DaoState;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.network.p2p.storage.persistence.ResourceDataStoreService;
import bisq.network.p2p.storage.persistence.StoreService;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.persistence.PersistenceManager;
@ -36,6 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@ -46,7 +48,9 @@ import lombok.extern.slf4j.Slf4j;
public class DaoStateStorageService extends StoreService<DaoStateStore> {
private static final String FILE_NAME = "DaoStateStore";
private final DaoStateMonitoringService daoStateMonitoringService;
private final BsqBlocksStorageService bsqBlocksStorageService;
private final File storageDir;
private final LinkedList<Block> blocks = new LinkedList<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -55,11 +59,12 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
@Inject
public DaoStateStorageService(ResourceDataStoreService resourceDataStoreService,
DaoStateMonitoringService daoStateMonitoringService,
BsqBlocksStorageService bsqBlocksStorageService,
@Named(Config.STORAGE_DIR) File storageDir,
PersistenceManager<DaoStateStore> persistenceManager) {
super(storageDir, persistenceManager);
this.daoStateMonitoringService = daoStateMonitoringService;
this.bsqBlocksStorageService = bsqBlocksStorageService;
this.storageDir = storageDir;
resourceDataStoreService.addService(this);
}
@ -74,7 +79,12 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
return FILE_NAME;
}
public int getChainHeightOfPersistedBlocks() {
return bsqBlocksStorageService.getChainHeightOfPersistedBlocks();
}
public void requestPersistence(protobuf.DaoState daoStateAsProto,
List<Block> blocks,
LinkedList<DaoStateHash> daoStateHashChain,
Runnable completeHandler) {
if (daoStateAsProto == null) {
@ -82,52 +92,105 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
return;
}
store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
// We let the persistence run in a thread to avoid the slow protobuf serialisation to happen on the user
// thread. We also call it immediately to get notified about the completion event.
new Thread(() -> {
Thread.currentThread().setName("Serialize and write DaoState");
Thread.currentThread().setName("Write-blocks-and-DaoState");
bsqBlocksStorageService.persistBlocks(blocks);
store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
long ts = System.currentTimeMillis();
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
// memory there until the next persist call.
pruneStore();
completeHandler.run();
log.error("Persist daoState took {} ms", System.currentTimeMillis() - ts);
store.releaseMemory();
GcUtil.maybeReleaseMemory();
UserThread.execute(completeHandler);
});
}).start();
}
public void pruneStore() {
store.setDaoStateAsProto(null);
store.setDaoStateHashChain(null);
GcUtil.maybeReleaseMemory();
@Override
protected void readFromResources(String postFix, Runnable completeHandler) {
    // Copying the bucket files and reading the block store are slow, so both run
    // off the user thread; completeHandler is executed on the user thread at the end.
    new Thread(() -> {
        Thread.currentThread().setName("copyBsqBlocksFromResources");
        bsqBlocksStorageService.copyFromResources(postFix);

        super.readFromResources(postFix, () -> {
            // We got mapped back to user thread so we need to create a new thread again
            // as we don't want to execute the slow block reading on the user thread
            new Thread(() -> {
                Thread.currentThread().setName("Read-BsqBlocksStore");
                protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto();
                if (daoStateAsProto != null) {
                    LinkedList<Block> list;
                    if (daoStateAsProto.getBlocksList().isEmpty()) {
                        // New format: blocks live in the bucket files
                        list = bsqBlocksStorageService.readBlocks(daoStateAsProto.getChainHeight());
                    } else {
                        // Legacy format: blocks are still embedded in the DaoState proto;
                        // migrate them into the bucket files
                        list = bsqBlocksStorageService.swapBlocks(daoStateAsProto.getBlocksList());
                    }
                    blocks.clear();
                    blocks.addAll(list);
                }
                UserThread.execute(completeHandler);
            }).start();
        });
    }).start();
}
public DaoState getPersistedBsqState() {
protobuf.DaoState daoStateAsProto = store.getDaoStateAsProto();
if (daoStateAsProto != null) {
return DaoState.fromProto(daoStateAsProto);
} else {
return new DaoState();
long ts = System.currentTimeMillis();
DaoState daoState = DaoState.fromProto(daoStateAsProto, blocks);
log.error("Deserializing DaoState with {} blocks took {} ms",
daoState.getBlocks().size(), System.currentTimeMillis() - ts);
return daoState;
}
return new DaoState();
}
public LinkedList<DaoStateHash> getPersistedDaoStateHashChain() {
return store.getDaoStateHashChain();
}
public void releaseMemory() {
blocks.clear();
store.releaseMemory();
GcUtil.maybeReleaseMemory();
}
public void resyncDaoStateFromGenesis(Runnable resultHandler) {
store.setDaoStateAsProto(DaoState.getCloneAsProto(new DaoState()));
String backupDirName = "out_of_sync_dao_data";
try {
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
} catch (Throwable t) {
log.error(t.toString());
}
store.setDaoStateAsProto(DaoState.getBsqStateCloneExcludingBlocks(new DaoState()));
store.setDaoStateHashChain(new LinkedList<>());
persistenceManager.persistNow(resultHandler);
bsqBlocksStorageService.removeBlocksInDirectory();
}
public void resyncDaoStateFromResources(File storageDir) throws IOException {
// We delete all DAO consensus payload data and remove the daoState so it will rebuild from latest
// We delete all DAO consensus data and remove the daoState so it will rebuild from latest
// resource files.
long currentTime = System.currentTimeMillis();
String backupDirName = "out_of_sync_dao_data";
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
String newFileName = "DaoStateStore_" + System.currentTimeMillis();
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName);
bsqBlocksStorageService.removeBlocksDirectory();
}
private void removeAndBackupDaoConsensusFiles(File storageDir, String backupDirName) throws IOException {
// We delete all DAO consensus data. Some will be rebuild from resources.
long currentTime = System.currentTimeMillis();
String newFileName = "BlindVoteStore_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BlindVoteStore"), newFileName, backupDirName);
@ -140,9 +203,6 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
newFileName = "UnconfirmedBsqChangeOutputList_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "UnconfirmedBsqChangeOutputList"), newFileName, backupDirName);
newFileName = "DaoStateStore_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName);
}
@ -152,7 +212,7 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
@Override
protected DaoStateStore createStore() {
return new DaoStateStore(null, new LinkedList<>(daoStateMonitoringService.getDaoStateHashChain()));
return new DaoStateStore(null, new LinkedList<>());
}
@Override

View File

@ -76,4 +76,10 @@ public class DaoStateStore implements PersistableEnvelope {
.collect(Collectors.toList()));
return new DaoStateStore(proto.getDaoState(), daoStateHashList);
}
public void releaseMemory() {
daoStateAsProto = null;
daoStateHashChain = null;
}
}

View File

@ -30,6 +30,7 @@ import bisq.core.dao.governance.proposal.MyProposalList;
import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStore;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalStore;
import bisq.core.dao.state.model.governance.BallotList;
import bisq.core.dao.state.storage.BsqBlockStore;
import bisq.core.dao.state.storage.DaoStateStore;
import bisq.core.dao.state.unconfirmed.UnconfirmedBsqChangeOutputList;
import bisq.core.payment.PaymentAccountList;
@ -138,6 +139,8 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
return IgnoredMailboxMap.fromProto(proto.getIgnoredMailboxMap());
case REMOVED_PAYLOADS_MAP:
return RemovedPayloadsMap.fromProto(proto.getRemovedPayloadsMap());
case BSQ_BLOCK_STORE:
return BsqBlockStore.fromProto(proto.getBsqBlockStore());
default:
throw new ProtobufferRuntimeException("Unknown proto message case(PB.PersistableEnvelope). " +
"messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString());

View File

@ -1469,6 +1469,7 @@ message PersistableEnvelope {
MailboxMessageList mailbox_message_list = 32;
IgnoredMailboxMap ignored_mailbox_map = 33;
RemovedPayloadsMap removed_payloads_map = 34;
BsqBlockStore bsq_block_store = 35;
}
}
@ -1996,6 +1997,10 @@ message BaseBlock {
}
}
// Envelope for one bucket of BSQ blocks, persisted to its own file
// (see bisq.core.dao.state.storage.BsqBlockStore / BlocksPersistence).
message BsqBlockStore {
    repeated BaseBlock blocks = 1;
}
message RawBlock {
// Because of the way how PB implements inheritance we need to use the super class as type
repeated BaseTx raw_txs = 1;