Remove verification for address prefix

Set address prefix to empty bytes in case we know that peer has capability (updated version)
Batch process mailbox messages in a thread.
Refactor handling of mailbox messages
This commit is contained in:
chimp1984 2020-10-07 11:52:17 -05:00
parent 40f9cfb7c5
commit 8aec306159
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
2 changed files with 150 additions and 118 deletions

View File

@ -46,6 +46,7 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;
import bisq.common.UserThread;
import bisq.common.app.Capability;
import bisq.common.crypto.CryptoException;
import bisq.common.crypto.KeyRing;
import bisq.common.crypto.PubKeyRing;
@ -53,12 +54,15 @@ import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Tuple2;
import bisq.common.util.Utilities;
import com.google.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
@ -74,16 +78,20 @@ import javafx.beans.property.SimpleIntegerProperty;
import java.security.PublicKey;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -164,7 +172,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.networkNode.addConnectionListener(this);
this.networkNode.addMessageListener(this);
this.p2PDataStorage.addHashMapChangedListener(this);
this.requestDataManager.addListener(this);
// We need to have both the initial data delivered and the hidden service published
@ -197,13 +204,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onAllServicesInitialized() {
if (networkNode.getNodeAddress() != null) {
maybeProcessAllMailboxEntries();
myNodeAddress = networkNode.getNodeAddress();
} else {
// If our HS is still not published
networkNode.nodeAddressProperty().addListener((observable, oldValue, newValue) -> {
if (newValue != null) {
maybeProcessAllMailboxEntries();
myNodeAddress = networkNode.getNodeAddress();
}
});
@ -284,7 +289,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
if (!seedNodesAvailable) {
isBootstrapped = true;
maybeProcessAllMailboxEntries();
// As we do not expect a updated data request response we start here listening
addHashMapChangedListenerAndApply();
p2pServiceListeners.stream().forEach(P2PServiceListener::onNoSeedNodeAvailable);
}
}
@ -346,7 +352,9 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onUpdatedDataReceived() {
if (!isBootstrapped) {
isBootstrapped = true;
maybeProcessAllMailboxEntries();
// Only now we start listening and processing. The p2PDataStorage is our cache for data we have received
// so far.
addHashMapChangedListenerAndApply();
p2pServiceListeners.stream().forEach(P2PServiceListener::onUpdatedDataReceived);
p2PDataStorage.onBootstrapComplete();
}
@ -398,58 +406,120 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof PrefixedSealedAndSignedMessage) {
// Seed nodes don't have set the encryptionService
PrefixedSealedAndSignedMessage sealedMsg = (PrefixedSealedAndSignedMessage) networkEnvelope;
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
try {
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = (PrefixedSealedAndSignedMessage) networkEnvelope;
if (verifyAddressPrefixHash(prefixedSealedAndSignedMessage)) {
// We set connectionType to that connection to avoid that is get closed when
// we get too many connection attempts.
connection.setPeerType(Connection.PeerType.DIRECT_MSG_PEER);
log.debug("Try to decrypt...");
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(
prefixedSealedAndSignedMessage.getSealedAndSigned());
log.debug("\n\nDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD\n" +
"Decrypted SealedAndSignedMessage:\ndecryptedMsgWithPubKey={}"
+ "\nDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD\n", decryptedMessageWithPubKey);
if (connection.getPeersNodeAddressOptional().isPresent())
decryptedDirectMessageListeners.forEach(
e -> e.onDirectMessage(decryptedMessageWithPubKey, connection.getPeersNodeAddressOptional().get()));
else
log.error("peersNodeAddress is not available at onMessage.");
} else {
log.debug("Wrong receiverAddressMaskHash. The message is not intended for us.");
}
DecryptedMessageWithPubKey decryptedMsg = encryptionService.decryptAndVerify(sealedMsg.getSealedAndSigned());
connection.getPeersNodeAddressOptional().ifPresentOrElse(nodeAddress ->
decryptedDirectMessageListeners.forEach(e -> e.onDirectMessage(decryptedMsg, nodeAddress)),
() -> {
log.error("peersNodeAddress is expected to be available at onMessage for " +
"processing PrefixedSealedAndSignedMessage.");
});
} catch (CryptoException e) {
log.debug(networkEnvelope.toString());
log.debug(e.toString());
log.debug("Decryption of prefixedSealedAndSignedMessage.sealedAndSigned failed. " +
"That is expected if the message is not intended for us.");
log.warn("Decryption of a direct message failed. This is not expected as the " +
"direct message was sent to our node.");
} catch (ProtobufferException e) {
log.error("Protobuffer data could not be processed: {}", e.toString());
log.error("ProtobufferException at decryptAndVerify: {}", e.toString());
e.getStackTrace();
}
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// HashMapChangedListener implementation
// HashMapChangedListener implementation for ProtectedStorageEntry items
///////////////////////////////////////////////////////////////////////////////////////////
private void addHashMapChangedListenerAndApply() {
p2PDataStorage.addHashMapChangedListener(this);
onAdded(p2PDataStorage.getMap().values());
}
@Override
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
Collection<ProtectedMailboxStorageEntry> entries = protectedStorageEntries.stream()
.filter(e -> e instanceof ProtectedMailboxStorageEntry)
.map(e -> (ProtectedMailboxStorageEntry) e)
.filter(e -> networkNode.getNodeAddress() != null)
.filter(e -> !seedNodeRepository.isSeedNode(networkNode.getNodeAddress())) // Seed nodes don't expect mailbox messages
.collect(Collectors.toSet());
if (entries.size() > 1) {
threadedBatchProcessMailboxEntries(entries);
} else if (entries.size() == 1) {
processSingleMailboxEntry(entries);
}
}
@Override
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
// not used
private void processSingleMailboxEntry(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
var decryptedEntries = new ArrayList<>(getDecryptedEntries(protectedMailboxStorageEntries));
checkArgument(decryptedEntries.size() == 1);
storeMailboxDataAndNotifyListeners(decryptedEntries.get(0));
}
// We run the batch processing of all mailbox messages we have received at startup in a thread to not block the UI.
// For about 1000 messages decryption takes about 1 sec.
private void threadedBatchProcessMailboxEntries(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
ListeningExecutorService executor = Utilities.getSingleThreadExecutor("processMailboxEntry-" + new Random().nextInt(1000));
long ts = System.currentTimeMillis();
ListenableFuture<Set<Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>>> future = executor.submit(() -> {
var decryptedEntries = getDecryptedEntries(protectedMailboxStorageEntries);
log.info("Batch processing of {} mailbox entries took {} ms",
protectedMailboxStorageEntries.size(),
System.currentTimeMillis() - ts);
return decryptedEntries;
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>> decryptedEntries) {
UserThread.execute(() -> decryptedEntries.forEach(e -> storeMailboxDataAndNotifyListeners(e)));
}
public void onFailure(@NotNull Throwable throwable) {
log.error(throwable.toString());
}
}, MoreExecutors.directExecutor());
}
private Set<Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>> getDecryptedEntries(Collection<ProtectedMailboxStorageEntry> protectedMailboxStorageEntries) {
Set<Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey>> decryptedEntries = new HashSet<>();
protectedMailboxStorageEntries.stream()
.map(this::decryptProtectedMailboxStorageEntry)
.filter(Objects::nonNull)
.forEach(decryptedEntries::add);
return decryptedEntries;
}
@Nullable
private Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey> decryptProtectedMailboxStorageEntry(
ProtectedMailboxStorageEntry protectedMailboxStorageEntry) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(protectedMailboxStorageEntry
.getMailboxStoragePayload()
.getPrefixedSealedAndSignedMessage()
.getSealedAndSigned());
checkArgument(decryptedMessageWithPubKey.getNetworkEnvelope() instanceof MailboxMessage);
return new Tuple2<>(protectedMailboxStorageEntry, decryptedMessageWithPubKey);
} catch (CryptoException ignore) {
// Expected if message was not intended for us
} catch (ProtobufferException e) {
log.error(e.toString());
e.getStackTrace();
}
return null;
}
private void storeMailboxDataAndNotifyListeners(Tuple2<ProtectedMailboxStorageEntry, DecryptedMessageWithPubKey> tuple2) {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = tuple2.second;
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
NodeAddress sender = mailboxMessage.getSenderNodeAddress();
mailboxMap.put(mailboxMessage.getUid(), tuple2);
log.info("Received a {} mailbox message with uid {} and senderAddress {}",
mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), sender);
decryptedMailboxListeners.forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, sender));
}
///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages
///////////////////////////////////////////////////////////////////////////////////////////
@ -486,13 +556,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
log.debug("\n\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n" +
"Encrypt message:\nmessage={}"
+ "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message);
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(pubKeyRing, message),
peersNodeAddress.getAddressPrefixHash(),
UUID.randomUUID().toString());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
// Prefix is not needed for direct messages but as old code is doing the verification we still need to
// send it if peer has not updated.
// TODO persist capability
PrefixedSealedAndSignedMessage sealedMsg = getPrefixedSealedAndSignedMessage(peersNodeAddress,
pubKeyRing,
message);
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, sealedMsg);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Connection connection) {
sendDirectMessageListener.onArrived();
@ -513,48 +586,30 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
private PrefixedSealedAndSignedMessage getPrefixedSealedAndSignedMessage(NodeAddress peersNodeAddress,
PubKeyRing pubKeyRing,
NetworkEnvelope message) throws CryptoException {
byte[] addressPrefixHash;
if (peerManager.peerHasCapability(peersNodeAddress, Capability.NO_ADDRESS_PRE_FIX)) {
// The peer has an updated version so we do not need to send the prefix.
// We cannot use null as not updated nodes would get a nullPointer at protobuf serialisation.
addressPrefixHash = new byte[0];
} else {
addressPrefixHash = peersNodeAddress.getAddressPrefixHash();
}
return new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(pubKeyRing, message),
addressPrefixHash,
UUID.randomUUID().toString());
}
///////////////////////////////////////////////////////////////////////////////////////////
// MailboxMessages
///////////////////////////////////////////////////////////////////////////////////////////
private void processMailboxEntry(ProtectedMailboxStorageEntry protectedMailboxStorageEntry) {
NodeAddress nodeAddress = networkNode.getNodeAddress();
// Seed nodes don't receive mailbox network_messages
if (nodeAddress != null && !seedNodeRepository.isSeedNode(nodeAddress)) {
MailboxStoragePayload mailboxStoragePayload = protectedMailboxStorageEntry.getMailboxStoragePayload();
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = mailboxStoragePayload.getPrefixedSealedAndSignedMessage();
if (verifyAddressPrefixHash(prefixedSealedAndSignedMessage)) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerify(
prefixedSealedAndSignedMessage.getSealedAndSigned());
if (decryptedMessageWithPubKey.getNetworkEnvelope() instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.getNetworkEnvelope();
NodeAddress senderNodeAddress = mailboxMessage.getSenderNodeAddress();
checkNotNull(senderNodeAddress, "senderAddress must not be null for mailbox network_messages");
mailboxMap.put(mailboxMessage.getUid(), new Tuple2<>(protectedMailboxStorageEntry, decryptedMessageWithPubKey));
log.info("Received a {} mailbox message with messageUid {} and senderAddress {}", mailboxMessage.getClass().getSimpleName(), mailboxMessage.getUid(), senderNodeAddress);
decryptedMailboxListeners.forEach(
e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderNodeAddress));
} else {
log.warn("tryDecryptMailboxData: Expected MailboxMessage but got other type. " +
"decryptedMsgWithPubKey.message={}", decryptedMessageWithPubKey.getNetworkEnvelope());
}
} catch (CryptoException e) {
log.debug(e.toString());
log.debug("Decryption of prefixedSealedAndSignedMessage.sealedAndSigned failed. " +
"That is expected if the message is not intended for us.");
} catch (ProtobufferException e) {
log.error("Protobuffer data could not be processed: {}", e.toString());
}
} else {
log.trace("Wrong blurredAddressHash. The message is not intended for us.");
}
}
}
public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing,
public void sendEncryptedMailboxMessage(NodeAddress peer, PubKeyRing peersPubKeyRing,
NetworkEnvelope message,
SendMailboxMessageListener sendMailboxMessageListener) {
if (peersPubKeyRing == null) {
@ -562,7 +617,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return;
}
checkNotNull(peersNodeAddress,
checkNotNull(peer,
"PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkNotNull(networkNode.getNodeAddress(),
"My node address must not be null at sendEncryptedMailboxMessage");
@ -578,7 +633,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return;
}
if (capabilityRequiredAndCapabilityNotSupported(peersNodeAddress, message)) {
if (capabilityRequiredAndCapabilityNotSupported(peer, message)) {
sendMailboxMessageListener.onFault("We did not send the EncryptedMailboxMessage " +
"because the peer does not support the capability.");
return;
@ -589,14 +644,11 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
"Encrypt message:\nmessage={}"
+ "\nEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE\n", message);
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = new PrefixedSealedAndSignedMessage(
networkNode.getNodeAddress(),
encryptionService.encryptAndSign(peersPubKeyRing, message),
peersNodeAddress.getAddressPrefixHash(),
UUID.randomUUID().toString());
PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage = getPrefixedSealedAndSignedMessage(peer,
peersPubKeyRing, message);
log.debug("sendEncryptedMailboxMessage msg={}, peersNodeAddress={}", message, peersNodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, prefixedSealedAndSignedMessage);
log.debug("sendEncryptedMailboxMessage msg={}, peersNodeAddress={}", message, peer);
SettableFuture<Connection> future = networkNode.sendMessage(peer, prefixedSealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
@ -654,15 +706,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
private void maybeProcessAllMailboxEntries() {
if (isBootstrapped) {
p2PDataStorage.getMap().values().forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
}
}
private void addMailboxData(MailboxStoragePayload expirableMailboxStoragePayload,
PublicKey receiversPublicKey,
SendMailboxMessageListener sendMailboxMessageListener) {
@ -892,19 +935,4 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public KeyRing getKeyRing() {
return keyRing;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private boolean verifyAddressPrefixHash(PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage) {
if (networkNode.getNodeAddress() != null) {
byte[] blurredAddressHash = networkNode.getNodeAddress().getAddressPrefixHash();
return blurredAddressHash != null &&
Arrays.equals(blurredAddressHash, prefixedSealedAndSignedMessage.getAddressPrefixHash());
} else {
log.debug("myOnionAddress is null at verifyAddressPrefixHash. That is expected at startup.");
return false;
}
}
}

View File

@ -33,7 +33,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
public final class PrefixedSealedAndSignedMessage extends NetworkEnvelope implements MailboxMessage, SendersNodeAddressMessage {
private final NodeAddress senderNodeAddress;
private final SealedAndSigned sealedAndSigned;
// From v1.4.0 on that can be an empty byte array. We cannot use null as not updated nodes would get a nullPointer
// at protobuf serialisation.
private final byte[] addressPrefixHash;
private final String uid;
public PrefixedSealedAndSignedMessage(NodeAddress senderNodeAddress,