diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index b0065cb363..6611070546 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -676,7 +676,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers } ByteArray hashAsByteArray = new ByteArray(payload.getHash()); - boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap().containsKey(hashAsByteArray); + boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap(payload).containsKey(hashAsByteArray); // Store already knows about this payload. Ignore it unless the caller specifically requests a republish. if (payloadHashAlreadyInStore && !reBroadcast) { @@ -693,13 +693,16 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers } // Add the payload and publish the state update to the appendOnlyDataStoreListeners + boolean wasAdded = false; if (!payloadHashAlreadyInStore) { - appendOnlyDataStoreService.put(hashAsByteArray, payload); - appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload)); + wasAdded = appendOnlyDataStoreService.put(hashAsByteArray, payload); + if (wasAdded) { + appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload)); + } } // Broadcast the payload if requested by caller - if (allowBroadcast) + if (allowBroadcast && wasAdded) broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender); return true; diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java index 25ce17f0e9..d9e98c7219 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/AppendOnlyDataStoreService.java @@ -25,8 +25,10 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -75,6 +77,18 @@ public class AppendOnlyDataStoreService { services.forEach(service -> service.readFromResourcesSync(postFix)); } + public Map getMap(PersistableNetworkPayload payload) { + return services.stream() + .filter(service -> service.canHandle(payload)) + .map(service -> { + Map map = service instanceof HistoricalDataStoreService ? + ((HistoricalDataStoreService) service).getMapOfAllData() : + service.getMap(); + return map; + }) + .findAny() + .orElse(new HashMap<>()); + } public Map getMap() { return services.stream() @@ -87,9 +101,14 @@ public class AppendOnlyDataStoreService { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) { + public boolean put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) { + AtomicBoolean canHandle = new AtomicBoolean(false); services.stream() - .filter(service -> service.canHandle(payload)) + .filter(service -> { + canHandle.set(service.canHandle(payload)); + return canHandle.get(); + }) .forEach(service -> service.putIfAbsent(hashAsByteArray, payload)); + return canHandle.get(); } }