mirror of
https://github.com/bisq-network/bisq.git
synced 2024-11-19 09:52:23 +01:00
When daoActivated is set to false the DAO P2P data
are still received from seed nodes and processed but as the services for processing the payloads are not added the data is inefficiently processed. The getMap returned a flattened map of all maps in all services which can be quite large. We use now a filtered map with calling canHandle first. Also the put got optimized to indicate in the return value if there has been a service found to add the payload. If not we do not invoke the listeners and do not broadcast. To not request the DAO P2P data would be better but I don't see a easy way how to do that as the P2P network is not aware of the type of data. Some market interface could be used and a flag at the request to the seed node to indicate if those types should be included but that does feel too customized for a special use case. The DAO P2P data is not that big as well, so I think for now that fix should be good enough.
This commit is contained in:
parent
4689ad5c4b
commit
19b52ba6f6
@ -676,7 +676,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
}
|
||||
|
||||
ByteArray hashAsByteArray = new ByteArray(payload.getHash());
|
||||
boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap().containsKey(hashAsByteArray);
|
||||
boolean payloadHashAlreadyInStore = appendOnlyDataStoreService.getMap(payload).containsKey(hashAsByteArray);
|
||||
|
||||
// Store already knows about this payload. Ignore it unless the caller specifically requests a republish.
|
||||
if (payloadHashAlreadyInStore && !reBroadcast) {
|
||||
@ -693,13 +693,16 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers
|
||||
}
|
||||
|
||||
// Add the payload and publish the state update to the appendOnlyDataStoreListeners
|
||||
boolean wasAdded = false;
|
||||
if (!payloadHashAlreadyInStore) {
|
||||
appendOnlyDataStoreService.put(hashAsByteArray, payload);
|
||||
wasAdded = appendOnlyDataStoreService.put(hashAsByteArray, payload);
|
||||
if (wasAdded) {
|
||||
appendOnlyDataStoreListeners.forEach(e -> e.onAdded(payload));
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast the payload if requested by caller
|
||||
if (allowBroadcast)
|
||||
if (allowBroadcast && wasAdded)
|
||||
broadcaster.broadcast(new AddPersistableNetworkPayloadMessage(payload), sender);
|
||||
|
||||
return true;
|
||||
|
@ -25,8 +25,10 @@ import javax.inject.Inject;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -75,6 +77,18 @@ public class AppendOnlyDataStoreService {
|
||||
services.forEach(service -> service.readFromResourcesSync(postFix));
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap(PersistableNetworkPayload payload) {
|
||||
return services.stream()
|
||||
.filter(service -> service.canHandle(payload))
|
||||
.map(service -> {
|
||||
Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = service instanceof HistoricalDataStoreService ?
|
||||
((HistoricalDataStoreService) service).getMapOfAllData() :
|
||||
service.getMap();
|
||||
return map;
|
||||
})
|
||||
.findAny()
|
||||
.orElse(new HashMap<>());
|
||||
}
|
||||
|
||||
public Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> getMap() {
|
||||
return services.stream()
|
||||
@ -87,9 +101,14 @@ public class AppendOnlyDataStoreService {
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
public void put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) {
|
||||
public boolean put(P2PDataStorage.ByteArray hashAsByteArray, PersistableNetworkPayload payload) {
|
||||
AtomicBoolean canHandle = new AtomicBoolean(false);
|
||||
services.stream()
|
||||
.filter(service -> service.canHandle(payload))
|
||||
.filter(service -> {
|
||||
canHandle.set(service.canHandle(payload));
|
||||
return canHandle.get();
|
||||
})
|
||||
.forEach(service -> service.putIfAbsent(hashAsByteArray, payload));
|
||||
return canHandle.get();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user