Add handling for missing blindVote data

This commit is contained in:
Manfred Karrer 2018-10-09 22:00:02 -05:00
parent 58e6db0a25
commit 5141806509
No known key found for this signature in database
GPG Key ID: 401250966A6B2C46
18 changed files with 587 additions and 105 deletions

View File

@ -56,6 +56,7 @@ message NetworkEnvelope {
AddPersistableNetworkPayloadMessage add_persistable_network_payload_message = 31;
AckMessage ack_message = 32;
RepublishBlindVotesRequest republish_blind_votes_request = 33;
}
}
@ -319,6 +320,9 @@ message NewBlockBroadcastMessage {
BaseBlock raw_block = 1;
}
message RepublishBlindVotesRequest {
}
///////////////////////////////////////////////////////////////////////////////////////////
// Payload
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -25,6 +25,7 @@ import bisq.core.dao.governance.ballot.BallotListService;
import bisq.core.dao.governance.blindvote.BlindVoteListService;
import bisq.core.dao.governance.blindvote.BlindVoteValidator;
import bisq.core.dao.governance.blindvote.MyBlindVoteListService;
import bisq.core.dao.governance.blindvote.network.RepublishBlindVotesHandler;
import bisq.core.dao.governance.blindvote.storage.BlindVoteStorageService;
import bisq.core.dao.governance.blindvote.storage.BlindVoteStore;
import bisq.core.dao.governance.myvote.MyVoteListService;
@ -51,6 +52,7 @@ import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStore;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalStorageService;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalStore;
import bisq.core.dao.governance.role.BondedRolesService;
import bisq.core.dao.governance.voteresult.MissingDataRequestService;
import bisq.core.dao.governance.voteresult.VoteResultService;
import bisq.core.dao.governance.voteresult.issuance.IssuanceService;
import bisq.core.dao.governance.votereveal.VoteRevealService;
@ -165,7 +167,9 @@ public class DaoModule extends AppModule {
// VoteResult
bind(VoteResultService.class).in(Singleton.class);
bind(MissingDataRequestService.class).in(Singleton.class);
bind(IssuanceService.class).in(Singleton.class);
bind(RepublishBlindVotesHandler.class).in(Singleton.class);
// Genesis
String genesisTxId = environment.getProperty(DaoOptionKeys.GENESIS_TX_ID, String.class, "");

View File

@ -21,6 +21,7 @@ import bisq.core.dao.governance.ballot.BallotListService;
import bisq.core.dao.governance.blindvote.BlindVoteListService;
import bisq.core.dao.governance.blindvote.MyBlindVoteListService;
import bisq.core.dao.governance.proposal.ProposalService;
import bisq.core.dao.governance.voteresult.MissingDataRequestService;
import bisq.core.dao.governance.voteresult.VoteResultService;
import bisq.core.dao.governance.votereveal.VoteRevealService;
import bisq.core.dao.node.BsqNode;
@ -47,6 +48,7 @@ public class DaoSetup {
private final VoteRevealService voteRevealService;
private final VoteResultService voteResultService;
private final BsqNode bsqNode;
private final MissingDataRequestService missingDataRequestService;
private final DaoFacade daoFacade;
private final ExportJsonFilesService exportJsonFilesService;
@ -60,6 +62,7 @@ public class DaoSetup {
MyBlindVoteListService myBlindVoteListService,
VoteRevealService voteRevealService,
VoteResultService voteResultService,
MissingDataRequestService missingDataRequestService,
DaoFacade daoFacade,
ExportJsonFilesService exportJsonFilesService) {
this.bsqStateService = bsqStateService;
@ -70,6 +73,7 @@ public class DaoSetup {
this.myBlindVoteListService = myBlindVoteListService;
this.voteRevealService = voteRevealService;
this.voteResultService = voteResultService;
this.missingDataRequestService = missingDataRequestService;
this.daoFacade = daoFacade;
this.exportJsonFilesService = exportJsonFilesService;
@ -87,8 +91,9 @@ public class DaoSetup {
myBlindVoteListService.addListeners();
voteRevealService.addListeners();
voteResultService.addListeners();
exportJsonFilesService.addListeners();
missingDataRequestService.addListeners();
daoFacade.addListeners();
exportJsonFilesService.addListeners();
bsqStateService.start();
cycleService.start();
@ -98,8 +103,9 @@ public class DaoSetup {
myBlindVoteListService.start();
voteRevealService.start();
voteResultService.start();
exportJsonFilesService.start();
missingDataRequestService.start();
daoFacade.start();
exportJsonFilesService.start();
bsqNode.setErrorMessageHandler(errorMessageHandler);
bsqNode.start();

View File

@ -39,6 +39,7 @@ import javafx.collections.ObservableList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
@ -49,7 +50,7 @@ public class BlindVoteListService implements AppendOnlyDataStoreListener, BsqSta
private final BsqStateService bsqStateService;
private final P2PService p2PService;
private final BlindVoteValidator blindVoteValidator;
@Getter
private final ObservableList<BlindVotePayload> appendOnlyStoreList = FXCollections.observableArrayList();

View File

@ -0,0 +1,214 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.governance.blindvote.network;
import bisq.core.dao.governance.blindvote.network.messages.RepublishBlindVotesRequest;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.peerexchange.Peer;
import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Capabilities;
import javax.inject.Inject;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
/**
* Responsible for sending a RepublishBlindVotesRequest to full nodes.
* Processing of RepublishBlindVotesRequests at full nodes is done in the FullNodeNetworkService.
*/
@Slf4j
public final class RepublishBlindVotesHandler {
private static final long TIMEOUT = 120;
private final Collection<NodeAddress> seedNodeAddresses;
private final NetworkNode networkNode;
private final PeerManager peerManager;
private boolean stopped;
private Timer timeoutTimer;
@Inject
public RepublishBlindVotesHandler(NetworkNode networkNode,
PeerManager peerManager,
SeedNodeRepository seedNodesRepository) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void requestBlindVotePayload() {
// First try if we have a seed node in our connections. All seed nodes are full nodes.
if (!stopped)
connectToNextNode();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void sendRepublishBlindVotesRequest(NodeAddress nodeAddress) {
RepublishBlindVotesRequest republishBlindVotesRequest = new RepublishBlindVotesRequest();
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending republishBlindVotesRequest:" +
" to nodeAddress:" + nodeAddress;
log.warn(errorMessage);
connectToNextNode();
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
}
log.info("We send to peer {} a republishBlindVotesRequest.", nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, republishBlindVotesRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Sending of RepublishBlindVotesRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
stop();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
}
}
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending republishBlindVotesRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
handleFault(nodeAddress);
connectToNextNode();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
}
}
});
}
private void connectToNextNode() {
// First we try our connected seed nodes
Optional<Connection> connectionToSeedNodeOptional = networkNode.getConfirmedConnections().stream()
.filter(peerManager::isSeedNode)
.findAny();
if (connectionToSeedNodeOptional.isPresent() &&
connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().isPresent()) {
NodeAddress nodeAddress = connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get();
sendRepublishBlindVotesRequest(nodeAddress);
} else {
// If connected seed nodes did not confirm receipt of message we try next seed node from seedNodeAddresses
List<NodeAddress> list = seedNodeAddresses.stream()
.filter(e -> peerManager.isSeedNode(e) && !peerManager.isSelf(e))
.collect(Collectors.toList());
Collections.shuffle(list);
if (!list.isEmpty()) {
NodeAddress nodeAddress = list.get(0);
seedNodeAddresses.remove(nodeAddress);
sendRepublishBlindVotesRequest(nodeAddress);
} else {
log.warn("No more seed nodes available. We try any of our other peers.");
connectToAnyFullNode();
}
}
}
private void connectToAnyFullNode() {
List<Integer> required = new ArrayList<>(Collections.singletonList(
Capabilities.Capability.DAO_FULL_NODE.ordinal()
));
List<Peer> list = peerManager.getLivePeers(null).stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.collect(Collectors.toList());
if (list.isEmpty())
list = peerManager.getReportedPeers().stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.collect(Collectors.toList());
if (list.isEmpty())
list = peerManager.getPersistedPeers().stream()
.filter(peer -> Capabilities.isCapabilitySupported(required, peer.getSupportedCapabilities()))
.collect(Collectors.toList());
if (!list.isEmpty()) {
// We avoid the complexity to maintain the results of all our peers and to iterate all until we find a good peer,
// but we prefer simplicity with the risk that we don't get the data so we request from max 4 peers in parallel
// assuming that at least one will republish and therefore we should receive all data.
list = new ArrayList<>(list.subList(0, Math.min(list.size(), 4)));
list.stream()
.map(Peer::getNodeAddress)
.forEach(this::sendRepublishBlindVotesRequest);
} else {
log.warn("No other nodes found. We try again in 60 seconds.");
UserThread.runAfter(this::connectToNextNode, 60);
}
}
private void handleFault(NodeAddress nodeAddress) {
peerManager.handleConnectionFault(nodeAddress);
}
private void stop() {
stopped = true;
stopTimeoutTimer();
}
private void stopTimeoutTimer() {
if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}
}
}

