Merge pull request #4643 from chimp1984/add-get-inventory-msg

Add GetInventory messages
This commit is contained in:
sqrrm 2020-10-14 12:22:00 +02:00 committed by GitHub
commit c41bfd7164
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 383 additions and 2 deletions

View File

@ -64,6 +64,8 @@ import bisq.network.p2p.AckMessage;
import bisq.network.p2p.BundleOfEnvelopes;
import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.PrefixedSealedAndSignedMessage;
import bisq.network.p2p.inventory.messages.GetInventoryRequest;
import bisq.network.p2p.inventory.messages.GetInventoryResponse;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
@ -224,6 +226,11 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo
case BUNDLE_OF_ENVELOPES:
return BundleOfEnvelopes.fromProto(proto.getBundleOfEnvelopes(), this, messageVersion);
case GET_INVENTORY_REQUEST:
return GetInventoryRequest.fromProto(proto.getGetInventoryRequest(), messageVersion);
case GET_INVENTORY_RESPONSE:
return GetInventoryResponse.fromProto(proto.getGetInventoryResponse(), messageVersion);
default:
throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" +
proto.getMessageCase() + "; proto raw data=" + proto.toString());

View File

@ -19,6 +19,8 @@ package bisq.network.p2p;
import bisq.network.Socks5ProxyProvider;
import bisq.network.crypto.EncryptionService;
import bisq.network.p2p.inventory.GetInventoryRequestHandler;
import bisq.network.p2p.inventory.GetInventoryRequestManager;
import bisq.network.p2p.messaging.DecryptedMailboxListener;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
@ -111,6 +113,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final SeedNodeRepository seedNodeRepository;
private final EncryptionService encryptionService;
private final KeyRing keyRing;
private final GetInventoryRequestHandler getInventoryRequestHandler;
private final GetInventoryRequestManager getInventoryRequestManager;
private final NetworkNode networkNode;
private final PeerManager peerManager;
@ -157,7 +161,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
SeedNodeRepository seedNodeRepository,
Socks5ProxyProvider socks5ProxyProvider,
EncryptionService encryptionService,
KeyRing keyRing) {
KeyRing keyRing,
GetInventoryRequestHandler getInventoryRequestHandler,
GetInventoryRequestManager getInventoryRequestManager) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.p2PDataStorage = p2PDataStorage;
@ -169,6 +175,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.socks5ProxyProvider = socks5ProxyProvider;
this.encryptionService = encryptionService;
this.keyRing = keyRing;
this.getInventoryRequestHandler = getInventoryRequestHandler;
this.getInventoryRequestManager = getInventoryRequestManager;
this.networkNode.addConnectionListener(this);
this.networkNode.addMessageListener(this);
@ -259,6 +267,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
} else {
shutDownResultHandlers.forEach(Runnable::run);
}
getInventoryRequestHandler.shutDown();
getInventoryRequestManager.shutDown();
}

View File

@ -0,0 +1,81 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.inventory;
import bisq.network.p2p.inventory.messages.GetInventoryRequest;
import bisq.network.p2p.inventory.messages.GetInventoryResponse;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.common.proto.network.NetworkEnvelope;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GetInventoryRequestHandler implements MessageListener {
private final NetworkNode networkNode;
private final P2PDataStorage p2PDataStorage;
@Inject
public GetInventoryRequestHandler(NetworkNode networkNode, P2PDataStorage p2PDataStorage) {
this.networkNode = networkNode;
this.p2PDataStorage = p2PDataStorage;
networkNode.addMessageListener(this);
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetInventoryRequest) {
GetInventoryRequest getInventoryRequest = (GetInventoryRequest) networkEnvelope;
Map<String, Integer> numPayloadsByClassName = new HashMap<>();
p2PDataStorage.getMapForDataResponse(getInventoryRequest.getVersion()).values().stream()
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
numPayloadsByClassName.putIfAbsent(className, 0);
int prev = numPayloadsByClassName.get(className);
numPayloadsByClassName.put(className, prev + 1);
});
p2PDataStorage.getMap().values().stream()
.map(ProtectedStorageEntry::getProtectedStoragePayload)
.filter(Objects::nonNull)
.map(e -> e.getClass().getSimpleName())
.forEach(className -> {
numPayloadsByClassName.putIfAbsent(className, 0);
int prev = numPayloadsByClassName.get(className);
numPayloadsByClassName.put(className, prev + 1);
});
GetInventoryResponse getInventoryResponse = new GetInventoryResponse(numPayloadsByClassName);
networkNode.sendMessage(connection, getInventoryResponse);
}
}
public void shutDown() {
networkNode.removeMessageListener(this);
}
}

View File

