Merge pull request #3557 from julianknutsen/refactor-add-remove-refresh

(3/8) [REFACTOR] P2PDataStore::add/remove/refresh
This commit is contained in:
Christoph Atteneder 2019-11-18 10:49:50 +01:00 committed by GitHub
commit d484617385
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -316,33 +316,41 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean reBroadcast,
boolean checkDate) {
log.trace("addPersistableNetworkPayload payload={}", payload);
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {
ByteArray hashAsByteArray = new ByteArray(hash);
boolean containsKey = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray);
if (!containsKey || reBroadcast) {
if (!(payload instanceof DateTolerantPayload) || !checkDate || ((DateTolerantPayload) payload).isDateInTolerance(clock)) {
if (!containsKey) {
appendOnlyDataStoreService.put(hashAsByteArray, payload);
appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload));
}
if (allowBroadcast)
broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null, isDataOwner);
return true;
} else {
log.warn("Publish date of payload is not matching our current time and outside of our tolerance.\n" +
"Payload={}; now={}", payload.toString(), new Date());
return false;
}
} else {
log.trace("We have that payload already in our map.");
return false;
}
} else {
log.warn("We got a hash exceeding our permitted size");
// Payload hash size does not match expectation for that type of message.
if (!payload.verifyHashSize()) {
log.warn("addPersistableNetworkPayload failed due to unexpected hash size");
return false;
}
ByteArray hashAsByteArray = new ByteArray(payload.getHash());
boolean payloadHashAlreadyInStore = getAppendOnlyDataStoreMap().containsKey(hashAsByteArray);
// Store already knows about this payload. Ignore it unless the caller specifically requests a republish.
if (payloadHashAlreadyInStore && !reBroadcast) {
log.trace("addPersistableNetworkPayload failed due to duplicate payload");
return false;
}
// DateTolerantPayloads are only checked for tolerance from the onMessage handler (checkDate == true). If not in
// tolerance, ignore it.
if (checkDate && payload instanceof DateTolerantPayload && !((DateTolerantPayload) payload).isDateInTolerance((clock))) {
log.warn("addPersistableNetworkPayload failed due to payload time outside tolerance.\n" +
"Payload={}; now={}", payload.toString(), new Date());
return false;
}
// Add the payload and publish the state update to the appendOnlyDataStoreListeners
if (!payloadHashAlreadyInStore) {
appendOnlyDataStoreService.put(hashAsByteArray, payload);
appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload));
}
// Broadcast the payload if requested by caller
if (allowBroadcast)
broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender, null, isDataOwner);
return true;
}
// When we receive initial data we skip several checks to improve performance. We requested only missing entries so we
@ -382,50 +390,50 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
return false;
}
boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
boolean result = sequenceNrValid &&
checkPublicKeys(protectedStorageEntry, true)
&& checkSignature(protectedStorageEntry);
// TODO: Combine with hasSequenceNrIncreased check, but keep existing behavior for now
if(!isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
return false;
// Verify the ProtectedStorageEntry is well formed and valid for the add operation
if (!checkPublicKeys(protectedStorageEntry, true) || !checkSignature(protectedStorageEntry))
return false;
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
result = result && checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload);
// If we have already seen an Entry with the same hash, verify the new Entry has the same owner
if (containsKey && !checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload))
return false;
boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
// If we have seen a more recent operation for this payload, we ignore the current one
// TODO: I think we can return false here. All callers use the Client API (addProtectedStorageEntry(getProtectedStorageEntry())
// leaving only the onMessage() handler which doesn't look at the return value. It makes more intuitive sense that adds() that don't
// change state return false.
if (!hasSequenceNrIncreased)
return true;
// This is an updated entry. Record it and signal listeners.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry));
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 2000);
// Optionally, broadcast the add/update depending on the calling environment
if (allowBroadcast)
broadcastProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner);
// Persist ProtectedStorageEntrys carrying PersistablePayload payloads and signal listeners on changes
if (protectedStoragePayload instanceof PersistablePayload) {
ByteArray compactHash = P2PDataStorage.getCompactHashAsByteArray(protectedStoragePayload);
ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry);
if (previous == null)
protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry));
}
// printData("before add");
if (result) {
boolean hasSequenceNrIncreased = hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload);
if (!containsKey || hasSequenceNrIncreased) {
// At startup we don't have the item so we store it. At updates of the seq nr we store as well.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry));
// printData("after add");
} else {
log.trace("We got that version of the data already, so we don't store it.");
}
if (hasSequenceNrIncreased) {
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
// We set the delay higher as we might receive a batch of items
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 2000);
if (allowBroadcast)
broadcastProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner);
} else {
log.trace("We got that version of the data already, so we don't broadcast it.");
}
if (protectedStoragePayload instanceof PersistablePayload) {
ByteArray compactHash = getCompactHashAsByteArray(protectedStoragePayload);
ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry);
if (previous == null)
protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry));
}
} else {
log.trace("add failed");
}
return result;
return true;
}
private void broadcastProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry,
@ -439,39 +447,58 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
@Nullable NodeAddress sender,
boolean isDataOwner) {
ByteArray hashOfPayload = new ByteArray(refreshTTLMessage.getHashOfPayload());
if (map.containsKey(hashOfPayload)) {
ProtectedStorageEntry storedData = map.get(hashOfPayload);
int sequenceNumber = refreshTTLMessage.getSequenceNumber();
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
} else {
PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey();
byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr();
byte[] signature = refreshTTLMessage.getSignature();
// printData("before refreshTTL");
if (hasSequenceNrIncreased(sequenceNumber, hashOfPayload) &&
checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload) &&
checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature)) {
log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.refreshTTL();
storedData.updateSequenceNumber(sequenceNumber);
storedData.updateSignature(signature);
printData("after refreshTTL");
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000);
broadcast(refreshTTLMessage, sender, null, isDataOwner);
return true;
}
return false;
}
} else {
if (!map.containsKey((hashOfPayload))) {
log.debug("We don't have data for that refresh message in our map. That is expected if we missed the data publishing.");
return false;
}
int sequenceNumber = refreshTTLMessage.getSequenceNumber();
// If we have seen a more recent operation for this payload, we ignore the current one
// TODO: I think we can return false here. All callers use the Client API (refreshTTL(getRefreshTTLMessage()) which increments the sequence number
// leaving only the onMessage() handler which doesn't look at the return value. It makes more intuitive sense that operations that don't
// change state return false.
if (sequenceNumberMap.containsKey(hashOfPayload) && sequenceNumberMap.get(hashOfPayload).sequenceNr == sequenceNumber) {
log.trace("We got that message with that seq nr already from another peer. We ignore that message.");
return true;
}
// TODO: Combine with above in future work, but preserve existing behavior for now
if(!hasSequenceNrIncreased(sequenceNumber, hashOfPayload))
return false;
ProtectedStorageEntry storedData = map.get(hashOfPayload);
PublicKey ownerPubKey = storedData.getProtectedStoragePayload().getOwnerPubKey();
byte[] hashOfDataAndSeqNr = refreshTTLMessage.getHashOfDataAndSeqNr();
byte[] signature = refreshTTLMessage.getSignature();
// Verify the RefreshOfferMessage is well formed and valid for the refresh operation
if (!checkSignature(ownerPubKey, hashOfDataAndSeqNr, signature))
return false;
// Verify the Payload owner and the Entry owner for the stored Entry are the same
// TODO: This is also checked in the validation for the original add(), investigate if this can be removed
if (!checkIfStoredDataPubKeyMatchesNewDataPubKey(ownerPubKey, hashOfPayload))
return false;
// This is a valid refresh, update the payload for it
log.debug("refreshDate called for storedData:\n\t" + StringUtils.abbreviate(storedData.toString(), 100));
storedData.refreshTTL();
storedData.updateSequenceNumber(sequenceNumber);
storedData.updateSignature(signature);
printData("after refreshTTL");
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 1000);
// Always broadcast refreshes
broadcast(refreshTTLMessage, sender, null, isDataOwner);
return true;
}
public boolean remove(ProtectedStorageEntry protectedStorageEntry,
@ -481,32 +508,42 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey)
// If we don't know about the target of this remove, ignore it
if (!map.containsKey(hashOfPayload)) {
log.debug("Remove data ignored as we don't have an entry for that data.");
boolean result = containsKey
&& checkPublicKeys(protectedStorageEntry, false)
&& isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload)
&& checkSignature(protectedStorageEntry)
&& checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload);
// printData("before remove");
if (result) {
doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
printData("after remove");
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner);
removeFromProtectedDataStore(protectedStorageEntry);
} else {
log.debug("remove failed");
return false;
}
return result;
}
// If we have seen a more recent operation for this payload, ignore this one
if (!isSequenceNrValid(protectedStorageEntry.getSequenceNumber(), hashOfPayload))
return false;
// Verify the ProtectedStorageEntry is well formed and valid for the remove operation
if (!checkPublicKeys(protectedStorageEntry, false) || !checkSignature(protectedStorageEntry))
return false;
// If we have already seen an Entry with the same hash, verify the new Entry has the same owner
if (!checkIfStoredDataPubKeyMatchesNewDataPubKey(protectedStorageEntry.getOwnerPubKey(), hashOfPayload))
return false;
// Valid remove entry, do the remove and signal listeners
doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
printData("after remove");
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner);
removeFromProtectedDataStore(protectedStorageEntry);
return true;
}
/**
@ -559,33 +596,46 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
boolean isDataOwner) {
ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey)
log.debug("Remove data ignored as we don't have an entry for that data.");
if (!map.containsKey(hashOfPayload)) {
log.debug("removeMailboxData failed due to unknown entry");
return false;
}
int sequenceNumber = protectedMailboxStorageEntry.getSequenceNumber();
if (!isSequenceNrValid(sequenceNumber, hashOfPayload))
return false;
PublicKey receiversPubKey = protectedMailboxStorageEntry.getReceiversPubKey();
boolean result = containsKey &&
isSequenceNrValid(sequenceNumber, hashOfPayload) &&
checkPublicKeys(protectedMailboxStorageEntry, false) &&
protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey) && // at remove both keys are the same (only receiver is able to remove data)
checkSignature(protectedMailboxStorageEntry) &&
checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload);
// printData("before removeMailboxData");
if (result) {
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload);
printData("after removeMailboxData");
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
if (!checkPublicKeys(protectedMailboxStorageEntry, false) || !checkSignature(protectedMailboxStorageEntry))
return false;
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner);
} else {
log.debug("removeMailboxData failed");
// Verify the Entry has the correct receiversPubKey for removal.
if (!protectedMailboxStorageEntry.getMailboxStoragePayload().getOwnerPubKey().equals(receiversPubKey)) {
log.debug("Entry receiversPubKey does not match payload owner which is a requirement for removing MailboxStoragePayloads");
return false;
}
return result;
// If we have already seen an Entry with the same hash, verify the new Entry has the same owner
if (!checkIfStoredMailboxDataMatchesNewMailboxData(receiversPubKey, hashOfPayload))
return false;
// Valid remove ProtectedMailboxStorageEntry, do the remove and signal listeners
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfPayload);
printData("after removeMailboxData");
// Record the latest sequence number and persist it
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300);
maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload);
broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner);
return true;
}
private void maybeAddToRemoveAddOncePayloads(ProtectedStoragePayload protectedStoragePayload,