From 5f9d3d1f0dc977beb4893a73a6f29683625bbf86 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Tue, 13 Oct 2020 23:30:39 -0500 Subject: [PATCH 1/2] Add GetInventory messages This will be used for monitoring seed nodes. Instead of requesting all data (we cannot request all in fact as it is too large) we request the number of items the node has. This code will not have any impact atm. It will be triggered once a new monitor module gets added which will send the GetInventoryRequest to the seeds. --- .../network/CoreNetworkProtoResolver.java | 7 ++ .../java/bisq/network/p2p/P2PService.java | 13 ++- .../inventory/GetInventoryRequestHandler.java | 81 +++++++++++++++++++ .../inventory/GetInventoryRequestManager.java | 61 ++++++++++++++ .../p2p/inventory/GetInventoryRequester.java | 66 +++++++++++++++ .../messages/GetInventoryRequest.java | 55 +++++++++++++ .../messages/GetInventoryResponse.java | 56 +++++++++++++ .../network/p2p/storage/P2PDataStorage.java | 2 +- proto/src/main/proto/pb.proto | 13 +++ 9 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestHandler.java create mode 100644 p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java create mode 100644 p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java create mode 100644 p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryRequest.java create mode 100644 p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryResponse.java diff --git a/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java b/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java index 675011cec3..afa177c673 100644 --- a/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java +++ b/core/src/main/java/bisq/core/proto/network/CoreNetworkProtoResolver.java @@ -64,6 +64,8 @@ import bisq.network.p2p.AckMessage; import bisq.network.p2p.BundleOfEnvelopes; import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.PrefixedSealedAndSignedMessage; +import bisq.network.p2p.inventory.messages.GetInventoryRequest; +import bisq.network.p2p.inventory.messages.GetInventoryResponse; import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest; import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; @@ -224,6 +226,11 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo case BUNDLE_OF_ENVELOPES: return BundleOfEnvelopes.fromProto(proto.getBundleOfEnvelopes(), this, messageVersion); + case GET_INVENTORY_REQUEST: + return GetInventoryRequest.fromProto(proto.getGetInventoryRequest(), messageVersion); + case GET_INVENTORY_RESPONSE: + return GetInventoryResponse.fromProto(proto.getGetInventoryResponse(), messageVersion); + default: throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString()); diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index 8716c81d9d..7639612546 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -19,6 +19,8 @@ package bisq.network.p2p; import bisq.network.Socks5ProxyProvider; import bisq.network.crypto.EncryptionService; +import bisq.network.p2p.inventory.GetInventoryRequestHandler; +import bisq.network.p2p.inventory.GetInventoryRequestManager; import bisq.network.p2p.messaging.DecryptedMailboxListener; import bisq.network.p2p.network.CloseConnectionReason; import bisq.network.p2p.network.Connection; @@ -111,6 +113,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis private final SeedNodeRepository seedNodeRepository; private final EncryptionService encryptionService; private final KeyRing keyRing; + private final GetInventoryRequestHandler getInventoryRequestHandler; + private final GetInventoryRequestManager getInventoryRequestManager; private final NetworkNode networkNode; private final PeerManager peerManager; @@ -157,7 +161,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis SeedNodeRepository seedNodeRepository, Socks5ProxyProvider socks5ProxyProvider, EncryptionService encryptionService, - KeyRing keyRing) { + KeyRing keyRing, + GetInventoryRequestHandler getInventoryRequestHandler, + GetInventoryRequestManager getInventoryRequestManager) { this.networkNode = networkNode; this.peerManager = peerManager; this.p2PDataStorage = p2PDataStorage; @@ -169,6 +175,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis this.socks5ProxyProvider = socks5ProxyProvider; this.encryptionService = encryptionService; this.keyRing = keyRing; + this.getInventoryRequestHandler = getInventoryRequestHandler; + this.getInventoryRequestManager = getInventoryRequestManager; this.networkNode.addConnectionListener(this); this.networkNode.addMessageListener(this); @@ -259,6 +267,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis } else { shutDownResultHandlers.forEach(Runnable::run); } + + getInventoryRequestHandler.shutDown(); + getInventoryRequestManager.shutDown(); } diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestHandler.java b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestHandler.java new file mode 100644 index 0000000000..8695e71b37 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestHandler.java @@ -0,0 +1,81 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.inventory; + +import bisq.network.p2p.inventory.messages.GetInventoryRequest; +import bisq.network.p2p.inventory.messages.GetInventoryResponse; +import bisq.network.p2p.network.Connection; +import bisq.network.p2p.network.MessageListener; +import bisq.network.p2p.network.NetworkNode; +import bisq.network.p2p.storage.P2PDataStorage; +import bisq.network.p2p.storage.payload.ProtectedStorageEntry; + +import bisq.common.proto.network.NetworkEnvelope; + +import javax.inject.Inject; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class GetInventoryRequestHandler implements MessageListener { + private final NetworkNode networkNode; + private final P2PDataStorage p2PDataStorage; + + @Inject + public GetInventoryRequestHandler(NetworkNode networkNode, P2PDataStorage p2PDataStorage) { + this.networkNode = networkNode; + this.p2PDataStorage = p2PDataStorage; + networkNode.addMessageListener(this); + } + + @Override + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetInventoryRequest) { + GetInventoryRequest getInventoryRequest = (GetInventoryRequest) networkEnvelope; + + Map numPayloadsByClassName = new HashMap<>(); + p2PDataStorage.getMapForDataResponse(getInventoryRequest.getVersion()).values().stream() + .map(e -> e.getClass().getSimpleName()) + .forEach(className -> { + numPayloadsByClassName.putIfAbsent(className, 0); + int prev = numPayloadsByClassName.get(className); + numPayloadsByClassName.put(className, prev + 1); + }); + p2PDataStorage.getMap().values().stream() + .map(ProtectedStorageEntry::getProtectedStoragePayload) + .filter(Objects::nonNull) + .map(e -> e.getClass().getSimpleName()) + .forEach(className -> { + numPayloadsByClassName.putIfAbsent(className, 0); + int prev = numPayloadsByClassName.get(className); + numPayloadsByClassName.put(className, prev + 1); + }); + + GetInventoryResponse getInventoryResponse = new GetInventoryResponse(numPayloadsByClassName); + networkNode.sendMessage(connection, getInventoryResponse); + } + } + + public void shutDown() { + networkNode.removeMessageListener(this); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java new file mode 100644 index 0000000000..dcf88b463a --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java @@ -0,0 +1,61 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.inventory; + +import bisq.network.p2p.NodeAddress; +import bisq.network.p2p.network.NetworkNode; + +import javax.inject.Inject; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class GetInventoryRequestManager { + private final NetworkNode networkNode; + private final Map requesterMap = new HashMap<>(); + + @Inject + public GetInventoryRequestManager(NetworkNode networkNode) { + this.networkNode = networkNode; + } + + public void request(NodeAddress nodeAddress, Consumer> resultHandler) { + if (requesterMap.containsKey(nodeAddress)) { + log.warn("There is already an open request pending"); + return; + } + + GetInventoryRequester getInventoryRequester = new GetInventoryRequester(networkNode, + nodeAddress, + resultMap -> { + requesterMap.remove(nodeAddress); + resultHandler.accept(resultMap); + }); + requesterMap.put(nodeAddress, getInventoryRequester); + getInventoryRequester.request(); + } + + public void shutDown() { + requesterMap.values().forEach(GetInventoryRequester::shutDown); + requesterMap.clear(); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java new file mode 100644 index 0000000000..df07367526 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java @@ -0,0 +1,66 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.inventory; + +import bisq.network.p2p.NodeAddress; +import bisq.network.p2p.inventory.messages.GetInventoryRequest; +import bisq.network.p2p.inventory.messages.GetInventoryResponse; +import bisq.network.p2p.network.Connection; +import bisq.network.p2p.network.MessageListener; +import bisq.network.p2p.network.NetworkNode; + +import bisq.common.app.Version; +import bisq.common.proto.network.NetworkEnvelope; + +import java.util.Map; +import java.util.function.Consumer; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class GetInventoryRequester implements MessageListener { + private final NetworkNode networkNode; + private final NodeAddress nodeAddress; + private final Consumer> resultHandler; + + public GetInventoryRequester(NetworkNode networkNode, + NodeAddress nodeAddress, + Consumer> resultHandler) { + this.networkNode = networkNode; + this.nodeAddress = nodeAddress; + this.resultHandler = resultHandler; + networkNode.addMessageListener(this); + } + + public void request() { + networkNode.sendMessage(nodeAddress, new GetInventoryRequest(Version.VERSION)); + } + + @Override + public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof GetInventoryResponse) { + GetInventoryResponse getInventoryResponse = (GetInventoryResponse) networkEnvelope; + resultHandler.accept(getInventoryResponse.getNumPayloadsMap()); + shutDown(); + } + } + + public void shutDown() { + networkNode.removeMessageListener(this); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryRequest.java b/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryRequest.java new file mode 100644 index 0000000000..dc26d37774 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryRequest.java @@ -0,0 +1,55 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.inventory.messages; + + +import bisq.common.app.Version; +import bisq.common.proto.network.NetworkEnvelope; + +import lombok.Value; + +@Value +public class GetInventoryRequest extends NetworkEnvelope { + private final String version; + + public GetInventoryRequest(String version) { + this(version, Version.getP2PMessageVersion()); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // PROTO BUFFER + /////////////////////////////////////////////////////////////////////////////////////////// + + private GetInventoryRequest(String version, int messageVersion) { + super(messageVersion); + + this.version = version; + } + + @Override + public protobuf.NetworkEnvelope toProtoNetworkEnvelope() { + return getNetworkEnvelopeBuilder() + .setGetInventoryRequest(protobuf.GetInventoryRequest.newBuilder() + .setVersion(version)) + .build(); + } + + public static GetInventoryRequest fromProto(protobuf.GetInventoryRequest proto, int messageVersion) { + return new GetInventoryRequest(proto.getVersion(), messageVersion); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryResponse.java b/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryResponse.java new file mode 100644 index 0000000000..45b5fbf992 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/inventory/messages/GetInventoryResponse.java @@ -0,0 +1,56 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.inventory.messages; + +import bisq.common.app.Version; +import bisq.common.proto.network.NetworkEnvelope; + +import java.util.Map; + +import lombok.Value; + +@Value +public class GetInventoryResponse extends NetworkEnvelope { + private final Map numPayloadsMap; + + public GetInventoryResponse(Map numPayloadsMap) { + this(numPayloadsMap, Version.getP2PMessageVersion()); + } + + /////////////////////////////////////////////////////////////////////////////////////////// + // PROTO BUFFER + /////////////////////////////////////////////////////////////////////////////////////////// + + private GetInventoryResponse(Map numPayloadsMap, int messageVersion) { + super(messageVersion); + + this.numPayloadsMap = numPayloadsMap; + } + + @Override + public protobuf.NetworkEnvelope toProtoNetworkEnvelope() { + return getNetworkEnvelopeBuilder() + .setGetInventoryResponse(protobuf.GetInventoryResponse.newBuilder() + .putAllNumPayloadsMap(numPayloadsMap)) + .build(); + } + + public static GetInventoryResponse fromProto(protobuf.GetInventoryResponse proto, int messageVersion) { + return new GetInventoryResponse(proto.getNumPayloadsMapMap(), messageVersion); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 7b9e3714ce..5453c42723 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -294,7 +294,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers return map; } - private Map getMapForDataResponse(String requestersVersion) { + public Map getMapForDataResponse(String requestersVersion) { Map map = new HashMap<>(); appendOnlyDataStoreService.getServices() .forEach(service -> { diff --git a/proto/src/main/proto/pb.proto b/proto/src/main/proto/pb.proto index 8d7b152bd2..8323a58c4a 100644 --- a/proto/src/main/proto/pb.proto +++ b/proto/src/main/proto/pb.proto @@ -78,6 +78,9 @@ message NetworkEnvelope { RefreshTradeStateRequest refresh_trade_state_request = 50 [deprecated = true]; TraderSignedWitnessMessage trader_signed_witness_message = 51 [deprecated = true]; + + GetInventoryRequest get_inventory_request = 52; + GetInventoryResponse get_inventory_response = 53; } } @@ -137,6 +140,16 @@ message Pong { int32 request_nonce = 1; } +// Inventory + +message GetInventoryRequest { + string version = 1; +} + +message GetInventoryResponse { + map num_payloads_map = 1; +} + // offer message OfferAvailabilityRequest { From 32b953b61aeb01cbf949d5ffd2c17c2064344081 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 14 Oct 2020 02:37:00 -0500 Subject: [PATCH 2/2] Improve GetInventoryRequester and GetInventoryRequestManager --- .../inventory/GetInventoryRequestManager.java | 12 +++++-- .../p2p/inventory/GetInventoryRequester.java | 33 ++++++++++++++++--- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java index dcf88b463a..755f2912c0 100644 --- a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java +++ b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequestManager.java @@ -20,6 +20,8 @@ package bisq.network.p2p.inventory; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.NetworkNode; +import bisq.common.handlers.ErrorMessageHandler; + import javax.inject.Inject; import java.util.HashMap; @@ -38,9 +40,11 @@ public class GetInventoryRequestManager { this.networkNode = networkNode; } - public void request(NodeAddress nodeAddress, Consumer> resultHandler) { + public void request(NodeAddress nodeAddress, + Consumer> resultHandler, + ErrorMessageHandler errorMessageHandler) { if (requesterMap.containsKey(nodeAddress)) { - log.warn("There is already an open request pending"); + log.warn("There is still an open request pending for {}", nodeAddress.getFullAddress()); return; } @@ -49,6 +53,10 @@ public class GetInventoryRequestManager { resultMap -> { requesterMap.remove(nodeAddress); resultHandler.accept(resultMap); + }, + errorMessage -> { + requesterMap.remove(nodeAddress); + errorMessageHandler.handleErrorMessage(errorMessage); }); requesterMap.put(nodeAddress, getInventoryRequester); getInventoryRequester.request(); diff --git a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java index df07367526..b43f1d3789 100644 --- a/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java +++ b/p2p/src/main/java/bisq/network/p2p/inventory/GetInventoryRequester.java @@ -24,7 +24,10 @@ import bisq.network.p2p.network.Connection; import bisq.network.p2p.network.MessageListener; import bisq.network.p2p.network.NetworkNode; +import bisq.common.Timer; +import bisq.common.UserThread; import bisq.common.app.Version; +import bisq.common.handlers.ErrorMessageHandler; import bisq.common.proto.network.NetworkEnvelope; import java.util.Map; @@ -34,33 +37,53 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class GetInventoryRequester implements MessageListener { + private final static int TIMEOUT_SEC = 90; + private final NetworkNode networkNode; private final NodeAddress nodeAddress; private final Consumer> resultHandler; + private final ErrorMessageHandler errorMessageHandler; + private Timer timer; public GetInventoryRequester(NetworkNode networkNode, NodeAddress nodeAddress, - Consumer> resultHandler) { + Consumer> resultHandler, + ErrorMessageHandler errorMessageHandler) { this.networkNode = networkNode; this.nodeAddress = nodeAddress; this.resultHandler = resultHandler; - networkNode.addMessageListener(this); + this.errorMessageHandler = errorMessageHandler; } public void request() { + networkNode.addMessageListener(this); + timer = UserThread.runAfter(this::onTimeOut, TIMEOUT_SEC); networkNode.sendMessage(nodeAddress, new GetInventoryRequest(Version.VERSION)); } + private void onTimeOut() { + errorMessageHandler.handleErrorMessage("Timeout got triggered (" + TIMEOUT_SEC + " sec)"); + shutDown(); + } + @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof GetInventoryResponse) { - GetInventoryResponse getInventoryResponse = (GetInventoryResponse) networkEnvelope; - resultHandler.accept(getInventoryResponse.getNumPayloadsMap()); - shutDown(); + connection.getPeersNodeAddressOptional().ifPresent(peer -> { + if (peer.equals(nodeAddress)) { + GetInventoryResponse getInventoryResponse = (GetInventoryResponse) networkEnvelope; + resultHandler.accept(getInventoryResponse.getNumPayloadsMap()); + shutDown(); + } + }); } } public void shutDown() { + if (timer != null) { + timer.stop(); + timer = null; + } networkNode.removeMessageListener(this); } }