@ -0,0 +1,69 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.inventory;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.NetworkNode;
import bisq.common.handlers.ErrorMessageHandler;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GetInventoryRequestManager {
private final NetworkNode networkNode;
private final Map<NodeAddress, GetInventoryRequester> requesterMap = new HashMap<>();
@Inject
public GetInventoryRequestManager(NetworkNode networkNode) {
this.networkNode = networkNode;
}
public void request(NodeAddress nodeAddress,
Consumer<Map<String, Integer>> resultHandler,
ErrorMessageHandler errorMessageHandler) {
if (requesterMap.containsKey(nodeAddress)) {
log.warn("There is still an open request pending for {}", nodeAddress.getFullAddress());
return;
}
GetInventoryRequester getInventoryRequester = new GetInventoryRequester(networkNode,
nodeAddress,
resultMap -> {
requesterMap.remove(nodeAddress);
resultHandler.accept(resultMap);
},
errorMessage -> {
requesterMap.remove(nodeAddress);
errorMessageHandler.handleErrorMessage(errorMessage);
});
requesterMap.put(nodeAddress, getInventoryRequester);
getInventoryRequester.request();
}
public void shutDown() {
requesterMap.values().forEach(GetInventoryRequester::shutDown);
requesterMap.clear();
}
}

View File

@ -0,0 +1,89 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.inventory;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.inventory.messages.GetInventoryRequest;
import bisq.network.p2p.inventory.messages.GetInventoryResponse;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Version;
import bisq.common.handlers.ErrorMessageHandler;
import bisq.common.proto.network.NetworkEnvelope;
import java.util.Map;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GetInventoryRequester implements MessageListener {
private final static int TIMEOUT_SEC = 90;
private final NetworkNode networkNode;
private final NodeAddress nodeAddress;
private final Consumer<Map<String, Integer>> resultHandler;
private final ErrorMessageHandler errorMessageHandler;
private Timer timer;
public GetInventoryRequester(NetworkNode networkNode,
NodeAddress nodeAddress,
Consumer<Map<String, Integer>> resultHandler,
ErrorMessageHandler errorMessageHandler) {
this.networkNode = networkNode;
this.nodeAddress = nodeAddress;
this.resultHandler = resultHandler;
this.errorMessageHandler = errorMessageHandler;
}
public void request() {
networkNode.addMessageListener(this);
timer = UserThread.runAfter(this::onTimeOut, TIMEOUT_SEC);
networkNode.sendMessage(nodeAddress, new GetInventoryRequest(Version.VERSION));
}
private void onTimeOut() {
errorMessageHandler.handleErrorMessage("Timeout got triggered (" + TIMEOUT_SEC + " sec)");
shutDown();
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetInventoryResponse) {
connection.getPeersNodeAddressOptional().ifPresent(peer -> {
if (peer.equals(nodeAddress)) {
GetInventoryResponse getInventoryResponse = (GetInventoryResponse) networkEnvelope;
resultHandler.accept(getInventoryResponse.getNumPayloadsMap());
shutDown();
}
});
}
}
public void shutDown() {
if (timer != null) {
timer.stop();
timer = null;
}
networkNode.removeMessageListener(this);
}
}

View File

@ -0,0 +1,55 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.inventory.messages;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import lombok.Value;
@Value
public class GetInventoryRequest extends NetworkEnvelope {
private final String version;
public GetInventoryRequest(String version) {
this(version, Version.getP2PMessageVersion());
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private GetInventoryRequest(String version, int messageVersion) {
super(messageVersion);
this.version = version;
}
@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setGetInventoryRequest(protobuf.GetInventoryRequest.newBuilder()
.setVersion(version))
.build();
}
public static GetInventoryRequest fromProto(protobuf.GetInventoryRequest proto, int messageVersion) {
return new GetInventoryRequest(proto.getVersion(), messageVersion);
}
}

View File

@ -0,0 +1,56 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.inventory.messages;
import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;
import java.util.Map;
import lombok.Value;
@Value
public class GetInventoryResponse extends NetworkEnvelope {
private final Map<String, Integer> numPayloadsMap;
public GetInventoryResponse(Map<String, Integer> numPayloadsMap) {
this(numPayloadsMap, Version.getP2PMessageVersion());
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private GetInventoryResponse(Map<String, Integer> numPayloadsMap, int messageVersion) {
super(messageVersion);
this.numPayloadsMap = numPayloadsMap;
}
@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setGetInventoryResponse(protobuf.GetInventoryResponse.newBuilder()
.putAllNumPayloadsMap(numPayloadsMap))
.build();
}
public static GetInventoryResponse fromProto(protobuf.GetInventoryResponse proto, int messageVersion) {
return new GetInventoryResponse(proto.getNumPayloadsMapMap(), messageVersion);
}
}

View File

@ -294,7 +294,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return map;
}
private Map<ByteArray, PersistableNetworkPayload> getMapForDataResponse(String requestersVersion) {
public Map<ByteArray, PersistableNetworkPayload> getMapForDataResponse(String requestersVersion) {
Map<ByteArray, PersistableNetworkPayload> map = new HashMap<>();
appendOnlyDataStoreService.getServices()
.forEach(service -> {

View File

@ -78,6 +78,9 @@ message NetworkEnvelope {
RefreshTradeStateRequest refresh_trade_state_request = 50 [deprecated = true];
TraderSignedWitnessMessage trader_signed_witness_message = 51 [deprecated = true];
GetInventoryRequest get_inventory_request = 52;
GetInventoryResponse get_inventory_response = 53;
}
}
@ -137,6 +140,16 @@ message Pong {
int32 request_nonce = 1;
}
// Inventory
message GetInventoryRequest {
string version = 1;
}
message GetInventoryResponse {
map<string, uint32> num_payloads_map = 1;
}
// offer
message OfferAvailabilityRequest {