Add support for listeners when GetDataResponse and GetBlocksResponse are sent.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-12-16 17:56:01 -05:00
parent 1030f891b9
commit d5b4ce275b
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307
8 changed files with 91 additions and 8 deletions

View File

@ -25,6 +25,7 @@ import bisq.core.dao.governance.period.PeriodService;
import bisq.core.dao.monitoring.model.BlindVoteStateBlock;
import bisq.core.dao.monitoring.model.BlindVoteStateHash;
import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
@ -230,6 +231,10 @@ public class BlindVoteStateMonitoringService implements DaoSetupService, DaoStat
blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}
public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
blindVoteStateNetworkService.addResponseListener(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners

View File

@ -23,6 +23,7 @@ import bisq.core.dao.monitoring.model.DaoStateHash;
import bisq.core.dao.monitoring.model.UtxoMismatch;
import bisq.core.dao.monitoring.network.Checkpoint;
import bisq.core.dao.monitoring.network.DaoStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
@ -289,6 +290,10 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
createSnapshotHandler = handler;
}
public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
daoStateNetworkService.addResponseListener(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners

View File

@ -24,6 +24,7 @@ import bisq.core.dao.governance.proposal.ProposalService;
import bisq.core.dao.monitoring.model.ProposalStateBlock;
import bisq.core.dao.monitoring.model.ProposalStateHash;
import bisq.core.dao.monitoring.network.ProposalStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
@ -232,6 +233,10 @@ public class ProposalStateMonitoringService implements DaoSetupService, DaoState
proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}
public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
proposalStateNetworkService.addResponseListener(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
@ -294,7 +299,9 @@ public class ProposalStateMonitoringService implements DaoSetupService, DaoState
return true;
}
private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash,
Optional<NodeAddress> peersNodeAddress,
boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);

View File

@ -29,10 +29,16 @@ import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;
import bisq.common.UserThread;
import bisq.common.proto.network.NetworkEnvelope;
import javax.inject.Inject;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -42,6 +48,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Nullable;
@Slf4j
@ -59,6 +67,12 @@ public abstract class StateNetworkService<Msg extends NewStateHashMessage,
void onPeersStateHashes(List<StH> stateHashes, Optional<NodeAddress> peersNodeAddress);
}
public interface ResponseListener {
void onSuccess(int serializedSize);
void onFault();
}
protected final NetworkNode networkNode;
protected final PeerManager peerManager;
private final Broadcaster broadcaster;
@ -67,6 +81,7 @@ public abstract class StateNetworkService<Msg extends NewStateHashMessage,
private final Map<NodeAddress, Han> requestStateHashHandlerMap = new HashMap<>();
private final List<Listener<Msg, Req, StH>> listeners = new CopyOnWriteArrayList<>();
private boolean messageListenerAdded;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -145,7 +160,20 @@ public abstract class StateNetworkService<Msg extends NewStateHashMessage,
Res getStateHashesResponse = getGetStateHashesResponse(nonce, stateHashes);
log.info("Send {} with {} stateHashes to peer {}", getStateHashesResponse.getClass().getSimpleName(),
stateHashes.size(), connection.getPeersNodeAddressOptional());
networkNode.sendMessage(connection, getStateHashesResponse);
SettableFuture<Connection> future = networkNode.sendMessage(connection, getStateHashesResponse);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize()))
);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault)
);
}
}, MoreExecutors.directExecutor());
}
public void requestHashesFromAllConnectedSeedNodes(int fromHeight) {
@ -171,6 +199,10 @@ public abstract class StateNetworkService<Msg extends NewStateHashMessage,
return peerManager.isSeedNode(nodeAddress);
}
public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Listeners

View File

@ -37,7 +37,9 @@ import bisq.common.proto.network.NetworkEnvelope;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.extern.slf4j.Slf4j;
@ -51,6 +53,12 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
private static final long CLEANUP_TIMER = 120;
public interface ResponseListener {
void onSuccess(int serializedSize);
void onFault();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
@ -65,6 +73,7 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
// Key is connection UID
private final Map<String, GetBlocksRequestHandler> getBlocksRequestHandlers = new HashMap<>();
private boolean stopped;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();
///////////////////////////////////////////////////////////////////////////////////////////
@ -107,6 +116,10 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress());
}
public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
@ -166,8 +179,10 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
public void onComplete(int serializedSize) {
getBlocksRequestHandlers.remove(uid);
responseListeners.forEach(listener -> listener.onSuccess(serializedSize));
}
@Override
@ -179,6 +194,8 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
responseListeners.forEach(ResponseListener::onFault);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}

View File

@ -57,7 +57,7 @@ class GetBlocksRequestHandler {
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onComplete(int serializedSize);
void onFault(String errorMessage, Connection connection);
}
@ -120,7 +120,7 @@ class GetBlocksRequestHandler {
log.info("Send DataResponse to {} succeeded. getBlocksResponse.getBlocks().size()={}",
connection.getPeersNodeAddressOptional(), getBlocksResponse.getBlocks().size());
cleanup();
listener.onComplete();
listener.onComplete(getBlocksResponse.toProtoNetworkEnvelope().getSerializedSize());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}

View File

@ -50,7 +50,7 @@ public class GetDataRequestHandler {
///////////////////////////////////////////////////////////////////////////////////////////
public interface Listener {
void onComplete();
void onComplete(int serializedSize);
void onFault(String errorMessage, Connection connection);
}
@ -126,8 +126,8 @@ public class GetDataRequestHandler {
if (!stopped) {
log.trace("Send DataResponse to {} succeeded. getDataResponse={}",
connection.getPeersNodeAddressOptional(), getDataResponse);
listener.onComplete(getDataResponse.toProtoNetworkEnvelope().getSerializedSize());
cleanup();
listener.onComplete();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call.");
}

View File

@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -63,6 +64,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private static int NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 1;
private boolean isPreliminaryDataRequest = true;
///////////////////////////////////////////////////////////////////////////////////////////
// Listener
///////////////////////////////////////////////////////////////////////////////////////////
@ -81,6 +83,12 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
}
}
public interface ResponseListener {
void onSuccess(int serializedSize);
void onFault();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
@ -90,6 +98,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
private final P2PDataStorage dataStorage;
private final PeerManager peerManager;
private final List<NodeAddress> seedNodeAddresses;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();
// As we use Guice injection we cannot set the listener in our constructor but the P2PService calls the setListener
// in it's constructor so we can guarantee it is not null.
@ -205,6 +214,10 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
return nodeAddressOfPreliminaryDataRequest;
}
public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
@ -276,9 +289,11 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage,
new GetDataRequestHandler.Listener() {
@Override
public void onComplete() {
public void onComplete(int serializedSize) {
getDataRequestHandlers.remove(uid);
log.trace("requestDataHandshake completed.\n\tConnection={}", connection);
responseListeners.forEach(listener -> listener.onSuccess(serializedSize));
}
@Override
@ -288,6 +303,8 @@ public class RequestDataManager implements MessageListener, ConnectionListener,
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
responseListeners.forEach(ResponseListener::onFault);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}