[REFACTOR] HashMapListener::onAdded/onRemoved

Previously, this interface was called each time an item was changed. This
required listeners to understand performance implications of multiple
adds or removes in a short time span.

Instead, give each listener the ability to process a list of added or
removed entrys which can help them avoid performance issues.

This patch is just a refactor. Each listener is called once for each
ProtectedStorageEntry. Future patches will change this.
This commit is contained in:
Julian Knutsen 2019-11-16 11:15:35 -08:00
parent 3bd67bab05
commit b281566e14
No known key found for this signature in database
GPG key ID: D85F536DB3615B2D
10 changed files with 113 additions and 77 deletions

View file

@ -44,6 +44,8 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,22 +81,26 @@ public class AlertManager {
if (!ignoreDevMsg) {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
Alert alert = (Alert) protectedStoragePayload;
if (verifySignature(alert))
alertMessageProperty.set(alert);
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof Alert) {
if (verifySignature((Alert) protectedStoragePayload))
alertMessageProperty.set(null);
}
});
}
});
}

View file

@ -41,6 +41,7 @@ import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.collections.transformation.FilteredList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -130,17 +131,21 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry entry) {
if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof TempProposalPayload) {
tempProposalsChanged = true;
}
});
}
@Override

View file

@ -50,6 +50,7 @@ import javax.inject.Named;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -133,13 +134,17 @@ public class ProposalService implements HashMapChangedListener, AppendOnlyDataSt
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry entry) {
onProtectedDataAdded(entry, true);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataAdded(protectedStorageEntry, true);
});
}
@Override
public void onRemoved(ProtectedStorageEntry entry) {
onProtectedDataRemoved(entry);
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
onProtectedDataRemoved(protectedStorageEntry);
});
}

View file

@ -52,6 +52,7 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
@ -133,23 +134,27 @@ public class FilterManager {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1);
}
}
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload();
if (verifySignature(filter))
resetFilters();
}
});
}
});
}

View file

@ -40,6 +40,7 @@ import javax.inject.Inject;
import java.io.File;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -87,26 +88,30 @@ public class OfferBookService {
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onAdded(offer);
}
});
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
offerBookChangedListeners.stream().forEach(listener -> {
if (data.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
offerBookChangedListeners.stream().forEach(listener -> {
if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) {
OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload();
Offer offer = new Offer(offerPayload);
offer.setPriceFeedService(priceFeedService);
listener.onRemoved(offer);
}
});
});
}
});

View file

@ -46,6 +46,7 @@ import java.security.SignatureException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -131,18 +132,22 @@ public abstract class DisputeAgentManager<T extends DisputeAgent> {
public void onAllServicesInitialized() {
disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
}
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
}
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
if (isExpectedInstance(data)) {
updateMap();
removeAcceptedDisputeAgentFromUser(data);
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (isExpectedInstance(protectedStorageEntry)) {
updateMap();
removeAcceptedDisputeAgentFromUser(protectedStorageEntry);
}
});
}
});

View file

@ -75,6 +75,7 @@ import javafx.beans.property.SimpleIntegerProperty;
import java.security.PublicKey;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -432,15 +433,15 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAdded(ProtectedStorageEntry protectedStorageEntry) {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
protectedStorageEntries.forEach(protectedStorageEntry -> {
if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry);
});
}
@Override
public void onRemoved(ProtectedStorageEntry data) {
}
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) { }
///////////////////////////////////////////////////////////////////////////////////////////
// DirectMessages

View file

@ -19,11 +19,13 @@ package bisq.network.p2p.storage;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import java.util.Collection;
public interface HashMapChangedListener {
void onAdded(ProtectedStorageEntry data);
void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries);
@SuppressWarnings("UnusedParameters")
void onRemoved(ProtectedStorageEntry data);
void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries);
// We process all expired entries after a delay (60 s) after onBootstrapComplete.
// We notify listeners of start and completion so they can optimize to only update after batch processing is done.

View file

@ -75,6 +75,7 @@ import java.security.PublicKey;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
@ -409,7 +410,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
// This is an updated entry. Record it and signal listeners.
map.put(hashOfPayload, protectedStorageEntry);
hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry)));
// Record the updated sequence number and persist it. Higher delay so we can batch more items.
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis()));
@ -643,7 +644,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
private void removeFromMapAndDataStore(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) {
map.remove(hashOfPayload);
hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry));
hashMapChangedListeners.forEach(e -> e.onRemoved(Collections.singletonList(protectedStorageEntry)));
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {

View file

@ -45,6 +45,7 @@ import bisq.common.storage.Storage;
import java.security.PublicKey;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -174,7 +175,7 @@ public class TestState {
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
}
verify(this.hashMapChangedListener).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener).onAdded(Collections.singletonList(protectedStorageEntry));
final ArgumentCaptor<BroadcastMessage> captor = ArgumentCaptor.forClass(BroadcastMessage.class);
verify(this.mockBroadcaster).broadcast(captor.capture(), any(NodeAddress.class),
@ -192,7 +193,7 @@ public class TestState {
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
// Internal state didn't change... nothing should be notified
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}
@ -216,7 +217,7 @@ public class TestState {
verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry);
}
verify(this.hashMapChangedListener).onRemoved(protectedStorageEntry);
verify(this.hashMapChangedListener).onRemoved(Collections.singletonList(protectedStorageEntry));
if (expectedSeqNrWriteOnStateChange)
this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber());
@ -232,7 +233,7 @@ public class TestState {
Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash));
verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean());
verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry);
verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry));
verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry);
verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong());
}