Refactor: Rearrange code, remove unused methods, renamings (no functional change)

This commit is contained in:
chimp1984 2020-10-07 18:44:04 -05:00
parent 4575516d19
commit ed960aba3f
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3

View File

@ -64,7 +64,7 @@ import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class PeerManager implements ConnectionListener, PersistedDataHost {
public final class PeerManager implements ConnectionListener, PersistedDataHost {
///////////////////////////////////////////////////////////////////////////////////////////
// Static
@ -104,7 +104,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
private final ClockWatcher clockWatcher;
private final Set<NodeAddress> seedNodeAddresses;
private final PersistenceManager<PeerList> persistenceManager;
private final ClockWatcher.Listener listener;
private final ClockWatcher.Listener clockWatcherListener;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
// Persistable peerList
@ -151,7 +151,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
setConnectionLimits(maxConnections);
// we check if app was idle for more then 5 sec.
listener = new ClockWatcher.Listener() {
clockWatcherListener = new ClockWatcher.Listener() {
@Override
public void onSecondTick() {
}
@ -168,18 +168,18 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
listeners.forEach(Listener::onAwakeFromStandby);
}
};
clockWatcher.addListener(listener);
clockWatcher.addListener(clockWatcherListener);
}
public void shutDown() {
networkNode.removeConnectionListener(this);
clockWatcher.removeListener(listener);
clockWatcher.removeListener(clockWatcherListener);
stopCheckMaxConnectionsTimer();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
// PersistedDataHost implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
@ -190,29 +190,197 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
}
public int getMaxConnections() {
return maxConnectionsAbsolute;
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
if (isSeedNode(connection)) {
connection.setPeerType(Connection.PeerType.SEED_NODE);
}
doHouseKeeping();
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
listeners.forEach(Listener::onNewConnectionAfterAllConnectionsLost);
}
}
public void addListener(Listener listener) {
listeners.add(listener);
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.info("onDisconnect called: nodeAddress={}, closeConnectionReason={}",
connection.getPeersNodeAddressOptional(), closeConnectionReason);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections) {
stopped = true;
log.warn("\n------------------------------------------------------------\n" +
"All connections lost\n" +
"------------------------------------------------------------");
listeners.forEach(Listener::onAllConnectionsLost);
}
maybeRemoveBannedPeer(closeConnectionReason, connection);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
@Override
public void onError(Throwable throwable) {
}
// Modify this to change the relationships between connection limits.
// maxConnections default 12
private void setConnectionLimits(int maxConnections) {
this.maxConnections = maxConnections; // app node 12; seedNode 30
minConnections = Math.max(1, (int) Math.round(maxConnections * 0.7)); // app node 1-8; seedNode 21
disconnectFromSeedNode = maxConnections; // app node 12; seedNode 30
maxConnectionsPeer = Math.max(4, (int) Math.round(maxConnections * 1.3)); // app node 16; seedNode 39
maxConnectionsNonDirect = Math.max(8, (int) Math.round(maxConnections * 1.7)); // app node 20; seedNode 51
maxConnectionsAbsolute = Math.max(12, (int) Math.round(maxConnections * 2.5)); // app node 30; seedNode 66
///////////////////////////////////////////////////////////////////////////////////////////
// Connection
///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() {
return networkNode.getConfirmedConnections().size() >= minConnections;
}
// Checks if that connection has the peers node address
public boolean isConfirmed(NodeAddress nodeAddress) {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress) {
handleConnectionFault(nodeAddress, null);
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
log.debug("handleConnectionFault called: nodeAddress=" + nodeAddress);
boolean doRemovePersistedPeer = false;
removeReportedPeer(nodeAddress);
Optional<Peer> persistedPeerOptional = findPersistedPeer(nodeAddress);
if (persistedPeerOptional.isPresent()) {
Peer persistedPeer = persistedPeerOptional.get();
persistedPeer.increaseFailedConnectionAttempts();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
}
doRemovePersistedPeer = doRemovePersistedPeer || (connection != null && connection.getRuleViolation() != null);
if (doRemovePersistedPeer)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();
}
public boolean isSeedNode(Connection connection) {
//TODO
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
}
public boolean isSelf(NodeAddress nodeAddress) {
return nodeAddress.equals(networkNode.getNodeAddress());
}
private boolean isSeedNode(Peer peer) {
return seedNodeAddresses.contains(peer.getNodeAddress());
}
public boolean isSeedNode(NodeAddress nodeAddress) {
return seedNodeAddresses.contains(nodeAddress);
}
//TODO rename
public boolean isNodeBanned(CloseConnectionReason closeConnectionReason, Connection connection) {
return closeConnectionReason == CloseConnectionReason.PEER_BANNED &&
connection.getPeersNodeAddressOptional().isPresent();
}
private void maybeRemoveBannedPeer(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent() && isNodeBanned(closeConnectionReason, connection)) {
NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
seedNodeAddresses.remove(nodeAddress);
removePersistedPeer(nodeAddress);
removeReportedPeer(nodeAddress);
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Peer
///////////////////////////////////////////////////////////////////////////////////////////
public Optional<Peer> findPeer(NodeAddress peersNodeAddress) {
return getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.findAny();
}
public Set<Peer> getAllPeers() {
Set<Peer> allPeers = new HashSet<>(getLivePeers());
allPeers.addAll(getPersistedPeers());
allPeers.addAll(reportedPeers);
return allPeers;
}
public Collection<Peer> getPersistedPeers() {
return peerList.getList();
}
public void addToReportedPeers(Set<Peer> reportedPeersToAdd,
Connection connection,
Capabilities capabilities) {
applyCapabilities(connection, capabilities);
printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(reportedPeersToAdd);
purgeReportedPeersIfExceeds();
getPersistedPeers().addAll(reportedPeersToAdd);
purgePersistedPeersIfExceeds();
requestPersistence();
printReportedPeers();
} else {
// If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
}
}
// Delivers the live peers from the last 30 min (MAX_AGE_LIVE_PEERS)
// We include older peers to avoid risks for network partitioning
public Set<Peer> getLivePeers() {
return getLivePeers(null);
}
public Set<Peer> getLivePeers(@Nullable NodeAddress excludedNodeAddress) {
int oldNumLatestLivePeers = latestLivePeers.size();
Set<Peer> peers = new HashSet<>(latestLivePeers);
Set<Peer> currentLivePeers = getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e))
.filter(e -> !e.getNodeAddress().equals(excludedNodeAddress))
.collect(Collectors.toSet());
peers.addAll(currentLivePeers);
long maxAge = new Date().getTime() - MAX_AGE_LIVE_PEERS;
latestLivePeers.clear();
Set<Peer> recentPeers = peers.stream()
.filter(peer -> peer.getDateAsLong() > maxAge)
.collect(Collectors.toSet());
latestLivePeers.addAll(recentPeers);
if (oldNumLatestLivePeers != latestLivePeers.size())
log.info("Num of latestLivePeers={}", latestLivePeers.size());
return latestLivePeers;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Capabilities
///////////////////////////////////////////////////////////////////////////////////////////
public boolean peerHasCapability(NodeAddress peersNodeAddress, Capability capability) {
return findPeersCapabilities(peersNodeAddress)
@ -241,88 +409,17 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
.findAny().map(Peer::getCapabilities);
}
public Optional<Peer> findPeer(NodeAddress peersNodeAddress) {
return getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(peersNodeAddress))
.findAny();
}
public Set<Peer> getAllPeers() {
Set<Peer> allPeers = new HashSet<>(getLivePeers());
allPeers.addAll(getPersistedPeers());
allPeers.addAll(reportedPeers);
return allPeers;
}
public Collection<Peer> getPersistedPeers() {
return peerList.getList();
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
boolean seedNode = isSeedNode(connection);
Optional<NodeAddress> addressOptional = connection.getPeersNodeAddressOptional();
if (log.isDebugEnabled()) {
String peer = addressOptional.map(NodeAddress::getFullAddress).orElseGet(() ->
"not known yet (connection id=" + connection.getUid() + ")");
log.debug("onConnection: peer = {}{}",
peer,
seedNode ? " (SeedNode)" : "");
private void applyCapabilities(Connection connection, Capabilities capabilities) {
if (capabilities == null || capabilities.isEmpty()) {
return;
}
if (seedNode)
connection.setPeerType(Connection.PeerType.SEED_NODE);
doHouseKeeping();
if (lostAllConnections) {
lostAllConnections = false;
stopped = false;
listeners.stream().forEach(Listener::onNewConnectionAfterAllConnectionsLost);
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
log.info("onDisconnect called: nodeAddress={}, closeConnectionReason={}", connection.getPeersNodeAddressOptional(), closeConnectionReason);
final Optional<NodeAddress> addressOptional = connection.getPeersNodeAddressOptional();
log.debug("onDisconnect: peer = {}{} / closeConnectionReason: {}",
(addressOptional.isPresent() ? addressOptional.get().getFullAddress() : "not known yet (connection id=" + connection.getUid() + ")"),
isSeedNode(connection) ? " (SeedNode)" : "",
closeConnectionReason);
handleConnectionFault(connection);
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections) {
stopped = true;
log.warn("\n------------------------------------------------------------\n" +
"All connections lost\n" +
"------------------------------------------------------------");
listeners.stream().forEach(Listener::onAllConnectionsLost);
}
if (connection.getPeersNodeAddressOptional().isPresent() && isNodeBanned(closeConnectionReason, connection)) {
final NodeAddress nodeAddress = connection.getPeersNodeAddressOptional().get();
seedNodeAddresses.remove(nodeAddress);
removePersistedPeer(nodeAddress);
removeReportedPeer(nodeAddress);
}
}
public boolean isNodeBanned(CloseConnectionReason closeConnectionReason, Connection connection) {
return closeConnectionReason == CloseConnectionReason.PEER_BANNED &&
connection.getPeersNodeAddressOptional().isPresent();
}
@Override
public void onError(Throwable throwable) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(nodeAddress))
.forEach(peer -> peer.setCapabilities(capabilities));
});
requestPersistence();
}
@ -463,29 +560,22 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Reported peers
///////////////////////////////////////////////////////////////////////////////////////////
private boolean removeReportedPeer(Peer reportedPeer) {
boolean contained = reportedPeers.remove(reportedPeer);
private void removeReportedPeer(Peer reportedPeer) {
reportedPeers.remove(reportedPeer);
printReportedPeers();
return contained;
}
@SuppressWarnings("UnusedReturnValue")
@Nullable
private Peer removeReportedPeer(NodeAddress nodeAddress) {
private void removeReportedPeer(NodeAddress nodeAddress) {
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
Optional<Peer> reportedPeerOptional = reportedPeersClone.stream()
.filter(e -> e.getNodeAddress().equals(nodeAddress)).findAny();
if (reportedPeerOptional.isPresent()) {
Peer reportedPeer = reportedPeerOptional.get();
removeReportedPeer(reportedPeer);
return reportedPeer;
} else {
return null;
}
}
@ -497,44 +587,6 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
reportedPeersToRemove.forEach(this::removeReportedPeer);
}
public void addToReportedPeers(Set<Peer> reportedPeersToAdd,
Connection connection,
Capabilities capabilities) {
applyCapabilities(connection, capabilities);
printNewReportedPeers(reportedPeersToAdd);
// We check if the reported msg is not violating our rules
if (reportedPeersToAdd.size() <= (MAX_REPORTED_PEERS + maxConnectionsAbsolute + 10)) {
reportedPeers.addAll(reportedPeersToAdd);
purgeReportedPeersIfExceeds();
getPersistedPeers().addAll(reportedPeersToAdd);
purgePersistedPeersIfExceeds();
requestPersistence();
printReportedPeers();
} else {
// If a node is trying to send too many list we treat it as rule violation.
// Reported list include the connected list. We use the max value and give some extra headroom.
// Will trigger a shutdown after 2nd time sending too much
connection.reportInvalidRequest(RuleViolation.TOO_MANY_REPORTED_PEERS_SENT);
}
}
private void applyCapabilities(Connection connection, Capabilities capabilities) {
if (capabilities == null || capabilities.isEmpty()) {
return;
}
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> {
getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(nodeAddress))
.forEach(peer -> peer.setCapabilities(capabilities));
});
requestPersistence();
}
private void purgeReportedPeersIfExceeds() {
int size = reportedPeers.size();
@ -543,7 +595,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
"We remove random peers from the reported peers list.", size, MAX_REPORTED_PEERS);
int diff = size - MAX_REPORTED_PEERS;
List<Peer> list = new ArrayList<>(reportedPeers);
// we dont use sorting by lastActivityDate to keep it more random
// we don't use sorting by lastActivityDate to keep it more random
for (int i = 0; i < diff; i++) {
if (!list.isEmpty()) {
Peer toRemove = list.remove(new Random().nextInt(list.size()));
@ -557,12 +609,11 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
private void printReportedPeers() {
if (!reportedPeers.isEmpty()) {
//noinspection ConstantConditions
if (PRINT_REPORTED_PEERS_DETAILS) {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Collected reported peers:");
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
reportedPeersClone.stream().forEach(e -> result.append("\n").append(e));
reportedPeersClone.forEach(e -> result.append("\n").append(e));
result.append("\n------------------------------------------------------------\n");
log.trace(result.toString());
}
@ -571,7 +622,6 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
}
private void printNewReportedPeers(Set<Peer> reportedPeers) {
//noinspection ConstantConditions
if (PRINT_REPORTED_PEERS_DETAILS) {
StringBuilder result = new StringBuilder("We received new reportedPeers:");
List<Peer> reportedPeersClone = new ArrayList<>(reportedPeers);
@ -583,7 +633,7 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
///////////////////////////////////////////////////////////////////////////////////////////
// Persisted list
// Persisted peers
///////////////////////////////////////////////////////////////////////////////////////////
private boolean removePersistedPeer(Peer persistedPeer) {
@ -641,114 +691,42 @@ public class PeerManager implements ConnectionListener, PersistedDataHost {
///////////////////////////////////////////////////////////////////////////////////////////
// Misc
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public boolean hasSufficientConnections() {
return networkNode.getNodeAddressesOfConfirmedConnections().size() >= minConnections;
public int getMaxConnections() {
return maxConnectionsAbsolute;
}
private boolean isSeedNode(Peer reportedPeer) {
return seedNodeAddresses.contains(reportedPeer.getNodeAddress());
}
public boolean isSeedNode(NodeAddress nodeAddress) {
return seedNodeAddresses.contains(nodeAddress);
}
public boolean isSeedNode(Connection connection) {
return connection.hasPeersNodeAddress() && seedNodeAddresses.contains(connection.getPeersNodeAddressOptional().get());
}
public boolean isSelf(Peer reportedPeer) {
return isSelf(reportedPeer.getNodeAddress());
}
public boolean isSelf(NodeAddress nodeAddress) {
return nodeAddress.equals(networkNode.getNodeAddress());
}
public boolean isConfirmed(Peer reportedPeer) {
return isConfirmed(reportedPeer.getNodeAddress());
}
// Checks if that connection has the peers node address
public boolean isConfirmed(NodeAddress nodeAddress) {
return networkNode.getNodeAddressesOfConfirmedConnections().contains(nodeAddress);
}
public void handleConnectionFault(Connection connection) {
connection.getPeersNodeAddressOptional().ifPresent(nodeAddress -> handleConnectionFault(nodeAddress, connection));
}
public void handleConnectionFault(NodeAddress nodeAddress) {
handleConnectionFault(nodeAddress, null);
}
public void handleConnectionFault(NodeAddress nodeAddress, @Nullable Connection connection) {
log.debug("handleConnectionFault called: nodeAddress=" + nodeAddress);
boolean doRemovePersistedPeer = false;
removeReportedPeer(nodeAddress);
Optional<Peer> persistedPeerOptional = findPersistedPeer(nodeAddress);
if (persistedPeerOptional.isPresent()) {
Peer persistedPeer = persistedPeerOptional.get();
persistedPeer.increaseFailedConnectionAttempts();
doRemovePersistedPeer = persistedPeer.tooManyFailedConnectionAttempts();
}
doRemovePersistedPeer = doRemovePersistedPeer || (connection != null && connection.getRuleViolation() != null);
if (doRemovePersistedPeer)
removePersistedPeer(nodeAddress);
else
removeTooOldPersistedPeers();
}
public void shutDownConnection(Connection connection, CloseConnectionReason closeConnectionReason) {
if (connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
connection.shutDown(closeConnectionReason);
}
public void shutDownConnection(NodeAddress peersNodeAddress, CloseConnectionReason closeConnectionReason) {
networkNode.getAllConnections().stream()
.filter(connection -> connection.getPeersNodeAddressOptional().isPresent() &&
connection.getPeersNodeAddressOptional().get().equals(peersNodeAddress) &&
connection.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER)
.findAny()
.ifPresent(connection -> connection.shutDown(closeConnectionReason));
}
// Delivers the live peers from the last 30 min (MAX_AGE_LIVE_PEERS)
// We include older peers to avoid risks for network partitioning
public Set<Peer> getLivePeers() {
return getLivePeers(null);
}
public Set<Peer> getLivePeers(@Nullable NodeAddress excludedNodeAddress) {
int oldNumLatestLivePeers = latestLivePeers.size();
Set<Peer> peers = new HashSet<>(latestLivePeers);
Set<Peer> currentLivePeers = getConnectedReportedPeers().stream()
.filter(e -> !isSeedNode(e))
.filter(e -> !e.getNodeAddress().equals(excludedNodeAddress))
.collect(Collectors.toSet());
peers.addAll(currentLivePeers);
long maxAge = new Date().getTime() - MAX_AGE_LIVE_PEERS;
latestLivePeers.clear();
Set<Peer> recentPeers = peers.stream()
.filter(peer -> peer.getDateAsLong() > maxAge)
.collect(Collectors.toSet());
latestLivePeers.addAll(recentPeers);
if (oldNumLatestLivePeers != latestLivePeers.size())
log.info("Num of latestLivePeers={}", latestLivePeers.size());
return latestLivePeers;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
// Listeners
///////////////////////////////////////////////////////////////////////////////////////////
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private misc
///////////////////////////////////////////////////////////////////////////////////////////
// Modify this to change the relationships between connection limits.
// maxConnections default 12
private void setConnectionLimits(int maxConnections) {
this.maxConnections = maxConnections; // app node 12; seedNode 30
minConnections = Math.max(1, (int) Math.round(maxConnections * 0.7)); // app node 1-8; seedNode 21
disconnectFromSeedNode = maxConnections; // app node 12; seedNode 30
maxConnectionsPeer = Math.max(4, (int) Math.round(maxConnections * 1.3)); // app node 16; seedNode 39
maxConnectionsNonDirect = Math.max(8, (int) Math.round(maxConnections * 1.7)); // app node 20; seedNode 51
maxConnectionsAbsolute = Math.max(12, (int) Math.round(maxConnections * 2.5)); // app node 30; seedNode 66
}
private Set<Peer> getConnectedReportedPeers() {
// networkNode.getConfirmedConnections includes:
// filter(connection -> connection.getPeersNodeAddressOptional().isPresent())