Fix issue with too early disconnect to seeds

As soon we are over our connection target we start disconnecting seed
nodes though that can be too early as getBlocks requests are still
pending. We delay the disconnect seed handling to 3 minutes after our
first connection so give it enough time to receive the getBlocksResponse.
This commit is contained in:
Manfred Karrer 2019-05-11 12:22:59 +02:00
parent ca08b7c272
commit a8b1524d8d
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
2 changed files with 33 additions and 17 deletions

View file

@ -528,6 +528,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
return "Connection{" +
"peerAddress=" + peersNodeAddressOptional +
", peerType=" + peerType +
", connectionType=" + (this instanceof InboundConnection ? "InboundConnection" : "OutboundConnection") +
", uid='" + uid + '\'' +
'}';
}

View file

@ -44,6 +44,7 @@ import javax.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@ -77,6 +78,8 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
// Age of what we consider connected peers still as live peers
private static final long MAX_AGE_LIVE_PEERS = TimeUnit.MINUTES.toMillis(30);
private static final boolean PRINT_REPORTED_PEERS_DETAILS = true;
private long allowDisconnectSeedNodesTs;
private boolean allowDisconnectSeedNodes;
private Set<Peer> latestLivePeers = new HashSet<>();
@ -218,6 +221,16 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
@Override
public void onConnection(Connection connection) {
if (!allowDisconnectSeedNodes) {
if (allowDisconnectSeedNodesTs == 0) {
allowDisconnectSeedNodesTs = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3);
}
// We do not want to disconnect seed nodes at start up as we would not get the response of our blockRequests
// Those requests should be happen soon after start up, to be on the safe side we give it 3 minutes.
// If seed nodes get too heavy under load we still might get disconnected from their side.
allowDisconnectSeedNodes = System.currentTimeMillis() > allowDisconnectSeedNodesTs;
}
final boolean seedNode = isSeedNode(connection);
final Optional<NodeAddress> addressOptional = connection.getPeersNodeAddressOptional();
@ -299,7 +312,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
private boolean checkMaxConnections() {
Set<Connection> allConnections = networkNode.getAllConnections();
Set<Connection> allConnections = new HashSet<>(networkNode.getAllConnections());
int size = allConnections.size();
log.info("We have {} connections open. Our limit is {}", size, maxConnections);
@ -312,7 +325,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
.collect(Collectors.toList());
if (candidates.isEmpty()) {
log.debug("No candidates found. We check if we exceed our " +
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size > maxConnectionsPeer) {
log.info("Lets try to remove ANY connection of type PEER.");
@ -334,7 +347,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
"maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size > maxConnectionsAbsolute) {
log.info("We reached abs. max. connections. Lets try to remove ANY connection.");
candidates = allConnections.stream().collect(Collectors.toList());
candidates = new ArrayList<>(allConnections);
}
}
}
@ -343,7 +356,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
if (!candidates.isEmpty()) {
candidates.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp())));
candidates.sort(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()));
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection: {}", candidates.size(), connection);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
@ -353,7 +366,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
} else {
log.warn("No candidates found to remove (That case should not be possible as we use in the " +
"last case all connections).\n\t" +
"allConnections={}", allConnections);
"size={}, allConnections={}", size, allConnections);
return false;
}
} else {
@ -378,18 +391,20 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
private void removeSuperfluousSeedNodes() {
if (networkNode.getConfirmedConnections().size() > disconnectFromSeedNode) {
List<Connection> seedNodes = networkNode.getConfirmedConnections().stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
if (allowDisconnectSeedNodes) {
if (networkNode.getConfirmedConnections().size() > disconnectFromSeedNode) {
List<Connection> seedNodes = networkNode.getConfirmedConnections().stream()
.filter(this::isSeedNode)
.collect(Collectors.toList());
if (!seedNodes.isEmpty()) {
seedNodes.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp())));
log.debug("Number of seed node connections to disconnect. Current size=" + seedNodes.size());
Connection connection = seedNodes.get(0);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED,
() -> UserThread.runAfter(this::removeSuperfluousSeedNodes, 200, TimeUnit.MILLISECONDS));
if (!seedNodes.isEmpty()) {
seedNodes.sort((o1, o2) -> ((Long) o1.getStatistic().getLastActivityTimestamp()).compareTo(((Long) o2.getStatistic().getLastActivityTimestamp())));
log.debug("Number of seed node connections to disconnect. Current size=" + seedNodes.size());
Connection connection = seedNodes.get(0);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
connection.shutDown(CloseConnectionReason.TOO_MANY_SEED_NODES_CONNECTED,
() -> UserThread.runAfter(this::removeSuperfluousSeedNodes, 200, TimeUnit.MILLISECONDS));
}
}
}
}
@ -670,7 +685,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
Optional<Peer> ourPeer = allPeers.stream().filter(peer -> peer.getNodeAddress().equals(connection.getPeersNodeAddressOptional().get()))
.filter(peer -> !peer.getCapabilities().isEmpty())
.findAny();
if(ourPeer.isPresent())
if (ourPeer.isPresent())
supportedCapabilities = new Capabilities(ourPeer.get().getCapabilities());
}
Peer peer = new Peer(connection.getPeersNodeAddressOptional().get(), supportedCapabilities);