Merge pull request #3154 from chimp1984/Fix-small-P2P-network-issues

Fix small p2p network issues
This commit is contained in:
Florian Reimair 2019-08-29 16:15:18 +02:00 committed by GitHub
commit 4e7cb04043
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 197 additions and 93 deletions

View file

@ -39,6 +39,10 @@ public class Capabilities {
*/
public static final Capabilities app = new Capabilities();
// Defines which most recent capability any node need to support.
// This helps to clean network from very old inactive but still running nodes.
private static final Capability mandatoryCapability = Capability.DAO_STATE;
protected final Set<Capability> capabilities = new HashSet<>();
public Capabilities(Capability... capabilities) {
@ -71,7 +75,7 @@ public class Capabilities {
}
public void addAll(Capabilities capabilities) {
if(capabilities != null)
if (capabilities != null)
this.capabilities.addAll(capabilities.capabilities);
}
@ -111,6 +115,10 @@ public class Capabilities {
.collect(Collectors.toSet()));
}
public static boolean hasMandatoryCapability(Capabilities capabilities) {
return capabilities.capabilities.stream().anyMatch(c -> c == mandatoryCapability);
}
@Override
public String toString() {
return Arrays.toString(Capabilities.toIntList(this).toArray());

View file

@ -28,11 +28,20 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CoreNetworkCapabilities {
public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) {
Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG);
Capabilities.app.addAll(Capability.BUNDLE_OF_ENVELOPES, Capability.SIGNED_ACCOUNT_AGE_WITNESS);
Capabilities.app.addAll(
Capability.TRADE_STATISTICS,
Capability.TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.BUNDLE_OF_ENVELOPES
);
if (BisqEnvironment.isDaoActivated(bisqEnvironment)) {
Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.DAO_STATE);
Capabilities.app.addAll(
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE
);
maybeApplyDaoFullMode(bisqEnvironment);
}

View file

@ -121,8 +121,8 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private static ConnectionConfig connectionConfig;
// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);
@ -172,7 +172,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private RuleViolation ruleViolation;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();
private Capabilities capabilities = new Capabilities();
private final Capabilities capabilities = new Capabilities();
///////////////////////////////////////////////////////////////////////////////////////////
@ -233,9 +233,9 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return capabilities;
}
Object lock = new Object();
Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
@ -250,7 +250,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));
if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
// pings and offer refresh msg we don't want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
@ -298,10 +298,13 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes current = queueOfBundles.poll();
if(current.getEnvelopes().size() == 1)
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
else
protoOutputStream.writeEnvelope(current);
if (current != null) {
if (current.getEnvelopes().size() == 1) {
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
} else {
protoOutputStream.writeEnvelope(current);
}
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
@ -386,10 +389,10 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
messageTimeStamps.add(now);
// clean list
while(messageTimeStamps.size() > msgThrottlePer10Sec)
while (messageTimeStamps.size() > msgThrottlePer10Sec)
messageTimeStamps.remove(0);
return violatesThrottleLimit(now,1, msgThrottlePerSec) || violatesThrottleLimit(now,10, msgThrottlePer10Sec);
return violatesThrottleLimit(now, 1, msgThrottlePerSec) || violatesThrottleLimit(now, 10, msgThrottlePer10Sec);
}
private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) {
@ -399,7 +402,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - messageCountLimit);
// if duration < seconds sec we received too much network_messages
if(now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
if (now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
log.error("violatesThrottleLimit {}/{} second(s)", messageCountLimit, seconds);
return true;
@ -436,7 +439,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
this.peerType = peerType;
}
public void setPeersNodeAddress(NodeAddress peerNodeAddress) {
private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);
@ -494,6 +497,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
stopped = true;
//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
@ -534,6 +538,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
e.printStackTrace();
}
//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete " + this.toString());
@ -705,7 +710,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
Thread.sleep(20);
}
// Reading the protobuffer message from the inputstream
// Reading the protobuffer message from the inputStream
protobuf.NetworkEnvelope proto = protobuf.NetworkEnvelope.parseDelimitedFrom(protoInputStream);
if (proto == null) {
@ -793,7 +798,20 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
if (networkEnvelope instanceof SupportedCapabilitiesMessage) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
capabilities.set(supportedCapabilities);
if (!capabilities.equals(supportedCapabilities)) {
if (!Capabilities.hasMandatoryCapability(capabilities)) {
shutDown(CloseConnectionReason.RULE_VIOLATION);
return;
}
capabilities.set(supportedCapabilities);
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
}
}
}

View file

