Merge pull request #7181 from HenrikJannsen/3_improve-resync-handling

Improve resync handling [C]
This commit is contained in:
Alejandro García 2024-06-28 21:54:54 +00:00 committed by GitHub
commit 6963caba80
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 144 additions and 118 deletions

View file

@ -32,6 +32,7 @@ import bisq.core.dao.state.GenesisTxInfo;
import bisq.core.dao.state.model.blockchain.BaseTxOutput;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.storage.DaoStateStorageService;
import bisq.core.user.Preferences;
import bisq.network.p2p.NodeAddress;
@ -41,7 +42,6 @@ import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.crypto.Hash;
import bisq.common.file.FileUtil;
import bisq.common.util.Hex;
import bisq.common.util.Utilities;
@ -102,6 +102,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
}
private final DaoStateService daoStateService;
private final DaoStateStorageService daoStateStorageService;
private final DaoStateNetworkService daoStateNetworkService;
private final GenesisTxInfo genesisTxInfo;
private final Set<String> seedNodeAddresses;
@ -144,6 +145,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
@Inject
public DaoStateMonitoringService(DaoStateService daoStateService,
DaoStateStorageService daoStateStorageService,
DaoStateNetworkService daoStateNetworkService,
GenesisTxInfo genesisTxInfo,
SeedNodeRepository seedNodeRepository,
@ -151,6 +153,7 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
@Named(Config.STORAGE_DIR) File storageDir,
@Named(Config.IGNORE_DEV_MSG) boolean ignoreDevMsg) {
this.daoStateService = daoStateService;
this.daoStateStorageService = daoStateStorageService;
this.daoStateNetworkService = daoStateNetworkService;
this.genesisTxInfo = genesisTxInfo;
this.preferences = preferences;
@ -481,35 +484,15 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
Hex.encode(checkpoint.getHash()),
checkpoint);
try {
// Delete state and stop
removeFile("DaoStateStore");
removeFile("BlindVoteStore");
removeFile("ProposalStore");
removeFile("TempProposalStore");
listeners.forEach(Listener::onCheckpointFailed);
daoStateStorageService.removeAndBackupAllDaoData();
} catch (Throwable t) {
log.error("removeAndBackupAllDaoData failed", t);
}
listeners.forEach(Listener::onCheckpointFailed);
}
}));
}
/**
 * Best-effort removal of a single store file from {@code storageDir}.
 * <p>
 * If a file named {@code storeName} exists in {@code storageDir}, it is handed to
 * {@code FileUtil.removeAndBackupFile} together with a timestamped target name
 * ({@code <storeName>_<currentTimeMillis>}) and the backup directory name
 * {@code out_of_sync_dao_data}. Failures are logged and never propagated to the caller.
 *
 * @param storeName file name of the store, relative to {@code storageDir}
 */
private void removeFile(String storeName) {
    File corrupted = new File(storageDir, storeName);
    try {
        if (corrupted.exists()) {
            String newFileName = storeName + "_" + System.currentTimeMillis();
            FileUtil.removeAndBackupFile(storageDir, corrupted, newFileName, "out_of_sync_dao_data");
        }
    } catch (Throwable t) {
        // Pass the throwable itself to the logger so the stack trace ends up in the log file
        // instead of only on stderr.
        log.error("Could not remove and backup file {}", storeName, t);
    }
}
/**
 * Checks whether the given peer address is one of the known seed node addresses.
 *
 * @param peersNodeAddress address of the peer as string
 * @return true if {@code seedNodeAddresses} contains the given address
 */
private boolean isSeedNode(String peersNodeAddress) {
return seedNodeAddresses.contains(peersNodeAddress);
}

View file

