diff --git a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java index f803060a53..e6915a0c92 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java @@ -25,6 +25,7 @@ import bisq.core.dao.governance.period.PeriodService; import bisq.core.dao.monitoring.model.BlindVoteStateBlock; import bisq.core.dao.monitoring.model.BlindVoteStateHash; import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -230,6 +231,10 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + blindVoteStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java index 1d35f3b237..ff6205ec71 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java @@ -23,6 +23,7 @@ import bisq.core.dao.monitoring.model.DaoStateHash; import bisq.core.dao.monitoring.model.UtxoMismatch; import bisq.core.dao.monitoring.network.Checkpoint; import bisq.core.dao.monitoring.network.DaoStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -289,6 +290,10 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe createSnapshotHandler = handler; } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + daoStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java index c28f33a235..a87c1b1518 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java @@ -24,6 +24,7 @@ import bisq.core.dao.governance.proposal.ProposalService; import bisq.core.dao.monitoring.model.ProposalStateBlock; import bisq.core.dao.monitoring.model.ProposalStateHash; import bisq.core.dao.monitoring.network.ProposalStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -232,6 +233,10 @@ public class ProposalStateMonitoringService implements DaoSetupService, DaoState proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + proposalStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners @@ -294,7 +299,9 @@ public class ProposalStateMonitoringService implements DaoSetupService, DaoState return true; } - private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional peersNodeAddress, boolean notifyListeners) { + private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, + Optional peersNodeAddress, + boolean notifyListeners) { AtomicBoolean changed = new AtomicBoolean(false); AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode); AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode); diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 2df00fc94f..af3e4eedae 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -29,10 +29,16 @@ import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; +import bisq.common.UserThread; import bisq.common.proto.network.NetworkEnvelope; import javax.inject.Inject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +48,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + import javax.annotation.Nullable; @Slf4j @@ -59,6 +67,12 @@ public abstract class StateNetworkService stateHashes, Optional peersNodeAddress); } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + protected final NetworkNode networkNode; protected final PeerManager peerManager; private final Broadcaster broadcaster; @@ -67,6 +81,7 @@ public abstract class StateNetworkService requestStateHashHandlerMap = new HashMap<>(); private final List> listeners = new CopyOnWriteArrayList<>(); private boolean messageListenerAdded; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -145,7 +160,20 @@ public abstract class StateNetworkService future = networkNode.sendMessage(connection, getStateHashesResponse); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize())) + ); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault) + ); + } + }, MoreExecutors.directExecutor()); } public void requestHashesFromAllConnectedSeedNodes(int fromHeight) { @@ -171,6 +199,10 @@ public abstract class StateNetworkService getBlocksRequestHandlers = new HashMap<>(); private boolean stopped; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -107,6 +116,10 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress()); } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // PeerManager.Listener implementation @@ -166,8 +179,10 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List daoStateService, new GetBlocksRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getBlocksRequestHandlers.remove(uid); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -179,6 +194,8 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List if (connection != null) { peerManager.handleConnectionFault(connection); } + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index de8fad9f8b..411d49de4f 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -57,7 +57,7 @@ class GetBlocksRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -120,7 +120,7 @@ class GetBlocksRequestHandler { log.info("Send DataResponse to {} succeeded. getBlocksResponse.getBlocks().size()={}", connection.getPeersNodeAddressOptional(), getBlocksResponse.getBlocks().size()); cleanup(); - listener.onComplete(); + listener.onComplete(getBlocksResponse.toProtoNetworkEnvelope().getSerializedSize()); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java index e512ecb7c5..c000449b90 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java @@ -50,7 +50,7 @@ public class GetDataRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -126,8 +126,8 @@ public class GetDataRequestHandler { if (!stopped) { log.trace("Send DataResponse to {} succeeded. getDataResponse={}", connection.getPeersNodeAddressOptional(), getDataResponse); + listener.onComplete(getDataResponse.toProtoNetworkEnvelope().getSerializedSize()); cleanup(); - listener.onComplete(); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java index 8225c5cb07..53fd181dd8 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -63,6 +64,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, private static int NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 1; private boolean isPreliminaryDataRequest = true; + /////////////////////////////////////////////////////////////////////////////////////////// // Listener /////////////////////////////////////////////////////////////////////////////////////////// @@ -81,6 +83,12 @@ public class RequestDataManager implements MessageListener, ConnectionListener, } } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -90,6 +98,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, private final P2PDataStorage dataStorage; private final PeerManager peerManager; private final List seedNodeAddresses; + private final List responseListeners = new CopyOnWriteArrayList<>(); // As we use Guice injection we cannot set the listener in our constructor but the P2PService calls the setListener // in it's constructor so we can guarantee it is not null. @@ -205,6 +214,10 @@ public class RequestDataManager implements MessageListener, ConnectionListener, return nodeAddressOfPreliminaryDataRequest; } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation @@ -276,9 +289,11 @@ public class RequestDataManager implements MessageListener, ConnectionListener, GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage, new GetDataRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getDataRequestHandlers.remove(uid); log.trace("requestDataHandshake completed.\n\tConnection={}", connection); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -288,6 +303,8 @@ public class RequestDataManager implements MessageListener, ConnectionListener, log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + "ErrorMessage={}", connection, errorMessage); peerManager.handleConnectionFault(connection); + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); }