View File

@ -0,0 +1,76 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.governance.blindvote.network.messages;
import bisq.network.p2p.DirectMessage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.common.app.Capabilities;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@EqualsAndHashCode(callSuper = true)
@Getter
public final class RepublishBlindVotesRequest extends NetworkEnvelope implements DirectMessage, CapabilityRequiringPayload {
public RepublishBlindVotesRequest() {
this(Version.getP2PMessageVersion());
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private RepublishBlindVotesRequest(int messageVersion) {
super(messageVersion);
}
@Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setRepublishBlindVotesRequest(PB.RepublishBlindVotesRequest.newBuilder())
.build();
}
public static NetworkEnvelope fromProto(PB.RepublishBlindVotesRequest proto, int messageVersion) {
return new RepublishBlindVotesRequest(messageVersion);
}
@Override
public List<Integer> getRequiredCapabilities() {
return new ArrayList<>(Collections.singletonList(
Capabilities.Capability.DAO_FULL_NODE.ordinal()
));
}
@Override
public String toString() {
return "RepublishBlindVotesRequest{" +
"\n} " + super.toString();
}
}

View File

@ -41,12 +41,13 @@ public class MeritConsensus {
// Value with 144 blocks a day and 365 days would be 52560. We take a close round number instead.
private static final int BLOCKS_PER_YEAR = 50_000;
public static MeritList decryptMeritList(byte[] encryptedMeritList, SecretKey secretKey) throws VoteResultException {
public static MeritList decryptMeritList(byte[] encryptedMeritList, SecretKey secretKey)
throws VoteResultException.DecryptionException {
try {
final byte[] decrypted = Encryption.decrypt(encryptedMeritList, secretKey);
byte[] decrypted = Encryption.decrypt(encryptedMeritList, secretKey);
return MeritList.getMeritListFromBytes(decrypted);
} catch (Throwable t) {
throw new VoteResultException(t);
throw new VoteResultException.DecryptionException(t);
}
}

View File

@ -1,37 +0,0 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.governance.voteresult;
import bisq.core.dao.governance.ballot.Ballot;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Value;
@EqualsAndHashCode(callSuper = true)
@Value
public class MissingBallotException extends Exception {
private List<Ballot> existingBallots;
private List<String> proposalTxIdsOfMissingBallots;
public MissingBallotException(List<Ballot> existingBallots, List<String> proposalTxIdsOfMissingBallots) {
this.existingBallots = existingBallots;
this.proposalTxIdsOfMissingBallots = proposalTxIdsOfMissingBallots;
}
}

View File

@ -0,0 +1,71 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.dao.governance.voteresult;
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.blindvote.network.RepublishBlindVotesHandler;
import javax.inject.Inject;
import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import lombok.Getter;
public class MissingDataRequestService implements DaoSetupService {
private final RepublishBlindVotesHandler republishBlindVotesHandler;
@Getter
private final ObservableList<VoteResultException> voteResultExceptions = FXCollections.observableArrayList();
@Inject
public MissingDataRequestService(RepublishBlindVotesHandler republishBlindVotesHandler) {
this.republishBlindVotesHandler = republishBlindVotesHandler;
}
///////////////////////////////////////////////////////////////////////////////////////////
// DaoSetupService
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void addListeners() {
voteResultExceptions.addListener((ListChangeListener<VoteResultException>) c -> {
c.next();
if (c.wasAdded()) {
c.getAddedSubList().stream().filter(e -> e instanceof VoteResultException.MissingBlindVoteDataException)
.map(e -> (VoteResultException.MissingBlindVoteDataException) e)
.forEach(e -> republishBlindVotesHandler.requestBlindVotePayload());
}
});
}
@Override
public void start() {
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void addVoteResultException(VoteResultException voteResultException) {
this.voteResultExceptions.add(voteResultException);
}
}

View File

@ -54,35 +54,41 @@ public class VoteResultConsensus {
return Arrays.copyOfRange(opReturnData, 2, 22);
}
public static VoteWithProposalTxIdList decryptVotes(byte[] encryptedVotes, SecretKey secretKey) throws VoteResultException {
public static VoteWithProposalTxIdList decryptVotes(byte[] encryptedVotes, SecretKey secretKey)
throws VoteResultException.DecryptionException {
try {
byte[] decrypted = Encryption.decrypt(encryptedVotes, secretKey);
return VoteWithProposalTxIdList.getVoteWithProposalTxIdListFromBytes(decrypted);
} catch (Throwable t) {
throw new VoteResultException(t);
throw new VoteResultException.DecryptionException(t);
}
}
// We compare first by stake and in case we have multiple entries with same stake we use the
// hex encoded hashOfProposalList for comparision
@Nullable
public static byte[] getMajorityHash(List<VoteResultService.HashWithStake> hashWithStakeList) throws VoteResultException {
checkArgument(!hashWithStakeList.isEmpty(), "hashWithStakeList must not be empty");
hashWithStakeList.sort(Comparator.comparingLong(VoteResultService.HashWithStake::getStake).reversed()
.thenComparing(hashWithStake -> Utilities.encodeToHex(hashWithStake.getHash())));
public static byte[] getMajorityHash(List<VoteResultService.HashWithStake> hashWithStakeList)
throws VoteResultException.ConsensusException, VoteResultException.ValidationException {
try {
checkArgument(!hashWithStakeList.isEmpty(), "hashWithStakeList must not be empty");
hashWithStakeList.sort(Comparator.comparingLong(VoteResultService.HashWithStake::getStake).reversed()
.thenComparing(hashWithStake -> Utilities.encodeToHex(hashWithStake.getHash())));
// If there are conflicting data views (multiple hashes) we only consider the voting round as valid if
// the majority is a super majority with > 80%.
if (hashWithStakeList.size() > 1) {
long stakeOfAll = hashWithStakeList.stream().mapToLong(VoteResultService.HashWithStake::getStake).sum();
long stakeOfFirst = hashWithStakeList.get(0).getStake();
if ((double) stakeOfFirst / (double) stakeOfAll < 0.8) {
throw new VoteResultException("The winning data view has less then 80% of the total stake of " +
"all data views. We consider the voting cycle as invalid if the winning data view does not " +
"reach a super majority.");
// If there are conflicting data views (multiple hashes) we only consider the voting round as valid if
// the majority is a super majority with > 80%.
if (hashWithStakeList.size() > 1) {
long stakeOfAll = hashWithStakeList.stream().mapToLong(VoteResultService.HashWithStake::getStake).sum();
long stakeOfFirst = hashWithStakeList.get(0).getStake();
if ((double) stakeOfFirst / (double) stakeOfAll < 0.8) {
throw new VoteResultException.ConsensusException("The winning data view has less then 80% of the " +
"total stake of all data views. We consider the voting cycle as invalid if the " +
"winning data view does not reach a super majority.");
}
}
return hashWithStakeList.get(0).getHash();
} catch (Throwable t) {
throw new VoteResultException.ValidationException(t);
}
return hashWithStakeList.get(0).getHash();
}
// Key is stored after version and type bytes and list of Blind votes. It has 16 bytes
@ -92,24 +98,24 @@ public class VoteResultConsensus {
}
public static TxOutput getConnectedBlindVoteStakeOutput(Tx voteRevealTx, BsqStateService bsqStateService)
throws VoteResultException {
throws VoteResultException.ValidationException {
try {
// We use the stake output of the blind vote tx as first input
final TxInput stakeTxInput = voteRevealTx.getTxInputs().get(0);
TxInput stakeTxInput = voteRevealTx.getTxInputs().get(0);
Optional<TxOutput> optionalBlindVoteStakeOutput = bsqStateService.getConnectedTxOutput(stakeTxInput);
checkArgument(optionalBlindVoteStakeOutput.isPresent(), "blindVoteStakeOutput must be present");
final TxOutput blindVoteStakeOutput = optionalBlindVoteStakeOutput.get();
TxOutput blindVoteStakeOutput = optionalBlindVoteStakeOutput.get();
checkArgument(blindVoteStakeOutput.getTxOutputType() == TxOutputType.BLIND_VOTE_LOCK_STAKE_OUTPUT,
"blindVoteStakeOutput must be of type BLIND_VOTE_LOCK_STAKE_OUTPUT");
return blindVoteStakeOutput;
} catch (Throwable t) {
throw new VoteResultException(t);
throw new VoteResultException.ValidationException(t);
}
}
public static Tx getBlindVoteTx(TxOutput blindVoteStakeOutput, BsqStateService bsqStateService,
PeriodService periodService, int chainHeight)
throws VoteResultException {
throws VoteResultException.ValidationException {
try {
String blindVoteTxId = blindVoteStakeOutput.getTxId();
Optional<Tx> optionalBlindVoteTx = bsqStateService.getTx(blindVoteTxId);
@ -128,7 +134,7 @@ public class VoteResultConsensus {
+ blindVoteTx.getBlockHeight());
return blindVoteTx;
} catch (Throwable t) {
throw new VoteResultException(t);
throw new VoteResultException.ValidationException(t);
}
}
}

View File

@ -17,35 +17,105 @@
package bisq.core.dao.governance.voteresult;
import bisq.core.dao.governance.blindvote.storage.BlindVotePayload;
import bisq.core.dao.governance.ballot.Ballot;
import lombok.Getter;
import java.util.List;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Value;
public class VoteResultException extends Exception {
@Getter
@Nullable
private BlindVotePayload blindVotePayload;
public VoteResultException(String message, Exception cause, BlindVotePayload blindVotePayload) {
super(message, cause);
this.blindVotePayload = blindVotePayload;
VoteResultException(Throwable cause) {
super(cause);
}
public VoteResultException(String message) {
private VoteResultException(String message) {
super(message);
}
public VoteResultException(Throwable cause) {
super(cause);
private VoteResultException(String message, Throwable cause) {
super(message, cause);
}
@Override
public String toString() {
return "VoteResultException{" +
"\n blindVotePayload=" + blindVotePayload +
"\n cause=" + getCause() +
"\n} " + super.toString();
}
@EqualsAndHashCode(callSuper = true)
public static class ConsensusException extends VoteResultException {
ConsensusException(String message) {
super(message);
}
@Override
public String toString() {
return "ConsensusException{" +
"\n} " + super.toString();
}
}
@EqualsAndHashCode(callSuper = true)
public static class ValidationException extends VoteResultException {
ValidationException(Throwable cause) {
super("Validation of vote result failed.", cause);
}
@Override
public String toString() {
return "VoteResultException{" +
"\n cause=" + getCause() +
"\n} " + super.toString();
}
}
@EqualsAndHashCode(callSuper = true)
@Value
public static class MissingBlindVoteDataException extends VoteResultException {
private String blindVoteTxId;
MissingBlindVoteDataException(String blindVoteTxId) {
super("Blind vote tx ID " + blindVoteTxId + " is missing");
this.blindVoteTxId = blindVoteTxId;
}
@Override
public String toString() {
return "MissingBlindVoteDataException{" +
"\n blindVoteTxId='" + blindVoteTxId + '\'' +
"\n} " + super.toString();
}
}
@EqualsAndHashCode(callSuper = true)
@Value
public static class MissingBallotException extends VoteResultException {
private List<Ballot> existingBallots;
private List<String> proposalTxIdsOfMissingBallots;
MissingBallotException(List<Ballot> existingBallots, List<String> proposalTxIdsOfMissingBallots) {
super("Missing ballots. proposalTxIdsOfMissingBallots=" + proposalTxIdsOfMissingBallots);
this.existingBallots = existingBallots;
this.proposalTxIdsOfMissingBallots = proposalTxIdsOfMissingBallots;
}
}
@EqualsAndHashCode(callSuper = true)
@Value
public static class DecryptionException extends VoteResultException {
public DecryptionException(Throwable cause) {
super(cause);
}
@Override
public String toString() {
return "DecryptionException{" +
"\n} " + super.toString();
}
}
}

View File

@ -105,10 +105,12 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
private final BondedRolesService bondedRolesService;
private final IssuanceService issuanceService;
private final AssetService assetService;
private final Storage<EvaluatedProposalList> evaluatedProposalStorage;
private Storage<DecryptedBallotsWithMeritsList> decryptedBallotsWithMeritsStorage;
private final MissingDataRequestService missingDataRequestService;
@Getter
private final ObservableList<VoteResultException> voteResultExceptions = FXCollections.observableArrayList();
private final Storage<EvaluatedProposalList> evaluatedProposalStorage;
private Storage<DecryptedBallotsWithMeritsList> decryptedBallotsWithMeritsStorage;
@Getter
private final EvaluatedProposalList evaluatedProposalList = new EvaluatedProposalList();
@Getter
@ -129,6 +131,7 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
BondedRolesService bondedRolesService,
IssuanceService issuanceService,
AssetService assetService,
MissingDataRequestService missingDataRequestService,
Storage<EvaluatedProposalList> evaluatedProposalStorage,
Storage<DecryptedBallotsWithMeritsList> decryptedBallotsWithMeritsStorage) {
this.voteRevealService = voteRevealService;
@ -140,6 +143,7 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
this.bondedRolesService = bondedRolesService;
this.issuanceService = issuanceService;
this.assetService = assetService;
this.missingDataRequestService = missingDataRequestService;
this.evaluatedProposalStorage = evaluatedProposalStorage;
this.decryptedBallotsWithMeritsStorage = decryptedBallotsWithMeritsStorage;
}
@ -249,13 +253,19 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
// TODO request missing blind votes
}
} catch (VoteResultException e) {
} catch (VoteResultException.ValidationException e) {
log.error(e.toString());
e.printStackTrace();
voteResultExceptions.add(e);
} catch (VoteResultException.ConsensusException e) {
log.error(e.toString());
e.printStackTrace();
//TODO notify application of that case (e.g. add error handler)
// The vote cycle is invalid as conflicting data views of the blind vote data exist and the winner
// did not reach super majority of 80%.
voteResultExceptions.add(e);
}
} else {
log.info("There have not been any votes in that cycle. chainHeight={}", chainHeight);
@ -301,28 +311,41 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
.findAny();
if (optionalBlindVote.isPresent()) {
BlindVote blindVote = optionalBlindVote.get();
VoteWithProposalTxIdList voteWithProposalTxIdList = VoteResultConsensus.decryptVotes(blindVote.getEncryptedVotes(), secretKey);
MeritList meritList = MeritConsensus.decryptMeritList(blindVote.getEncryptedMeritList(), secretKey);
// We lookup for the proposals we have in our local list which match the txId from the
// voteWithProposalTxIdList and create a ballot list with the proposal and the vote from
// the voteWithProposalTxIdList
BallotList ballotList = createBallotList(voteWithProposalTxIdList);
return new DecryptedBallotsWithMerits(hashOfBlindVoteList, voteRevealTxId, blindVoteTxId, blindVoteStake, ballotList, meritList);
try {
VoteWithProposalTxIdList voteWithProposalTxIdList = VoteResultConsensus.decryptVotes(blindVote.getEncryptedVotes(), secretKey);
MeritList meritList = MeritConsensus.decryptMeritList(blindVote.getEncryptedMeritList(), secretKey);
// We lookup for the proposals we have in our local list which match the txId from the
// voteWithProposalTxIdList and create a ballot list with the proposal and the vote from
// the voteWithProposalTxIdList
BallotList ballotList = createBallotList(voteWithProposalTxIdList);
return new DecryptedBallotsWithMerits(hashOfBlindVoteList, voteRevealTxId, blindVoteTxId, blindVoteStake, ballotList, meritList);
} catch (VoteResultException.MissingBallotException missingBallotException) {
//TODO handle case that we are missing proposals
log.warn("We are missing proposals to create the vote result: " + missingBallotException.toString());
missingDataRequestService.addVoteResultException(missingBallotException);
return null;
} catch (VoteResultException.DecryptionException decryptionException) {
log.error("Could not decrypt data: " + decryptionException.toString());
voteResultExceptions.add(decryptionException);
return null;
}
} else {
//TODO handle recovering
log.warn("We have a blindVoteTx but we do not have the corresponding blindVote in our local list.\n" +
"That can happen if the blindVote item was not properly broadcast. We will go on " +
"and see if that blindVote was part of the majority data view. If so we should " +
"recover the missing blind vote by a request to our peers. blindVoteTxId={}", blindVoteTxId);
missingDataRequestService.addVoteResultException(new VoteResultException.MissingBlindVoteDataException(blindVoteTxId));
return null;
}
} catch (MissingBallotException e) {
//TODO handle case that we are missing proposals
log.error("We are missing proposals to create the vote result: " + e.toString());
} catch (VoteResultException.ValidationException e) {
log.error("Could not create DecryptedBallotsWithMerits because of voteResultValidationException: " + e.toString());
voteResultExceptions.add(e);
return null;
} catch (Throwable e) {
log.error("Could not create DecryptedBallotsWithMerits: " + e.toString());
log.error("Could not create DecryptedBallotsWithMerits because of an unknown exception: " + e.toString());
voteResultExceptions.add(new VoteResultException(e));
return null;
}
})
@ -330,7 +353,8 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
.collect(Collectors.toSet());
}
private BallotList createBallotList(VoteWithProposalTxIdList voteWithProposalTxIdList) throws MissingBallotException {
private BallotList createBallotList(VoteWithProposalTxIdList voteWithProposalTxIdList)
throws VoteResultException.MissingBallotException {
// We convert the list to a map with proposalTxId as key and the vote as value
Map<String, Vote> voteByTxIdMap = voteWithProposalTxIdList.stream()
.filter(voteWithProposalTxId -> voteWithProposalTxId.getVote() != null)
@ -366,7 +390,7 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
.collect(Collectors.toList());
if (!missingBallots.isEmpty())
throw new MissingBallotException(ballots, missingBallots);
throw new VoteResultException.MissingBallotException(ballots, missingBallots);
// Let's keep the data more deterministic by sorting it by txId. Though we are not using the sorting.
ballots.sort(Comparator.comparing(Ballot::getTxId));
@ -393,7 +417,8 @@ public class VoteResultService implements BsqStateListener, DaoSetupService, Per
return map;
}
private byte[] getMajorityBlindVoteListHash(Map<P2PDataStorage.ByteArray, Long> map) throws VoteResultException {
private byte[] getMajorityBlindVoteListHash(Map<P2PDataStorage.ByteArray, Long> map)
throws VoteResultException.ValidationException, VoteResultException.ConsensusException {
List<HashWithStake> list = map.entrySet().stream()
.map(entry -> new HashWithStake(entry.getKey().bytes, entry.getValue()))
.collect(Collectors.toList());

View File

@ -80,6 +80,8 @@ public class FullNode extends BsqNode {
@Override
public void start() {
fullNodeNetworkService.start();
rpcService.setup(() -> {
super.onInitialized();
startParseBlocks();

View File

@ -17,12 +17,16 @@
package bisq.core.dao.node.full.network;
import bisq.core.dao.governance.blindvote.BlindVoteListService;
import bisq.core.dao.governance.blindvote.network.messages.RepublishBlindVotesRequest;
import bisq.core.dao.governance.blindvote.storage.BlindVotePayload;
import bisq.core.dao.node.messages.GetBlocksRequest;
import bisq.core.dao.node.messages.NewBlockBroadcastMessage;
import bisq.core.dao.state.BsqStateService;
import bisq.core.dao.state.blockchain.Block;
import bisq.core.dao.state.blockchain.RawBlock;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
@ -35,8 +39,12 @@ import bisq.common.proto.network.NetworkEnvelope;
import javax.inject.Inject;
import javafx.collections.ObservableList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -58,6 +66,8 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
private final NetworkNode networkNode;
private final PeerManager peerManager;
private final Broadcaster broadcaster;
private final BlindVoteListService blindVoteListService;
private final P2PService p2PService;
private final BsqStateService bsqStateService;
// Key is connection UID
@ -73,20 +83,26 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
public FullNodeNetworkService(NetworkNode networkNode,
PeerManager peerManager,
Broadcaster broadcaster,
BlindVoteListService blindVoteListService,
P2PService p2PService,
BsqStateService bsqStateService) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.broadcaster = broadcaster;
this.blindVoteListService = blindVoteListService;
this.p2PService = p2PService;
this.bsqStateService = bsqStateService;
networkNode.addMessageListener(this);
peerManager.addListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void start() {
networkNode.addMessageListener(this);
peerManager.addListener(this);
}
@SuppressWarnings("Duplicates")
public void shutDown() {
Log.traceCall();
@ -173,6 +189,24 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
} else if (networkEnvelope instanceof RepublishBlindVotesRequest) {
ObservableList<BlindVotePayload> appendOnlyStoreList = blindVoteListService.getAppendOnlyStoreList();
appendOnlyStoreList
.forEach(blindVotePayload -> {
// We want a random delay between 0.1 and 30 sec. depending on the number of items
int delay = Math.max(100, Math.min(30_000, new Random().nextInt(appendOnlyStoreList.size() * 500)));
UserThread.runAfter(() -> {
boolean success = p2PService.addPersistableNetworkPayload(blindVotePayload, true);
String txId = blindVotePayload.getBlindVote().getTxId();
if (success) {
log.info("We received a RepublishBlindVotesRequest and re-published a blindVotePayload to " +
"the P2P network as append only data. blindVoteTxId={}",
txId);
} else {
log.error("Adding of blindVotePayload to P2P network failed. blindVoteTxId=" + txId);
}
}, delay, TimeUnit.MILLISECONDS);
});
}
}
}

View File

@ -99,7 +99,8 @@ class GetBlocksRequestHandler {
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse:" + getBlocksResponse +
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
@ -111,8 +112,8 @@ class GetBlocksRequestHandler {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Send DataResponse to {} succeeded. getBlocksResponse={}",
connection.getPeersNodeAddressOptional(), getBlocksResponse);
log.info("Send DataResponse to {} succeeded. getBlocksResponse.getBlocks().size()={}",
connection.getPeersNodeAddressOptional(), getBlocksResponse.getBlocks().size());
cleanup();
listener.onComplete();
} else {

View File

@ -73,7 +73,7 @@ public class LiteNode extends BsqNode {
public void start() {
super.onInitialized();
liteNodeNetworkService.init();
liteNodeNetworkService.start();
}
@Override

View File

@ -117,7 +117,7 @@ public class LiteNodeNetworkService implements MessageListener, ConnectionListen
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void init() {
public void start() {
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
peerManager.addListener(this);

View File

@ -26,6 +26,7 @@ import bisq.core.arbitration.messages.DisputeResultMessage;
import bisq.core.arbitration.messages.OpenNewDisputeMessage;
import bisq.core.arbitration.messages.PeerOpenedDisputeMessage;
import bisq.core.arbitration.messages.PeerPublishedDisputePayoutTxMessage;
import bisq.core.dao.governance.blindvote.network.messages.RepublishBlindVotesRequest;
import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload;
import bisq.core.dao.node.messages.GetBlocksRequest;
import bisq.core.dao.node.messages.GetBlocksResponse;
@ -157,6 +158,9 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo
return AddPersistableNetworkPayloadMessage.fromProto(proto.getAddPersistableNetworkPayloadMessage(), this, messageVersion);
case ACK_MESSAGE:
return AckMessage.fromProto(proto.getAckMessage(), messageVersion);
case REPUBLISH_BLIND_VOTES_REQUEST:
return RepublishBlindVotesRequest.fromProto(proto.getRepublishBlindVotesRequest(), messageVersion);
default:
throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" +
proto.getMessageCase() + "; proto raw data=" + proto.toString());