Merge pull request #5935 from jmacxx/dao_state_store_and_statistics

Improve DAO state store persistence and statistics logging [chimp1984]
This commit is contained in:
Christoph Atteneder 2022-01-03 12:24:18 +01:00 committed by GitHub
commit dde48f7d9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 20 deletions

View file

@ -127,7 +127,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
@Nullable
private Runnable createSnapshotHandler;
// Lookup map
private Map<Integer, DaoStateBlock> daoStateBlockByHeight = new HashMap<>();
private final Map<Integer, DaoStateBlock> daoStateBlockByHeight = new HashMap<>();
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -157,6 +157,7 @@ public abstract class BsqNode implements DaoSetupService {
public void shutDown() {
exportJsonFilesService.shutDown();
daoStateSnapshotService.shutDown();
}

View file

@ -122,6 +122,9 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
public void start() {
}
public void shutDown() {
daoStateStorageService.shutDown();
}
///////////////////////////////////////////////////////////////////////////////////////////
// DaoStateListener

View file

@ -33,11 +33,18 @@ import bisq.common.util.GcUtil;
import javax.inject.Inject;
import javax.inject.Named;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -53,6 +60,8 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
private final BsqBlocksStorageService bsqBlocksStorageService;
private final File storageDir;
private final LinkedList<Block> blocks = new LinkedList<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private Optional<Future<?>> future = Optional.empty();
///////////////////////////////////////////////////////////////////////////////////////////
@ -94,22 +103,36 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
return;
}
new Thread(() -> {
Thread.currentThread().setName("Write-blocks-and-DaoState");
bsqBlocksStorageService.persistBlocks(blocks);
if (future.isPresent() && !future.get().isDone()) {
UserThread.runAfter(() -> requestPersistence(daoStateAsProto, blocks, daoStateHashChain, completeHandler), 2);
return;
}
store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
long ts = System.currentTimeMillis();
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
// memory there until the next persist call.
log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts);
store.releaseMemory();
GcUtil.maybeReleaseMemory();
UserThread.execute(completeHandler);
});
}).start();
future = Optional.of(executorService.submit(() -> {
try {
Thread.currentThread().setName("Write-blocks-and-DaoState");
bsqBlocksStorageService.persistBlocks(blocks);
store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
long ts = System.currentTimeMillis();
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
// memory there until the next persist call.
log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts);
store.releaseMemory();
GcUtil.maybeReleaseMemory();
UserThread.execute(completeHandler);
});
} catch (Exception e) {
log.error("Exception at persisting BSQ blocks and DaoState", e);
}
}));
}
public void shutDown() {
executorService.shutdown();
// noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS);
}
@Override

View file

@ -72,7 +72,7 @@ public class Statistic {
totalReceivedBytesPerSec.set(((double) totalReceivedBytes.get()) / passed);
}, 1);
// We log statistics every 5 minutes
// We log statistics every 60 minutes
UserThread.runPeriodically(() -> {
String ls = System.lineSeparator();
log.info("Accumulated network statistics:" + ls +
@ -81,14 +81,14 @@ public class Statistic {
"Number of sent messages per sec: {};" + ls +
"Bytes received: {}" + ls +
"Number of received messages/Received messages: {} / {};" + ls +
"Number of received messages per sec: {};" + ls,
"Number of received messages per sec: {}" + ls,
Utilities.readableFileSize(totalSentBytes.get()),
numTotalSentMessages.get(), totalSentMessages,
numTotalSentMessagesPerSec.get(),
Utilities.readableFileSize(totalReceivedBytes.get()),
numTotalReceivedMessages.get(), totalReceivedMessages,
numTotalReceivedMessagesPerSec.get());
}, TimeUnit.MINUTES.toSeconds(5));
}, TimeUnit.MINUTES.toSeconds(60));
}
public static LongProperty totalSentBytesProperty() {

View file

@ -174,7 +174,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
};
clockWatcher.addListener(clockWatcherListener);
printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(5));
printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(60));
}
public void shutDown() {