@ -56,7 +56,7 @@ public abstract class BsqNode implements DaoSetupService {
private final String genesisTxId;
private final int genesisBlockHeight;
private final ExportJsonFilesService exportJsonFilesService;
private final DaoStateSnapshotService daoStateSnapshotService;
protected final DaoStateSnapshotService daoStateSnapshotService;
private final P2PServiceListener p2PServiceListener;
protected boolean parseBlockchainComplete;
protected boolean p2pNetworkReady;
@ -169,7 +169,7 @@ public abstract class BsqNode implements DaoSetupService {
@SuppressWarnings("WeakerAccess")
protected void onInitialized() {
daoStateSnapshotService.applySnapshot(false);
daoStateSnapshotService.applyPersistedSnapshot();
if (p2PService.isBootstrapped()) {
log.info("onAllServicesInitialized: isBootstrapped");
@ -195,12 +195,6 @@ public abstract class BsqNode implements DaoSetupService {
maybeExportToJson();
}
/**
 * Delegates to {@code daoStateSnapshotService.applySnapshot(true)}.
 */
@SuppressWarnings("WeakerAccess")
protected void startReOrgFromLastSnapshot() {
daoStateSnapshotService.applySnapshot(true);
}
protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException {
if (shutdownInProgress) {
return Optional.empty();
@ -273,7 +267,7 @@ public abstract class BsqNode implements DaoSetupService {
lastBlock.isPresent() ? lastBlock.get().getHash() : "lastBlock not present");
pendingBlocks.clear();
startReOrgFromLastSnapshot();
daoStateSnapshotService.revertToLastSnapshot();
startParseBlocks();
throw new RequiredReorgFromSnapshotException(rawBlock);
}

View file

@ -270,7 +270,7 @@ public class FullNode extends BsqNode {
if (numExceptions > 10) {
log.warn("We got {} RPC HttpExceptions at our block handler.", numExceptions);
pendingBlocks.clear();
startReOrgFromLastSnapshot();
revertToLastSnapshot();
startParseBlocks();
numExceptions = 0;
}
@ -301,7 +301,7 @@ public class FullNode extends BsqNode {
return;
} else if (cause instanceof NotificationHandlerException) {
log.error("Error from within block notification daemon: {}", cause.getCause().toString());
startReOrgFromLastSnapshot();
revertToLastSnapshot();
startParseBlocks();
return;
} else if (cause instanceof Error) {
@ -314,4 +314,8 @@ public class FullNode extends BsqNode {
errorMessageHandler.accept(errorMessage);
}
}
/**
 * Convenience wrapper which delegates to {@code daoStateSnapshotService.revertToLastSnapshot()}.
 */
private void revertToLastSnapshot() {
daoStateSnapshotService.revertToLastSnapshot();
}
}

View file

@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -78,7 +79,7 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private int daoRequiresRestartHandlerAttempts = 0;
private boolean readyForPersisting = true;
private boolean isParseBlockChainComplete;
private final List<Integer> heightsOfLastAppliedSnapshots = new ArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -269,47 +270,72 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
log.info("Cloned new daoStateCandidate at height {} took {} ms.", snapshotHeight, System.currentTimeMillis() - ts);
}
public void applySnapshot(boolean fromReorg) {
DaoState persistedBsqState = daoStateStorageService.getPersistedBsqState();
LinkedList<DaoStateHash> persistedDaoStateHashChain = daoStateStorageService.getPersistedDaoStateHashChain();
if (persistedBsqState != null) {
int chainHeightOfPersisted = persistedBsqState.getChainHeight();
if (!persistedBsqState.getBlocks().isEmpty()) {
int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight();
if (heightOfLastBlock != chainHeightOfPersisted) {
log.warn("chainHeightOfPersisted must be same as heightOfLastBlock. heightOfLastBlock={}, chainHeightOfPersisted={}",
heightOfLastBlock, chainHeightOfPersisted);
resyncDaoStateFromResources();
return;
}
if (isHeightAtLeastGenesisHeight(heightOfLastBlock)) {
if (chainHeightOfLastAppliedSnapshot != chainHeightOfPersisted) {
chainHeightOfLastAppliedSnapshot = chainHeightOfPersisted;
daoStateService.applySnapshot(persistedBsqState);
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
daoStateStorageService.releaseMemory();
} else {
// The reorg might have been caused by the previous parsing, which might contain a range of
// blocks.
log.warn("We applied already a snapshot with chainHeight {}. " +
"We remove all dao store files and shutdown. After a restart resource files will " +
"be applied if available.",
chainHeightOfLastAppliedSnapshot);
resyncDaoStateFromResources();
}
}
} else if (fromReorg) {
log.info("We got a reorg and we want to apply the snapshot but it is empty. " +
/**
 * Applies the persisted snapshot. Delegates to {@code applySnapshot} with
 * {@code fromInitialize} set to true.
 */
public void applyPersistedSnapshot() {
applySnapshot(true);
}
/**
 * Reverts to the last snapshot. Delegates to {@code applySnapshot} with
 * {@code fromInitialize} set to false.
 */
public void revertToLastSnapshot() {
applySnapshot(false);
}
private void applySnapshot(boolean fromInitialize) {
DaoState persistedDaoState = daoStateStorageService.getPersistedBsqState();
if (persistedDaoState == null) {
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
return;
}
int chainHeightOfPersistedDaoState = persistedDaoState.getChainHeight();
int numSameAppliedSnapshots = (int) heightsOfLastAppliedSnapshots.stream()
.filter(height -> height == chainHeightOfPersistedDaoState)
.count();
if (numSameAppliedSnapshots >= 3) {
log.warn("We got called applySnapshot the 3rd time with the same snapshot height. " +
"We abort and call resyncDaoStateFromResources.");
resyncDaoStateFromResources();
return;
}
heightsOfLastAppliedSnapshots.add(chainHeightOfPersistedDaoState);
if (persistedDaoState.getBlocks().isEmpty()) {
if (fromInitialize) {
log.info("No Bsq blocks in DaoState. Expected if no data are provided yet from resources or persisted data.");
} else {
log.info("We got a reorg or error and we want to apply the snapshot but it is empty. " +
"That is expected in the first blocks until the first snapshot has been created. " +
"We remove all dao store files and shutdown. " +
"After a restart resource files will be applied if available.");
resyncDaoStateFromResources();
} else {
log.info("No Bsq blocks in DaoState. Expected if no data are provided yet from resources or persisted data.");
}
} else {
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
return;
}
if (!daoStateStorageService.isChainHeighMatchingLastBlockHeight()) {
resyncDaoStateFromResources();
return;
}
if (!isHeightAtLeastGenesisHeight(chainHeightOfPersistedDaoState)) {
log.error("heightOfPersistedLastBlock is below genesis height. This should never happen.");
return;
}
if (chainHeightOfLastAppliedSnapshot == chainHeightOfPersistedDaoState) {
// The reorg might have been caused by the previous parsing, which might contain a range of
// blocks.
log.warn("We applied already a snapshot with chainHeight {}. " +
"We remove all dao store files and shutdown. After a restart resource files will " +
"be applied if available.",
chainHeightOfLastAppliedSnapshot);
resyncDaoStateFromResources();
return;
}
chainHeightOfLastAppliedSnapshot = chainHeightOfPersistedDaoState;
daoStateService.applySnapshot(persistedDaoState);
LinkedList<DaoStateHash> persistedDaoStateHashChain = daoStateStorageService.getPersistedDaoStateHashChain();
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
daoStateStorageService.releaseMemory();
}
@ -323,20 +349,20 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private void resyncDaoStateFromResources() {
log.info("resyncDaoStateFromResources called");
if (resyncDaoStateFromResourcesHandler == null && ++daoRequiresRestartHandlerAttempts <= 3) {
log.warn("resyncDaoStateFromResourcesHandler has not been initialized yet, will try again in 10 seconds");
UserThread.runAfter(this::resyncDaoStateFromResources, 10); // a delay for the app to init
return;
if (resyncDaoStateFromResourcesHandler == null) {
if (++daoRequiresRestartHandlerAttempts <= 3) {
log.warn("resyncDaoStateFromResourcesHandler has not been initialized yet, will try again in 10 seconds");
UserThread.runAfter(this::resyncDaoStateFromResources, 10); // a delay for the app to init
return;
} else {
log.warn("No resyncDaoStateFromResourcesHandler has not been set. We shutdown non-gracefully with a failure code on exit");
System.exit(1);
}
}
try {
daoStateStorageService.removeAndBackupAllDaoData();
// the restart handler informs the user of the need to restart bisq (in desktop mode)
if (resyncDaoStateFromResourcesHandler == null) {
log.error("resyncDaoStateFromResourcesHandler COULD NOT be called as it has not been initialized yet");
} else {
log.info("calling resyncDaoStateFromResourcesHandler...");
resyncDaoStateFromResourcesHandler.run();
}
resyncDaoStateFromResourcesHandler.run();
} catch (IOException e) {
log.error("Error at resyncDaoStateFromResources: {}", e.toString());
}

View file

@ -46,7 +46,7 @@ public class BsqBlocksStorageService {
public final static String NAME = "BsqBlocks";
private final int genesisBlockHeight;
private final File storageDir;
private final File blocksDir;
private final BlocksPersistence blocksPersistence;
@Getter
private int chainHeightOfPersistedBlocks;
@ -54,10 +54,10 @@ public class BsqBlocksStorageService {
@Inject
public BsqBlocksStorageService(GenesisTxInfo genesisTxInfo,
PersistenceProtoResolver persistenceProtoResolver,
@Named(Config.STORAGE_DIR) File dbStorageDir) {
@Named(Config.STORAGE_DIR) File storageDir) {
genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight();
storageDir = new File(dbStorageDir.getAbsolutePath() + File.separator + NAME);
blocksPersistence = new BlocksPersistence(storageDir, NAME, persistenceProtoResolver);
blocksDir = new File(storageDir.getAbsolutePath() + File.separator + NAME);
blocksPersistence = new BlocksPersistence(blocksDir, NAME, persistenceProtoResolver);
}
public void persistBlocks(List<Block> blocks) {
@ -108,7 +108,7 @@ public class BsqBlocksStorageService {
String dirName = BsqBlocksStorageService.NAME;
String resourceDir = dirName + postFix;
try {
if (storageDir.exists()) {
if (blocksDir.exists()) {
log.info("No resource directory was copied. {} exists already.", dirName);
return;
}
@ -118,11 +118,11 @@ public class BsqBlocksStorageService {
log.info("No files in directory. {}", resourceDir);
return;
}
if (!storageDir.exists()) {
storageDir.mkdir();
if (!blocksDir.exists()) {
blocksDir.mkdir();
}
for (String fileName : fileNames) {
File destinationFile = new File(storageDir, fileName);
File destinationFile = new File(blocksDir, fileName);
// File.separator doesn't appear to work on Windows. It has to be "/", not "\".
// See: https://github.com/bisq-network/bisq/pull/5909#pullrequestreview-827992563
FileUtil.resourceToFile(resourceDir + "/" + fileName, destinationFile);
@ -144,12 +144,9 @@ public class BsqBlocksStorageService {
blocksPersistence.removeBlocksDirectory();
}
// We recreate the directory so that we don't fill the blocks after restart from resources
// In copyFromResources we only check for the directory not the files inside.
public void removeBlocksInDirectory() {
blocksPersistence.removeBlocksDirectory();
if (!storageDir.exists()) {
storageDir.mkdir();
/**
 * Creates the blocks directory if it does not exist yet.
 * <p>
 * A failed creation is logged as a warning instead of being silently ignored.
 */
public void makeBlocksDirectory() {
    // mkdirs() also returns false if the directory already exists (e.g. it was created
    // in the meantime), so we re-check before reporting a failure.
    if (!blocksDir.exists() && !blocksDir.mkdirs() && !blocksDir.isDirectory()) {
        log.warn("Could not create blocks directory {}", blocksDir.getAbsolutePath());
    }
}
}

View file

@ -180,6 +180,19 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
return new DaoState();
}
/**
 * Compares the height of the last block of the persisted DAO state with the chain height
 * of that same state. A mismatch is logged as a warning.
 *
 * @return true if both heights are equal, false otherwise
 */
public boolean isChainHeighMatchingLastBlockHeight() {
    DaoState daoState = getPersistedBsqState();
    int lastBlockHeight = daoState.getLastBlock().getHeight();
    int chainHeight = daoState.getChainHeight();
    if (lastBlockHeight == chainHeight) {
        return true;
    }

    log.warn("heightOfPersistedLastBlock is not same as chainHeightOfPersistedDaoState.\n" +
                    "heightOfPersistedLastBlock={}; chainHeightOfPersistedDaoState={}",
            lastBlockHeight, chainHeight);
    return false;
}
/**
 * @return the DAO state hash chain held by the store
 */
public LinkedList<DaoStateHash> getPersistedDaoStateHashChain() {
return store.getDaoStateHashChain();
}
@ -191,46 +204,46 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
}
public void resyncDaoStateFromGenesis(Runnable resultHandler) {
String backupDirName = "out_of_sync_dao_data";
try {
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
removeAndBackupDaoConsensusFiles(false);
// We recreate the directory so that we don't fill the blocks after restart from resources
// In copyFromResources we only check for the directory not the files inside.
bsqBlocksStorageService.makeBlocksDirectory();
} catch (Throwable t) {
log.error(t.toString());
}
// Reset to empty DaoState and DaoStateHashChain
store.setDaoStateAsProto(DaoState.getBsqStateCloneExcludingBlocks(new DaoState()));
store.setDaoStateHashChain(new LinkedList<>());
persistenceManager.persistNow(resultHandler);
bsqBlocksStorageService.removeBlocksInDirectory();
}
public void removeAndBackupAllDaoData() throws IOException {
// We delete all DAO consensus data and remove the daoState so it will rebuild from latest
// We delete all DAO consensus data and remove the daoState and blocks, so it will rebuild from latest
// resource files.
String backupDirName = "out_of_sync_dao_data";
removeAndBackupDaoConsensusFiles(storageDir, backupDirName);
String newFileName = "DaoStateStore_" + System.currentTimeMillis();
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "DaoStateStore"), newFileName, backupDirName);
removeAndBackupDaoConsensusFiles(true);
}
/**
 * Removes and backs up the DAO consensus files one by one via {@code removeAndBackupFile}
 * and finally removes the blocks directory via {@code bsqBlocksStorageService}.
 *
 * @param removeDaoStateStore if true, "DaoStateStore" is removed as well (before all other files)
 * @throws IOException propagated from {@code removeAndBackupFile}
 */
private void removeAndBackupDaoConsensusFiles(boolean removeDaoStateStore) throws IOException {
    // All of this is DAO related data. It gets rebuilt from resources at the next start.
    if (removeDaoStateStore) {
        removeAndBackupFile("DaoStateStore");
    }

    String[] consensusFileNames = {
            "BlindVoteStore",
            "ProposalStore",
            // The ballot list contains the proposals as well, so it has to go too.
            // It will be recreated at resync.
            "BallotList",
            "UnconfirmedBsqChangeOutputList",
            "TempProposalStore",
            "BurningManAccountingStore_v3"
    };
    for (String consensusFileName : consensusFileNames) {
        removeAndBackupFile(consensusFileName);
    }

    bsqBlocksStorageService.removeBlocksDirectory();
}
private void removeAndBackupDaoConsensusFiles(File storageDir, String backupDirName) throws IOException {
// We delete all DAO related data. Some will be rebuilt from resources.
long currentTime = System.currentTimeMillis();
String newFileName = "BlindVoteStore_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BlindVoteStore"), newFileName, backupDirName);
newFileName = "ProposalStore_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "ProposalStore"), newFileName, backupDirName);
// We also need to remove ballot list as it contains the proposals as well. It will be recreated at resync
newFileName = "BallotList_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "BallotList"), newFileName, backupDirName);
newFileName = "UnconfirmedBsqChangeOutputList_" + currentTime;
FileUtil.removeAndBackupFile(storageDir, new File(storageDir, "UnconfirmedBsqChangeOutputList"), newFileName, backupDirName);
/**
 * Hands the file with the given name in {@code storageDir} to
 * {@code FileUtil.removeAndBackupFile}, using {@code <fileName>_<currentTimeMillis>} as the new
 * file name and "out_of_sync_dao_data" as the backup directory name.
 *
 * @param fileName name of the file inside {@code storageDir}
 * @throws IOException propagated from {@code FileUtil.removeAndBackupFile}
 */
private void removeAndBackupFile(String fileName) throws IOException {
    long timestamp = System.currentTimeMillis();
    File fileToRemove = new File(storageDir, fileName);
    FileUtil.removeAndBackupFile(storageDir, fileToRemove, fileName + "_" + timestamp, "out_of_sync_dao_data");
}

View file

@ -433,6 +433,7 @@ public class MainView extends InitializableView<StackPane, MainViewModel>
/**
 * Called when the DAO state checkpoint failed. Shows an attention popup with the text
 * resolved from the resource key "dao.monitor.daoState.checkpoint.popup", configured via
 * {@code useShutDownButton()} and {@code hideCloseButton()}.
 */
public void onCheckpointFailed() {
new Popup().attention(Res.get("dao.monitor.daoState.checkpoint.popup"))
.useShutDownButton()
.hideCloseButton()
.show();
}

View file

@ -19,6 +19,7 @@ package bisq.seednode;
import bisq.core.app.TorSetup;
import bisq.core.app.misc.ExecutableForAppWithP2p;
import bisq.core.dao.monitoring.DaoStateMonitoringService;
import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.core.user.CookieKey;
import bisq.core.user.User;
@ -144,6 +145,13 @@ public class SeedNodeMain extends ExecutableForAppWithP2p {
}, log::error);
// Register a listener at the DaoStateMonitoringService: if the checkpoint failed,
// the seed node calls gracefulShutDown().
injector.getInstance(DaoStateMonitoringService.class).addListener(new DaoStateMonitoringService.Listener() {
@Override
public void onCheckpointFailed() {
gracefulShutDown();
}
});
injector.getInstance(P2PService.class).addP2PServiceListener(new P2PServiceListener() {
@Override
public void onDataReceived() {