add penalty for failed connections, increase duration for maintenance

This commit is contained in:
Manfred Karrer 2016-01-30 04:33:23 +01:00
parent 167e07b094
commit 067bf07a83
6 changed files with 78 additions and 22 deletions

View File

@ -39,13 +39,13 @@ public class Broadcaster {
receivers.stream()
.filter(connection -> !connection.getPeersNodeAddressOptional().get().equals(sender))
.forEach(connection -> {
log.trace("Broadcast message from " + networkNode.getNodeAddress() + " to " +
log.trace("Broadcast message to " +
connection.getPeersNodeAddressOptional().get() + ".");
SettableFuture<Connection> future = networkNode.sendMessage(connection, message);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + connection + " succeeded.");
log.trace("Broadcast to " + connection + " succeeded.");
listeners.stream().forEach(listener -> {
listener.onBroadcasted(message);
listeners.remove(listener);

View File

@ -116,6 +116,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
public void onFault(String errorMessage) {
log.trace("PeerExchangeHandshake of outbound connection failed. {} connection= {}",
errorMessage, connection);
peerManager.penalizeUnreachablePeer(connection);
}
});
peerExchangeHandshake.onGetPeersRequest((GetPeersRequest) message, connection);
@ -147,6 +148,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
errorMessage, nodeAddress);
peerExchangeHandshakeMap.remove(nodeAddress);
peerManager.penalizeUnreachablePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting peers. " +
"We will try getReportedPeers again.");
@ -186,7 +188,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
Collections.shuffle(nodeAddresses);
requestReportedPeersFromRandomPeer(nodeAddresses);
}
// We try to get sufficient connections by connecting to reported and persisted peers
if (numberOfConnectedSeedNodes == 0) {
@ -197,16 +199,16 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
connectToMorePeers();
}
// Use all outbound connections older than 4 min. for updating reported peers and make sure we keep the connection alive
// Use all outbound connections older than 10 min. for updating reported peers and make sure we keep the connection alive
// Inbound connections should be maintained be the requesting peer
confirmedConnections.stream()
.filter(c -> c.getPeersNodeAddressOptional().isPresent() &&
c instanceof OutboundConnection &&
new Date().getTime() - c.getLastActivityDate().getTime() > 4 * 60 * 1000)
.forEach(c -> UserThread.runAfterRandomDelay(() -> {
new Date().getTime() - c.getLastActivityDate().getTime() > 10 * 60 * 1000)
.forEach(c -> {
log.trace("Call requestReportedPeers from maintainConnections");
requestReportedPeers(c.getPeersNodeAddressOptional().get(), new ArrayList<>());
}, 3, 5));
});
}
private void connectToMorePeers() {
@ -228,6 +230,7 @@ public class PeerExchangeManager implements MessageListener, ConnectionListener
!peerManager.isSelf(e) &&
!peerManager.isConfirmed(e))
.collect(Collectors.toSet()));
log.info("Sorted and filtered list: list.size()=" + list.size());
log.trace("Sorted and filtered list: list=" + list);
if (!list.isEmpty()) {
NodeAddress nextCandidate = list.get(0);

View File

@ -44,6 +44,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
private static final int MAX_REPORTED_PEERS = 1000;
private static final int MAX_PERSISTED_PEERS = 500;
private static final long MAX_AGE = 14 * 24 * 60 * 60 * 1000; // max age for reported peers is 14 days
private final NetworkNode networkNode;
@ -68,9 +69,13 @@ public class PeerManager implements ConnectionListener, MessageListener {
createDbStorage(storageDir);
connectionNodeAddressListener = (observable, oldValue, newValue) -> {
// Every time we get a new peer connected with a known address we check if we need to remove peers
printConnectedPeers();
if (checkMaxConnectionsTimer == null && newValue != null)
checkMaxConnectionsTimer = UserThread.runAfter(() -> checkMaxConnections(MAX_CONNECTIONS), 3);
checkMaxConnectionsTimer = UserThread.runAfter(() -> {
removeTooOldReportedPeers();
checkMaxConnections(MAX_CONNECTIONS);
}, 3);
};
}
@ -118,7 +123,12 @@ public class PeerManager implements ConnectionListener, MessageListener {
@Override
public void onDisconnect(Reason reason, Connection connection) {
connection.getNodeAddressProperty().removeListener(connectionNodeAddressListener);
//connection.getPeersNodeAddressOptional().ifPresent(this::removePeer);
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
ReportedPeer reportedPeer = new ReportedPeer(nodeAddress);
reportedPeers.remove(reportedPeer);
persistedPeers.add(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000);
});
}
@Override
@ -203,6 +213,18 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
}
private void removeTooOldReportedPeers() {
Set<ReportedPeer> reportedPeersToRemove = reportedPeers.stream()
.filter(reportedPeer -> new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
reportedPeersToRemove.forEach(this::removeReportedPeer);
Set<ReportedPeer> persistedPeersToRemove = persistedPeers.stream()
.filter(reportedPeer -> new Date().getTime() - reportedPeer.lastActivityDate.getTime() > MAX_AGE)
.collect(Collectors.toSet());
persistedPeersToRemove.forEach(this::removeFromPersistedPeers);
}
private void removeSuperfluousSeedNodes() {
Set<Connection> allConnections = networkNode.getAllConnections();
if (allConnections.size() > MAX_CONNECTIONS_EXTENDED_1) {
@ -299,7 +321,7 @@ public class PeerManager implements ConnectionListener, MessageListener {
}
if (dbStorage != null)
dbStorage.queueUpForSave(persistedPeers);
dbStorage.queueUpForSave(persistedPeers, 2000);
}
printReportedPeers();
@ -342,6 +364,31 @@ public class PeerManager implements ConnectionListener, MessageListener {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= MIN_CONNECTIONS;
}
public void penalizeUnreachablePeer(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(this::penalizeUnreachablePeer);
}
public void penalizeUnreachablePeer(NodeAddress nodeAddress) {
reportedPeers.stream()
.filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress))
.findAny()
.ifPresent(this::adjustLastActivityDate);
persistedPeers.stream()
.filter(reportedPeer -> reportedPeer.nodeAddress.equals(nodeAddress))
.findAny()
.ifPresent(reportedPeer -> {
adjustLastActivityDate(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000);
});
}
private void adjustLastActivityDate(ReportedPeer reportedPeer) {
long now = new Date().getTime();
long diff = now - reportedPeer.lastActivityDate.getTime();
long reduced = now - diff * 2;
reportedPeer.setLastActivityDate(new Date(reduced));
}
public Set<ReportedPeer> getConnectedAndReportedPeers() {
Set<ReportedPeer> result = new HashSet<>(reportedPeers);
result.addAll(getConnectedPeers());
@ -443,5 +490,4 @@ public class PeerManager implements ConnectionListener, MessageListener {
log.info(result.toString());
}
}
}

