Gather Dao data

This commit is contained in:
Florian Reimair 2019-06-04 17:21:15 +02:00
parent 7442d2a428
commit fb71889042
No known key found for this signature in database
GPG key ID: 05634D8D7A7954C8

View file

@ -24,6 +24,15 @@ import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.ThreadGate;
import bisq.core.dao.monitoring.model.BlindVoteStateHash;
import bisq.core.dao.monitoring.model.DaoStateHash;
import bisq.core.dao.monitoring.model.ProposalStateHash;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesResponse;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesResponse;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesResponse;
import bisq.core.proto.network.CoreNetworkProtoResolver;
import bisq.network.p2p.CloseConnectionMessage;
@ -50,12 +59,14 @@ import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@ -163,39 +174,55 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
// clear our buckets
bucketsPerHost.clear();
int height = 550000;
Random random = new Random();
send(networkNode, new PreliminaryGetDataRequest(random.nextInt(), hashes));
send(networkNode, new GetDaoStateHashesRequest(height, random.nextInt()));
send(networkNode, new GetProposalStateHashesRequest(height, random.nextInt()));
send(networkNode, new GetBlindVoteStateHashesRequest(height, random.nextInt()));
report();
}
private void send(NetworkNode networkNode, NetworkEnvelope message) {
ArrayList<Thread> threadList = new ArrayList<>();
// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
threadList.add(new Thread(() -> {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);
// do the data request
SettableFuture<Connection> future = networkNode.sendMessage(target, getFreshRequest(networkNode.getNodeAddress()));
// do the data request
SettableFuture<Connection> future = networkNode.sendMessage(target, message);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PSeedNodeSnapshot.this);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PSeedNodeSnapshot.this);
}
@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
}, current));
}
@ -205,14 +232,8 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
// minimize the time between querying the hosts and therefore the chance of
// inconsistencies.
threadList.forEach(Thread::start);
gate.await();
report();
}
protected NetworkEnvelope getFreshRequest(NodeAddress nodeAddress) {
int nonce = new Random().nextInt();
return new PreliminaryGetDataRequest(nonce, hashes);
}
/**
@ -266,7 +287,6 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if(treatMessage(networkEnvelope, connection)) {
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
@ -276,6 +296,20 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
}
}
private class Tuple {
private final long height;
private final byte[] hash;
Tuple(long height, byte[] hash) {
this.height = height;
this.hash = hash;
}
}
private Map<NodeAddress, Tuple> daoStateHashData = new ConcurrentHashMap<>();
private Map<NodeAddress, Tuple> daoProposalStateHashData = new ConcurrentHashMap<>();
private Map<NodeAddress, Tuple> daoBlindVoteStateHashData = new ConcurrentHashMap<>();
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetDataResponse) {
@ -311,6 +345,28 @@ public class P2PSeedNodeSnapshot extends Metric implements MessageListener {
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");
bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
return true;
} else if (networkEnvelope instanceof GetDaoStateHashesResponse) {
// get last entry
DaoStateHash last = ((GetDaoStateHashesResponse) networkEnvelope).getStateHashes().get(((GetDaoStateHashesResponse) networkEnvelope).getStateHashes().size() - 1);
daoStateHashData.put(connection.getPeersNodeAddressProperty().getValue(), new Tuple(last.getHeight(), last.getHash()));
return true;
} else if (networkEnvelope instanceof GetProposalStateHashesResponse) {
// get last entry
ProposalStateHash last = ((GetProposalStateHashesResponse) networkEnvelope).getStateHashes().get(((GetProposalStateHashesResponse) networkEnvelope).getStateHashes().size() - 1);
daoProposalStateHashData.put(connection.getPeersNodeAddressProperty().getValue(), new Tuple(last.getHeight(), last.getHash()));
return true;
} else if(networkEnvelope instanceof GetBlindVoteStateHashesResponse) {
// get last entry
BlindVoteStateHash last = ((GetBlindVoteStateHashesResponse) networkEnvelope).getStateHashes().get(((GetBlindVoteStateHashesResponse) networkEnvelope).getStateHashes().size() - 1);
daoBlindVoteStateHashData.put(connection.getPeersNodeAddressProperty().getValue(), new Tuple(last.getHeight(), last.getHash()));
connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
return true;
}
return false;