Improve handling fo p2p network data broadcasts

This commit is contained in:
Manfred Karrer 2019-03-30 18:45:58 -05:00
parent d422a732e7
commit 823cec086c
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
5 changed files with 109 additions and 81 deletions

View file

@ -135,7 +135,7 @@ public class MyBlindVoteListService implements PersistedDataHost, DaoStateListen
this.myVoteListService = myVoteListService; this.myVoteListService = myVoteListService;
this.myProposalListService = myProposalListService; this.myProposalListService = myProposalListService;
numConnectedPeersListener = (observable, oldValue, newValue) -> rePublishOnceWellConnected(); numConnectedPeersListener = (observable, oldValue, newValue) -> rePublishMyBlindVoteOnceWellConnected();
} }
@ -176,7 +176,7 @@ public class MyBlindVoteListService implements PersistedDataHost, DaoStateListen
@Override @Override
public void onParseBlockChainComplete() { public void onParseBlockChainComplete() {
rePublishOnceWellConnected(); rePublishMyBlindVoteOnceWellConnected();
} }
@ -351,15 +351,16 @@ public class MyBlindVoteListService implements PersistedDataHost, DaoStateListen
return bsqWalletService.signTx(txWithBtcFee); return bsqWalletService.signTx(txWithBtcFee);
} }
private void rePublishOnceWellConnected() { private void rePublishMyBlindVoteOnceWellConnected() {
// We republish at each startup at any block during the cycle. We filter anyway for valid blind votes
// of that cycle so it is 1 blind vote getting rebroadcast at each startup to my neighbors.
int minPeers = BisqEnvironment.getBaseCurrencyNetwork().isMainnet() ? 4 : 1; int minPeers = BisqEnvironment.getBaseCurrencyNetwork().isMainnet() ? 4 : 1;
if ((p2PService.getNumConnectedPeers().get() >= minPeers && p2PService.isBootstrapped()) || if ((p2PService.getNumConnectedPeers().get() >= minPeers && p2PService.isBootstrapped()) ||
BisqEnvironment.getBaseCurrencyNetwork().isRegtest()) { BisqEnvironment.getBaseCurrencyNetwork().isRegtest()) {
int chainHeight = periodService.getChainHeight();
myBlindVoteList.stream() myBlindVoteList.stream()
.filter(blindVote -> periodService.isTxInPhaseAndCycle(blindVote.getTxId(), .filter(blindVote -> periodService.isTxInPhaseAndCycle(blindVote.getTxId(),
DaoPhase.Phase.BLIND_VOTE, DaoPhase.Phase.BLIND_VOTE,
chainHeight)) periodService.getChainHeight()))
.forEach(blindVote -> addToP2PNetwork(blindVote, null)); .forEach(blindVote -> addToP2PNetwork(blindVote, null));
// We delay removal of listener as we call that inside listener itself. // We delay removal of listener as we call that inside listener itself.
@ -369,13 +370,15 @@ public class MyBlindVoteListService implements PersistedDataHost, DaoStateListen
private void addToP2PNetwork(BlindVote blindVote, @Nullable ErrorMessageHandler errorMessageHandler) { private void addToP2PNetwork(BlindVote blindVote, @Nullable ErrorMessageHandler errorMessageHandler) {
BlindVotePayload blindVotePayload = new BlindVotePayload(blindVote); BlindVotePayload blindVotePayload = new BlindVotePayload(blindVote);
// We use reBroadcast flag here as we only broadcast our own blindVote and want to be sure it gets distributed
// well.
boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true); boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true);
if (success) { if (success) {
log.info("We added a blindVotePayload to the P2P network as append only data. blindVoteTxId={}", log.info("We added a blindVotePayload to the P2P network as append only data. blindVoteTxId={}",
blindVote.getTxId()); blindVote.getTxId());
} else { } else {
final String msg = "Adding of blindVotePayload to P2P network failed. blindVoteTxId=" + blindVote.getTxId(); String msg = "Adding of blindVotePayload to P2P network failed. blindVoteTxId=" + blindVote.getTxId();
log.error(msg); log.error(msg);
if (errorMessageHandler != null) if (errorMessageHandler != null)
errorMessageHandler.handleErrorMessage(msg); errorMessageHandler.handleErrorMessage(msg);

View file

@ -30,6 +30,7 @@ import bisq.core.dao.state.model.governance.Proposal;
import bisq.network.p2p.P2PService; import bisq.network.p2p.P2PService;
import bisq.common.UserThread;
import bisq.common.app.DevEnv; import bisq.common.app.DevEnv;
import bisq.common.crypto.KeyRing; import bisq.common.crypto.KeyRing;
import bisq.common.handlers.ErrorMessageHandler; import bisq.common.handlers.ErrorMessageHandler;
@ -217,23 +218,20 @@ public class MyProposalListService implements PersistedDataHost, DaoStateListene
} }
private void rePublishMyProposalsOnceWellConnected() { private void rePublishMyProposalsOnceWellConnected() {
// We republish at each startup at any block during the cycle. We filter anyway for valid blind votes
// of that cycle so it is 1 blind vote getting rebroadcast at each startup to my neighbors.
int minPeers = BisqEnvironment.getBaseCurrencyNetwork().isMainnet() ? 4 : 1; int minPeers = BisqEnvironment.getBaseCurrencyNetwork().isMainnet() ? 4 : 1;
if ((p2PService.getNumConnectedPeers().get() >= minPeers && p2PService.isBootstrapped()) || if ((p2PService.getNumConnectedPeers().get() >= minPeers && p2PService.isBootstrapped()) ||
BisqEnvironment.getBaseCurrencyNetwork().isRegtest()) { BisqEnvironment.getBaseCurrencyNetwork().isRegtest()) {
p2PService.getNumConnectedPeers().removeListener(numConnectedPeersListener); myProposalList.stream()
rePublishMyProposals(); .filter(proposal -> periodService.isTxInPhaseAndCycle(proposal.getTxId(),
} DaoPhase.Phase.PROPOSAL,
} periodService.getChainHeight()))
.forEach(this::addToP2PNetworkAsProtectedData);
private void rePublishMyProposals() { // We delay removal of listener as we call that inside listener itself.
myProposalList.forEach(proposal -> { UserThread.execute(() -> p2PService.getNumConnectedPeers().removeListener(numConnectedPeersListener));
String txId = proposal.getTxId(); }
if (periodService.isTxInPhaseAndCycle(txId, DaoPhase.Phase.PROPOSAL, periodService.getChainHeight())) {
boolean result = addToP2PNetworkAsProtectedData(proposal);
if (!result)
log.warn("Adding of proposal to P2P network failed.\nproposal=" + proposal);
}
});
} }
private void persist() { private void persist() {

View file

@ -41,8 +41,6 @@ import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener;
import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService;
import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; import bisq.network.p2p.storage.persistence.ProtectedDataStoreService;
import bisq.common.UserThread;
import org.bitcoinj.core.Coin; import org.bitcoinj.core.Coin;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -54,7 +52,6 @@ import javafx.collections.ObservableList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.Getter; import lombok.Getter;
@ -162,11 +159,12 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
@Override @Override
public void onParseBlockCompleteAfterBatchProcessing(Block block) { public void onParseBlockCompleteAfterBatchProcessing(Block block) {
int heightForRepublishing = periodService.getFirstBlockOfPhase(daoStateService.getChainHeight(), DaoPhase.Phase.BREAK1); // We try to broadcast at any block in the break1 phase. If we have received the data already we do not
if (block.getHeight() == heightForRepublishing) { // broadcast so we do not flood the network.
if (periodService.isInPhase(block.getHeight(), DaoPhase.Phase.BREAK1)) {
// We only republish if we are completed with parsing old blocks, otherwise we would republish old // We only republish if we are completed with parsing old blocks, otherwise we would republish old
// proposals all the time // proposals all the time
publishToAppendOnlyDataStore(); maybePublishToAppendOnlyDataStore();
fillListFromAppendOnlyDataStore(); fillListFromAppendOnlyDataStore();
} }
} }
@ -217,38 +215,49 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().forEach(e -> onAppendOnlyDataAdded(e, false)); p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().forEach(e -> onAppendOnlyDataAdded(e, false));
} }
private void publishToAppendOnlyDataStore() { private void maybePublishToAppendOnlyDataStore() {
// We set reBroadcast to false to avoid to flood the network.
// If we have 20 proposals and 200 nodes with 10 neighbor peers we would send 40 000 messages if we would set
// reBroadcast to !
tempProposals.stream() tempProposals.stream()
.filter(proposal -> validatorProvider.getValidator(proposal).isValidAndConfirmed(proposal)) .filter(proposal -> validatorProvider.getValidator(proposal).isValidAndConfirmed(proposal))
.map(ProposalPayload::new) .map(ProposalPayload::new)
.forEach(proposalPayload -> { .forEach(proposalPayload -> {
UserThread.runAfterRandomDelay(() -> { boolean success = p2PService.addPersistableNetworkPayload(proposalPayload, false);
boolean success = p2PService.addPersistableNetworkPayload(proposalPayload, true); if (success) {
if (success) log.info("We published a ProposalPayload to the P2P network as append-only data. proposalTxId={}",
log.info("We published a ProposalPayload to the P2P network as append-only data. proposalTxId={}", proposalPayload.getProposal().getTxId());
proposalPayload.getProposal().getTxId()); }
else // If we had data already we did not broadcast and success is false
log.warn("publishToAppendOnlyDataStore failed for proposal " + proposalPayload.getProposal());
}, 100, 5000, TimeUnit.MILLISECONDS);
}); });
} }
private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean doLog) { private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroadcastMessage) {
ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload(); ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof TempProposalPayload) { if (protectedStoragePayload instanceof TempProposalPayload) {
Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal(); Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal();
// We do not validate if we are in current cycle and if tx is confirmed yet as the tx might be not // We do not validate if we are in current cycle and if tx is confirmed yet as the tx might be not
// available/confirmed. But we check if we are in the proposal phase. // available/confirmed.
if (!tempProposals.contains(proposal)) { // We check if we are in the proposal or break1 phase. We are tolerant to accept tempProposals in the break1
if (validatorProvider.getValidator(proposal).isValidOrUnconfirmed(proposal)) { // phase to avoid risks that a proposal published very closely to the end of the proposal phase will not be
if (doLog) { // sufficiently broadcast.
log.info("We received a TempProposalPayload and store it to our protectedStoreList. proposalTxId={}", // When we receive tempProposals from the seed node at startup we only keep those which are in the current
proposal.getTxId()); // proposal/break1 phase if we are in that phase. We ignore tempProposals in case we are not in the
// proposal/break1 phase as they are not used anyway but the proposalPayloads will be relevant once we
// left the proposal/break1 phase.
if (periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL) ||
periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.BREAK1)) {
if (!tempProposals.contains(proposal)) {
if (validatorProvider.getValidator(proposal).areDataFieldsValid(proposal)) {
if (fromBroadcastMessage) {
log.info("We received a TempProposalPayload and store it to our protectedStoreList. proposalTxId={}",
proposal.getTxId());
}
tempProposals.add(proposal);
} else {
log.debug("We received an invalid proposal from the P2P network. Proposal.txId={}, blockHeight={}",
proposal.getTxId(), daoStateService.getChainHeight());
} }
tempProposals.add(proposal);
} else {
log.debug("We received an invalid proposal from the P2P network. Proposal.txId={}, blockHeight={}",
proposal.getTxId(), daoStateService.getChainHeight());
} }
} }
} }
@ -280,13 +289,17 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
} }
} }
private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean doLog) { private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) {
if (persistableNetworkPayload instanceof ProposalPayload) { if (persistableNetworkPayload instanceof ProposalPayload) {
ProposalPayload proposalPayload = (ProposalPayload) persistableNetworkPayload; ProposalPayload proposalPayload = (ProposalPayload) persistableNetworkPayload;
if (!proposalPayloads.contains(proposalPayload)) { if (!proposalPayloads.contains(proposalPayload)) {
Proposal proposal = proposalPayload.getProposal(); Proposal proposal = proposalPayload.getProposal();
// We don't validate phase and cycle as we might receive proposals from other cycles or phases at startup.
// Beside that we might receive payloads we requested at the vote result phase in case we missed some
// payloads. We prefer here resilience over protection against late publishing attacks.
if (validatorProvider.getValidator(proposal).areDataFieldsValid(proposal)) { if (validatorProvider.getValidator(proposal).areDataFieldsValid(proposal)) {
if (doLog) { if (fromBroadcastMessage) {
log.info("We received a ProposalPayload and store it to our appendOnlyStoreList. proposalTxId={}", log.info("We received a ProposalPayload and store it to our appendOnlyStoreList. proposalTxId={}",
proposal.getTxId()); proposal.getTxId());
} }

View file

@ -43,6 +43,7 @@ public class MissingDataRequestService implements DaoSetupService {
private final BlindVoteListService blindVoteListService; private final BlindVoteListService blindVoteListService;
private final ProposalService proposalService; private final ProposalService proposalService;
private final P2PService p2PService; private final P2PService p2PService;
private boolean reRepublishAllGovernanceDataDone;
@Inject @Inject
public MissingDataRequestService(RepublishGovernanceDataHandler republishGovernanceDataHandler, public MissingDataRequestService(RepublishGovernanceDataHandler republishGovernanceDataHandler,
@ -77,38 +78,44 @@ public class MissingDataRequestService implements DaoSetupService {
republishGovernanceDataHandler.sendRepublishRequest(); republishGovernanceDataHandler.sendRepublishRequest();
} }
// Can be triggered with shortcut ctrl+UP or alt+UP
public void reRepublishAllGovernanceData() { public void reRepublishAllGovernanceData() {
ObservableList<ProposalPayload> proposalPayloads = proposalService.getProposalPayloads(); // We only want to do it once in case we would get flooded with requests.
proposalPayloads.forEach(proposalPayload -> { if (!reRepublishAllGovernanceDataDone) {
// We want a random delay between 0.1 and 30 sec. depending on the number of items reRepublishAllGovernanceDataDone = true;
int delay = Math.max(100, Math.min(30_000, new Random().nextInt(proposalPayloads.size() * 500))); ObservableList<ProposalPayload> proposalPayloads = proposalService.getProposalPayloads();
UserThread.runAfter(() -> { proposalPayloads.forEach(proposalPayload -> {
boolean success = p2PService.addPersistableNetworkPayload(proposalPayload, true); // We want a random delay between 0.1 and 300 sec. depending on the number of items.
String txId = proposalPayload.getProposal().getTxId(); // We send all proposals including those from old cycles.
if (success) { int delay = Math.max(100, Math.min(300_000, new Random().nextInt(proposalPayloads.size() * 1000)));
log.debug("We received a RepublishGovernanceDataRequest and re-published a proposalPayload to " + UserThread.runAfter(() -> {
"the P2P network as append only data. proposalTxId={}", txId); boolean success = p2PService.addPersistableNetworkPayload(proposalPayload, true);
} else { String txId = proposalPayload.getProposal().getTxId();
log.error("Adding of proposalPayload to P2P network failed. proposalTxId={}", txId); if (success) {
} log.debug("We received a RepublishGovernanceDataRequest and re-published a proposalPayload to " +
}, delay, TimeUnit.MILLISECONDS); "the P2P network as append only data. proposalTxId={}", txId);
}); } else {
log.error("Adding of proposalPayload to P2P network failed. proposalTxId={}", txId);
}
}, delay, TimeUnit.MILLISECONDS);
});
ObservableList<BlindVotePayload> blindVotePayloads = blindVoteListService.getBlindVotePayloads(); ObservableList<BlindVotePayload> blindVotePayloads = blindVoteListService.getBlindVotePayloads();
blindVotePayloads blindVotePayloads.forEach(blindVotePayload -> {
.forEach(blindVotePayload -> { // We want a random delay between 0.1 and 300 sec. depending on the number of items.
// We want a random delay between 0.1 and 30 sec. depending on the number of items // We send all blindVotes including those from old cycles.
int delay = Math.max(100, Math.min(30_000, new Random().nextInt(blindVotePayloads.size() * 500))); int delay = Math.max(100, Math.min(300_000, new Random().nextInt(blindVotePayloads.size() * 1000)));
UserThread.runAfter(() -> { UserThread.runAfter(() -> {
boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true); boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true);
String txId = blindVotePayload.getBlindVote().getTxId(); String txId = blindVotePayload.getBlindVote().getTxId();
if (success) { if (success) {
log.debug("We received a RepublishGovernanceDataRequest and re-published a blindVotePayload to " + log.debug("We received a RepublishGovernanceDataRequest and re-published a blindVotePayload to " +
"the P2P network as append only data. blindVoteTxId={}", txId); "the P2P network as append only data. blindVoteTxId={}", txId);
} else { } else {
log.error("Adding of blindVotePayload to P2P network failed. blindVoteTxId={}", txId); log.error("Adding of blindVotePayload to P2P network failed. blindVoteTxId={}", txId);
} }
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
}); });
}
} }
} }

View file

@ -41,6 +41,7 @@ import bisq.core.dao.state.model.governance.DaoPhase;
import bisq.network.p2p.P2PService; import bisq.network.p2p.P2PService;
import bisq.common.UserThread;
import bisq.common.util.Utilities; import bisq.common.util.Utilities;
import org.bitcoinj.core.InsufficientMoneyException; import org.bitcoinj.core.InsufficientMoneyException;
@ -285,12 +286,18 @@ public class VoteRevealService implements DaoStateListener, DaoSetupService {
} }
private void rePublishBlindVotePayloadList(List<BlindVote> blindVoteList) { private void rePublishBlindVotePayloadList(List<BlindVote> blindVoteList) {
// If we have 20 blind votes from 20 voters we would have 400 messages sent to their 10 neighbor peers.
// Most of the neighbors will already have the data so they will not continue broadcast.
// To not flood the network too much we use a long random delay to spread the load over 5 minutes.
// As this is only for extra resilience we don't care so much for the case that the user might shut down the
// app before we are finished with our delayed broadcast.
// We cannot set reBroadcast to false as otherwise it would not have any effect as we have the data already and
// broadcast would only be triggered at new data.
blindVoteList.stream() blindVoteList.stream()
.map(BlindVotePayload::new) .map(BlindVotePayload::new)
.forEach(blindVotePayload -> { .forEach(blindVotePayload -> {
boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true); UserThread.runAfterRandomDelay(() -> p2PService.addPersistableNetworkPayload(blindVotePayload, true),
if (!success) 1, 300);
log.warn("publishToAppendOnlyDataStore failed for blindVote " + blindVotePayload.getBlindVote());
}); });
} }
} }