@ -96,7 +96,8 @@ public abstract class NetworkNode implements MessageListener {
// when the events happen.
abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) {
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope) {
log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope));
checkNotNull(peersNodeAddress, "peerAddress must not be null");
@ -112,9 +113,9 @@ public abstract class NetworkNode implements MessageListener {
final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress);
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress());
if(peersNodeAddress.equals(getNodeAddress())){
if (peersNodeAddress.equals(getNodeAddress())) {
throw new ConnectException("We do not send a message to ourselves");
}
@ -162,7 +163,8 @@ public abstract class NetworkNode implements MessageListener {
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) {
log.trace("onDisconnect connectionListener\n\tconnection={}" + connection);
//noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection);
@ -264,7 +266,8 @@ public abstract class NetworkNode implements MessageListener {
public SettableFuture<Connection> sendMessage(Connection connection, NetworkEnvelope networkEnvelope) {
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});

View file

@ -97,8 +97,9 @@ public final class Peer implements HasCapabilities, NetworkPayload, PersistableP
@Override
public void onChanged(Capabilities supportedCapabilities) {
if (!supportedCapabilities.isEmpty())
if (!supportedCapabilities.isEmpty()) {
capabilities.set(supportedCapabilities);
}
}

View file

@ -26,6 +26,7 @@ import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.BroadcastHandler;
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.storage.messages.AddDataMessage;
import bisq.network.p2p.storage.messages.AddOncePayload;
import bisq.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage;
import bisq.network.p2p.storage.messages.BroadcastMessage;
import bisq.network.p2p.storage.messages.RefreshOfferMessage;
@ -75,6 +76,7 @@ import java.security.PublicKey;
import java.time.Clock;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -112,6 +114,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Getter
private final Map<ByteArray, ProtectedStorageEntry> map = new ConcurrentHashMap<>();
private final Set<ByteArray> removedAddOncePayloads = new HashSet<>();
private final Set<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private Timer removeExpiredEntriesTimer;
@ -262,7 +265,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
NetworkPayload networkPayload = protectedStorageEntry.getProtectedStoragePayload();
if (networkPayload instanceof ExpirablePayload && networkPayload instanceof RequiresOwnerIsOnlinePayload) {
NodeAddress ownerNodeAddress = ((RequiresOwnerIsOnlinePayload) networkPayload).getOwnerNodeAddress();
if (ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
if (connection.getPeersNodeAddressOptional().isPresent() &&
ownerNodeAddress.equals(connection.getPeersNodeAddressOptional().get())) {
// We have a RequiresLiveOwnerData data object with the node address of the
// disconnected peer. We remove that data from our map.
@ -314,9 +318,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean reBroadcast,
boolean checkDate) {
log.debug("addPersistableNetworkPayload payload={}", payload);
final byte[] hash = payload.getHash();
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {
final ByteArray hashAsByteArray = new ByteArray(hash);
ByteArray hashAsByteArray = new ByteArray(hash);
boolean containsKey = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray);
if (!containsKey || reBroadcast) {
if (!(payload instanceof DateTolerantPayload) || !checkDate || ((DateTolerantPayload) payload).isDateInTolerance(clock)) {
@ -350,9 +354,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// might be added then but as we have the data already added calling them would be irrelevant as well.
// TODO find a way to avoid the second call...
public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) {
final byte[] hash = payload.getHash();
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {
final ByteArray hashAsByteArray = new ByteArray(hash);
ByteArray hashAsByteArray = new ByteArray(hash);
appendOnlyDataStoreService.put(hashAsByteArray, payload);
return true;
} else {
@ -366,22 +370,34 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return addProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner, true);
}
public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean isDataOwner, boolean allowBroadcast) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener,
boolean isDataOwner,
boolean allowBroadcast) {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
if (protectedStoragePayload instanceof AddOncePayload &&
removedAddOncePayloads.contains(hashOfPayload)) {
log.warn("We have already removed that AddOncePayload by a previous removeDataMessage. " +
"We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString());
return false;
}
boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
boolean result = checkPublicKeys(protectedStorageEntry, true)
&& checkSignature(protectedStorageEntry)
&& sequenceNrValid;
boolean result = sequenceNrValid &&
checkPublicKeys(protectedStorageEntry, true)
&& checkSignature(protectedStorageEntry);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey)
if (containsKey) {
result = result && checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload);
}
// printData("before add");
if (result) {
final boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
if (!containsKey || hasSequenceNrIncreased) {
// At startup we don't have the item so we store it. At updates of the seq nr we store as well.
@ -415,37 +431,32 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return result;
}
public void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener broadcastListener,
boolean isDataOwner) {
private void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry,
@Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener broadcastListener,
boolean isDataOwner) {
broadcast(new AddDataMessage(protectedStorageEntry), sender, broadcastListener, isDataOwner);
}
public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage, @Nullable NodeAddress sender, boolean isDataOwner) {
byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr();
byte[] signature = refreshTTLMessage.getSignature();
public boolean refreshTTL(RefreshOfferMessage refreshTTLMessage,
@Nullable NodeAddress sender,
boolean isDataOwner) {
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
int sequenceNumber = refreshTTLMessage.getSequenceNumber();
if (map.containsKey(hashOfPayload)) {
ProtectedStorageEntry storedData = map.get(hashOfPayload);
int sequenceNumber = refreshTTLMessage.getSequenceNumber();
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey();
final boolean checkSignature = checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature);
final boolean hasSequenceNrIncreased = hasSequenceNrIncreased(sequenceNumber, hashOfPayload);
final boolean checkIfStoredDataPubKeyMatchesNewDataPubKey = checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey,
hashOfPayload);
boolean allValid = checkSignature &&
hasSequenceNrIncreased &&
checkIfStoredDataPubKeyMatchesNewDataPubKey;
byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr();
byte[] signature = refreshTTLMessage.getSignature();
// printData("before refreshTTL");
if (allValid) {
if (hasSequenceNrIncreased(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload) &&
checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature)) {
log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.refreshTTL();
storedData.updateSequenceNumber(sequenceNumber);
@ -455,8 +466,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000);
broadcast(refreshTTLMessage, sender, null, isDataOwner);
return true;
}
return allValid;
return false;
}
} else {
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
@ -464,8 +477,10 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
}
public boolean remove(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
public boolean remove(ProtectedStorageEntry protectedStorageEntry,
@Nullable NodeAddress sender,
boolean isDataOwner) {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey)
@ -483,6 +498,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner);
removeFromProtectedDataStore(protectedStorageEntry);
@ -506,25 +523,33 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
@SuppressWarnings("UnusedReturnValue")
public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, @Nullable NodeAddress sender, boolean isDataOwner) {
ByteArray hashOfData = get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload());
boolean containsKey = map.containsKey(hashOfData);
public boolean removeMailboxData(ProtectedMailboxStorageEntry protectedMailboxStorageEntry,
@Nullable NodeAddress sender,
boolean isDataOwner) {
ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey)
log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedMailboxStorageEntry, false)
&& isSequenceNrValid(protectedMailboxStorageEntry.getSequenceNumber(), hashOfData)
&& protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(protectedMailboxStorageEntry.getReceiversPubKey()) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxStorageEntry)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxStorageEntry.getReceiversPubKey(), hashOfData);
int sequenceNumber = protectedMailboxStorageEntry.getSequenceNumber();
PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey();
boolean result = containsKey &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkPublicKeys(protectedMailboxStorageEntry, false) &&
protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey) && // at remove both keys are the same (only receiver is able to remove data)
checkSignature(protectedMailboxStorageEntry) &&
checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload);
// printData("before removeMailboxData");
if (result) {
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData);
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload);
printData("after removeMailboxData");
sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner);
} else {
log.debug("removeMailboxData failed");
@ -532,7 +557,15 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return result;
}
public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey)
private void maybeAddToRemoveAddOncePayloads(ProtectedStoragePayload protectedStoragePayload,
ByteArray hashOfData) {
if (protectedStoragePayload instanceof AddOncePayload) {
removedAddOncePayloads.add(hashOfData);
}
}
public ProtectedStorageEntry getProtectedStorageEntry(ProtectedStoragePayload protectedStoragePayload,
KeyPair ownerStoragePubKey)
throws CryptoException {
ByteArray hashOfData = get32ByteHashAsByteArray(protectedStoragePayload);
int sequenceNumber;
@ -546,7 +579,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return new ProtectedStorageEntry(protectedStoragePayload, ownerStoragePubKey.getPublic(), sequenceNumber, signature);
}
public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protectedStoragePayload, KeyPair ownerStoragePubKey)
public RefreshOfferMessage getRefreshTTLMessage(ProtectedStoragePayload protectedStoragePayload,
KeyPair ownerStoragePubKey)
throws CryptoException {
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
int sequenceNumber;
@ -561,7 +595,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
public ProtectedMailboxStorageEntry getMailboxDataWithSignedSeqNr(MailboxStoragePayload expirableMailboxStoragePayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
KeyPair storageSignaturePubKey,
PublicKey receiversPublicKey)
throws CryptoException {
ByteArray hashOfData = get32ByteHashAsByteArray(expirableMailboxStoragePayload);
int sequenceNumber;
@ -588,14 +623,17 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
appendOnlyDataStoreListeners.add(listener);
}
@SuppressWarnings("unused")
public void removeAppendOnlyDataStoreListener(AppendOnlyDataStoreListener listener) {
appendOnlyDataStoreListeners.remove(listener);
}
@SuppressWarnings("unused")
public void addProtectedDataStoreListener(ProtectedDataStoreListener listener) {
protectedDataStoreListeners.add(listener);
}
@SuppressWarnings("unused")
public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener) {
protectedDataStoreListeners.remove(listener);
}
@ -608,7 +646,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private void doRemoveProtectedExpirableData(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) {
map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our peers.");
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
}
private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) {
@ -681,7 +719,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// in the contained mailbox message, or the pubKey of other kinds of network_messages.
private boolean checkPublicKeys(ProtectedStorageEntry protectedStorageEntry, boolean isAddOperation) {
boolean result;
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof MailboxStoragePayload) {
MailboxStoragePayload payload = (MailboxStoragePayload) protectedStoragePayload;
if (isAddOperation)
@ -751,7 +789,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return new ByteArray(getCompactHash(protectedStoragePayload));
}
public static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) {
private static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) {
return Hash.getSha256Ripemd160hash(protectedStoragePayload.toProtoMessage().toByteArray());
}
@ -759,9 +797,9 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private Map<ByteArray, MapValue> getPurgedSequenceNumberMap(Map<ByteArray, MapValue> persisted) {
Map<ByteArray, MapValue> purged = new HashMap<>();
long maxAgeTs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(PURGE_AGE_DAYS);
persisted.entrySet().stream().forEach(entry -> {
if (entry.getValue().timeStamp > maxAgeTs)
purged.put(entry.getKey(), entry.getValue());
persisted.forEach((key, value) -> {
if (value.timeStamp > maxAgeTs)
purged.put(key, value);
});
return purged;
}
@ -773,12 +811,12 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// We print the items sorted by hash with the payload class name and id
List<Tuple2<String, ProtectedStorageEntry>> tempList = map.values().stream()
.map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(get32ByteHashAsByteArray(e.getProtectedStoragePayload()).bytes), e))
.sorted(Comparator.comparing(o -> o.first))
.collect(Collectors.toList());
tempList.sort((o1, o2) -> o1.first.compareTo(o2.first));
tempList.stream().forEach(e -> {
final ProtectedStorageEntry storageEntry = e.second;
final ProtectedStoragePayload protectedStoragePayload = storageEntry.getProtectedStoragePayload();
final MapValue mapValue = sequenceNumberMap.get(get32ByteHashAsByteArray(protectedStoragePayload));
tempList.forEach(e -> {
ProtectedStorageEntry storageEntry = e.second;
ProtectedStoragePayload protectedStoragePayload = storageEntry.getProtectedStoragePayload();
MapValue mapValue = sequenceNumberMap.get(get32ByteHashAsByteArray(protectedStoragePayload));
sb.append("\n")
.append("Hash=")
.append(e.first)
@ -802,7 +840,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
}
/**
* @param data
* @param data Network payload
* @return Hash of data
*/
public static byte[] get32ByteHash(NetworkPayload data) {
@ -865,10 +903,6 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// Protobuffer
///////////////////////////////////////////////////////////////////////////////////////////
public ByteArray(String hex) {
this.bytes = Utilities.decodeFromHex(hex);
}
@Override
public protobuf.ByteArray toProtoMessage() {
return protobuf.ByteArray.newBuilder().setBytes(ByteString.copyFrom(bytes)).build();
@ -878,6 +912,12 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return new ByteArray(proto.getBytes().toByteArray());
}
///////////////////////////////////////////////////////////////////////////////////////////
// Util
///////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("unused")
public String getHex() {
return Utilities.encodeToHex(bytes);
}
@ -901,7 +941,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
final public int sequenceNr;
final public long timeStamp;
public MapValue(int sequenceNr, long timeStamp) {
MapValue(int sequenceNr, long timeStamp) {
this.sequenceNr = sequenceNr;
this.timeStamp = timeStamp;
}

View file

@ -0,0 +1,24 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.storage.messages;
/**
* Marker interface for messages which must not be added again after a remove message has been received (e.g. MailboxMessages).
*/
public interface AddOncePayload {
}

View file

@ -18,6 +18,7 @@
package bisq.network.p2p.storage.payload;
import bisq.network.p2p.PrefixedSealedAndSignedMessage;
import bisq.network.p2p.storage.messages.AddOncePayload;
import bisq.common.crypto.Sig;
import bisq.common.util.ExtraDataMapValidator;
@ -49,7 +50,7 @@ import javax.annotation.Nullable;
@Getter
@EqualsAndHashCode
@Slf4j
public final class MailboxStoragePayload implements ProtectedStoragePayload, ExpirablePayload {
public final class MailboxStoragePayload implements ProtectedStoragePayload, ExpirablePayload, AddOncePayload {
private final PrefixedSealedAndSignedMessage prefixedSealedAndSignedMessage;
private PublicKey senderPubKeyForAddOperation;
private final byte[] senderPubKeyForAddOperationBytes;