View File

@ -11,7 +11,7 @@ public class ReportedPeer implements Serializable {
private static final long serialVersionUID = Version.NETWORK_PROTOCOL_VERSION;
public final NodeAddress nodeAddress;
public final Date lastActivityDate;
public Date lastActivityDate;
public ReportedPeer(NodeAddress nodeAddress, Date lastActivityDate) {
this.nodeAddress = nodeAddress;
@ -22,6 +22,10 @@ public class ReportedPeer implements Serializable {
this(nodeAddress, null);
}
public void setLastActivityDate(Date lastActivityDate) {
this.lastActivityDate = lastActivityDate;
}
// We don't use the lastActivityDate for identity
@Override
public boolean equals(Object o) {

View File

@ -126,6 +126,7 @@ public class RequestDataManager implements MessageListener {
public void onFault(String errorMessage) {
log.trace("RequestDataHandshake of inbound connection failed. {} Connection= {}",
errorMessage, connection);
peerManager.penalizeUnreachablePeer(connection);
}
});
requestDataHandshake.onDataRequest(message, connection);
@ -170,6 +171,9 @@ public class RequestDataManager implements MessageListener {
public void onFault(String errorMessage) {
log.trace("RequestDataHandshake of outbound connection failed. {} nodeAddress= {}",
errorMessage, nodeAddress);
peerManager.penalizeUnreachablePeer(nodeAddress);
if (!remainingNodeAddresses.isEmpty()) {
log.info("There are remaining nodes available for requesting data. " +
"We will try requestDataFromPeers again.");

View File

@ -160,14 +160,11 @@ public class P2PDataStorage implements MessageListener {
storage.queueUpForSave(sequenceNumberMap, 5000);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after addProtectedExpirableData:");
if (map.values().size() < 10)
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n"));
else
map.values().stream().forEach(e -> sb.append("\n").append("Truncated logs:").append(map.values().size())
.append(" entries\n").append(e.toString().substring(0, 40)).append("...\n"));
sb.append("Data set after addProtectedExpirableData (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString().substring(0, 40)).append("...\n"));
sb.append("\n------------------------------------------------------------\n");
log.info(sb.toString());
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
if (rePublish || !containsKey)
broadcast(new AddDataMessage(protectedData), sender);
@ -282,10 +279,12 @@ public class P2PDataStorage implements MessageListener {
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData));
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Data set after removeProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()));
"Data set after removeProtectedExpirableData: (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString().substring(0, 40)).append("...\n"));
sb.append("\n------------------------------------------------------------\n");
log.info(sb.toString());
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
}
private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {