Copy peers in a new hashset to avoid concurrent modification exc at serialisation

Remove final
Cleanups
This commit is contained in:
chimp1984 2020-10-07 22:31:03 -05:00
parent c7f23e8deb
commit f53290b817
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
6 changed files with 28 additions and 13 deletions

View File

@ -50,11 +50,11 @@ class SynchronizedProtoOutputStream extends ProtoOutputStream {
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread currentThread = Thread.currentThread(); Thread currentThread = Thread.currentThread();
currentThread.interrupt(); currentThread.interrupt();
final String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e; String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e;
log.error(msg); log.error(msg);
throw new BisqRuntimeException(msg, e); throw new BisqRuntimeException(msg, e);
} catch (ExecutionException e) { } catch (ExecutionException e) {
final String msg = "Failed to write envelope. ExecutionException " + e; String msg = "Failed to write envelope. ExecutionException " + e;
log.error(msg); log.error(msg);
throw new BisqRuntimeException(msg, e); throw new BisqRuntimeException(msg, e);
} }
@ -65,7 +65,7 @@ class SynchronizedProtoOutputStream extends ProtoOutputStream {
executorService.shutdownNow(); executorService.shutdownNow();
super.onConnectionShutdown(); super.onConnectionShutdown();
} catch (Throwable t) { } catch (Throwable t) {
log.error("Failed to handle connection shutdown. Throwable={}", t); log.error("Failed to handle connection shutdown. Throwable={}", t.toString());
} }
} }
} }

View File

@ -401,7 +401,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
// We look up first our connections as that is our own data. If not found there we look up the peers which // We look up first our connections as that is our own data. If not found there we look up the peers which
// include reported peers. // include reported peers.
Optional<Capabilities> optionalCapabilities = networkNode.findPeersCapabilities(nodeAddress); Optional<Capabilities> optionalCapabilities = networkNode.findPeersCapabilities(nodeAddress);
if (optionalCapabilities.isPresent()) { if (optionalCapabilities.isPresent() && !optionalCapabilities.get().isEmpty()) {
return optionalCapabilities; return optionalCapabilities;
} }
@ -415,7 +415,8 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
// attack all users have a strong incentive to update ;-). // attack all users have a strong incentive to update ;-).
return getAllPeers().stream() return getAllPeers().stream()
.filter(peer -> peer.getNodeAddress().equals(nodeAddress)) .filter(peer -> peer.getNodeAddress().equals(nodeAddress))
.findAny().map(Peer::getCapabilities); .findAny()
.map(Peer::getCapabilities);
} }
private void applyCapabilities(Connection connection, Capabilities capabilities) { private void applyCapabilities(Connection connection, Capabilities capabilities) {
@ -647,7 +648,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost
private boolean removePersistedPeer(Peer persistedPeer) { private boolean removePersistedPeer(Peer persistedPeer) {
if (getPersistedPeers().contains(persistedPeer)) { if (getPersistedPeers().contains(persistedPeer)) {
getPersistedPeers().remove(persistedPeer); //getPersistedPeers().remove(persistedPeer);
requestPersistence(); requestPersistence();
return true; return true;
} else { } else {

View File

@ -32,6 +32,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -83,11 +84,11 @@ class GetPeersRequestHandler {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void handle(GetPeersRequest getPeersRequest, final Connection connection) { public void handle(GetPeersRequest getPeersRequest, Connection connection) {
checkArgument(connection.getPeersNodeAddressOptional().isPresent(), checkArgument(connection.getPeersNodeAddressOptional().isPresent(),
"The peers address must have been already set at the moment"); "The peers address must have been already set at the moment");
GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.getNonce(), GetPeersResponse getPeersResponse = new GetPeersResponse(getPeersRequest.getNonce(),
peerManager.getLivePeers(connection.getPeersNodeAddressOptional().get())); new HashSet<>(peerManager.getLivePeers(connection.getPeersNodeAddressOptional().get())));
checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice."); checkArgument(timeoutTimer == null, "onGetPeersRequest must not be called twice.");
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions

View File

@ -35,6 +35,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -104,7 +105,9 @@ class PeerExchangeHandler implements MessageListener {
log.debug("sendGetPeersRequest to nodeAddress={}", nodeAddress); log.debug("sendGetPeersRequest to nodeAddress={}", nodeAddress);
if (!stopped) { if (!stopped) {
if (networkNode.getNodeAddress() != null) { if (networkNode.getNodeAddress() != null) {
GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(), nonce, peerManager.getLivePeers(nodeAddress)); GetPeersRequest getPeersRequest = new GetPeersRequest(networkNode.getNodeAddress(),
nonce,
new HashSet<>(peerManager.getLivePeers(nodeAddress)));
if (timeoutTimer == null) { if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) { if (!stopped) {

View File

@ -47,8 +47,14 @@ public final class GetPeersRequest extends NetworkEnvelope implements PeerExchan
@Nullable @Nullable
private final Capabilities supportedCapabilities; private final Capabilities supportedCapabilities;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, Set<Peer> reportedPeers) { public GetPeersRequest(NodeAddress senderNodeAddress,
this(senderNodeAddress, nonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion()); int nonce,
Set<Peer> reportedPeers) {
this(senderNodeAddress,
nonce,
reportedPeers,
Capabilities.app,
Version.getP2PMessageVersion());
} }

View File

@ -43,8 +43,12 @@ public final class GetPeersResponse extends NetworkEnvelope implements PeerExcha
@Nullable @Nullable
private final Capabilities supportedCapabilities; private final Capabilities supportedCapabilities;
public GetPeersResponse(int requestNonce, Set<Peer> reportedPeers) { public GetPeersResponse(int requestNonce,
this(requestNonce, reportedPeers, Capabilities.app, Version.getP2PMessageVersion()); Set<Peer> reportedPeers) {
this(requestNonce,
reportedPeers,
Capabilities.app,
Version.getP2PMessageVersion());
} }