From b5112673401e77fed8ad4bae581fc8c53a115d10 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Sat, 27 Feb 2016 00:44:33 +0100 Subject: [PATCH] Fix bug with missing broadcast --- .../src/main/java/io/bitsquare/app/Log.java | 2 +- .../java/io/bitsquare/alert/AlertService.java | 2 +- .../arbitration/ArbitratorService.java | 2 +- .../trade/offer/OfferBookService.java | 11 +--------- .../trade/offer/OpenOfferManager.java | 2 +- .../settings/network/NetworkSettingsView.fxml | 16 +++++++------- .../settings/network/NetworkSettingsView.java | 2 +- .../java/io/bitsquare/p2p/P2PService.java | 6 ++--- .../p2p/network/CloseConnectionReason.java | 18 ++++++--------- .../io/bitsquare/p2p/network/Connection.java | 6 +++-- .../io/bitsquare/p2p/peers/PeerManager.java | 2 +- .../p2p/peers/getdata/RequestDataHandler.java | 2 +- .../bitsquare/p2p/storage/P2PDataStorage.java | 22 +++++++++---------- .../p2p/storage/ProtectedDataStorageTest.java | 6 ++--- 14 files changed, 44 insertions(+), 55 deletions(-) diff --git a/common/src/main/java/io/bitsquare/app/Log.java b/common/src/main/java/io/bitsquare/app/Log.java index 66485f286d..15b8694eb2 100644 --- a/common/src/main/java/io/bitsquare/app/Log.java +++ b/common/src/main/java/io/bitsquare/app/Log.java @@ -63,7 +63,7 @@ public class Log { logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); //TODO for now use always trace - logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.INFO); + logbackLogger.setLevel(useDetailedLogging ? Level.INFO : Level.INFO); // logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG); logbackLogger.addAppender(appender); } diff --git a/core/src/main/java/io/bitsquare/alert/AlertService.java b/core/src/main/java/io/bitsquare/alert/AlertService.java index d0a13830c2..682780f209 100644 --- a/core/src/main/java/io/bitsquare/alert/AlertService.java +++ b/core/src/main/java/io/bitsquare/alert/AlertService.java @@ -50,7 +50,7 @@ public class AlertService { } public void addAlertMessage(Alert alert, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) { - boolean result = p2PService.addData(alert, true, true); + boolean result = p2PService.addData(alert, true); if (result) { log.trace("Add alertMessage to network was successful. AlertMessage = " + alert); if (resultHandler != null) resultHandler.handleResult(); diff --git a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java index 5c49c82e8c..5be900c97e 100644 --- a/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java +++ b/core/src/main/java/io/bitsquare/arbitration/ArbitratorService.java @@ -59,7 +59,7 @@ public class ArbitratorService { public void addArbitrator(Arbitrator arbitrator, final ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { log.debug("addArbitrator arbitrator.hashCode() " + arbitrator.hashCode()); - boolean result = p2PService.addData(arbitrator, true, true); + boolean result = p2PService.addData(arbitrator, true); if (result) { log.trace("Add arbitrator to network was successful. Arbitrator.hashCode() = " + arbitrator.hashCode()); resultHandler.handleResult(); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java b/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java index a75d8e1cdb..810447fe79 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OfferBookService.java @@ -84,17 +84,8 @@ public class OfferBookService { // API /////////////////////////////////////////////////////////////////////////////////////////// - - public void republishOffers(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - doAddOffer(offer, resultHandler, errorMessageHandler, true); - } - public void addOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { - doAddOffer(offer, resultHandler, errorMessageHandler, false); - } - - private void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean forceBroadcast) { - boolean result = p2PService.addData(offer, forceBroadcast, true); + boolean result = p2PService.addData(offer, true); if (result) { log.trace("Add offer to network was successful. Offer ID = " + offer.getId()); resultHandler.handleResult(); diff --git a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java index eb5da86f7c..f1ee08eed7 100644 --- a/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java +++ b/core/src/main/java/io/bitsquare/trade/offer/OpenOfferManager.java @@ -370,7 +370,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe } private void republishOffer(OpenOffer openOffer) { - offerBookService.republishOffers(openOffer.getOffer(), + offerBookService.addOffer(openOffer.getOffer(), () -> { if (!stopped) { log.debug("Successful added offer to P2P network"); diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml index bc9a862bbe..0b4f3638be 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.fxml @@ -84,12 +84,12 @@ - - - - - - + + @@ -117,8 +117,8 @@ focusTraversable="false"/> - - + + diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java index 298ff8124a..f199f1228b 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java @@ -71,7 +71,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel p2PPeerTable; @FXML TableColumn onionAddressColumn, connectionTypeColumn, creationDateColumn, - lastActivityColumn, roundTripTimeColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn; + /*lastActivityColumn,*/ roundTripTimeColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn; private Subscription numP2PPeersSubscription; private Subscription bitcoinPeersSubscription; private Subscription nodeAddressSubscription; diff --git a/network/src/main/java/io/bitsquare/p2p/P2PService.java b/network/src/main/java/io/bitsquare/p2p/P2PService.java index 5c5a4c8c19..c0526b35bf 100644 --- a/network/src/main/java/io/bitsquare/p2p/P2PService.java +++ b/network/src/main/java/io/bitsquare/p2p/P2PService.java @@ -556,7 +556,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis public void onBroadcastFailed(String errorMessage) { } }; - boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true, true); + boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true); if (!result) { //TODO remove and add again with a delay to ensure the data will be broadcasted sendMailboxMessageListener.onFault("Data already exists in our local database"); @@ -616,13 +616,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis // Data storage /////////////////////////////////////////////////////////////////////////////////////////// - public boolean addData(StoragePayload storagePayload, boolean forceBroadcast, boolean isDataOwner) { + public boolean addData(StoragePayload storagePayload, boolean isDataOwner) { Log.traceCall(); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); if (isBootstrapped()) { try { ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair()); - return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, forceBroadcast, isDataOwner); + return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, isDataOwner); } catch (CryptoException e) { log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); return false; diff --git a/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java b/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java index ee9797eedb..170733b16f 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java +++ b/network/src/main/java/io/bitsquare/p2p/network/CloseConnectionReason.java @@ -2,19 +2,19 @@ package io.bitsquare.p2p.network; public enum CloseConnectionReason { // First block are from different exceptions - SOCKET_CLOSED(false), - RESET(false), - SOCKET_TIMEOUT(false), - TERMINATED(false), // EOFException - UNKNOWN_EXCEPTION(false), + SOCKET_CLOSED(false, false), + RESET(false, false), + SOCKET_TIMEOUT(false, false), + TERMINATED(false, false), // EOFException + UNKNOWN_EXCEPTION(false, false), // Planned APP_SHUT_DOWN(true, true), CLOSE_REQUESTED_BY_PEER(false, true), // send msg - SEND_MSG_FAILURE(false), - SEND_MSG_TIMEOUT(false), + SEND_MSG_FAILURE(false, false), + SEND_MSG_TIMEOUT(false, false), // maintenance TOO_MANY_CONNECTIONS_OPEN(true, true), @@ -27,10 +27,6 @@ public enum CloseConnectionReason { public final boolean sendCloseMessage; public boolean isIntended; - CloseConnectionReason(boolean sendCloseMessage) { - this(sendCloseMessage, true); - } - CloseConnectionReason(boolean sendCloseMessage, boolean isIntended) { this.sendCloseMessage = sendCloseMessage; this.isIntended = isIntended; diff --git a/network/src/main/java/io/bitsquare/p2p/network/Connection.java b/network/src/main/java/io/bitsquare/p2p/network/Connection.java index 19877f6501..b1ae06cd52 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/Connection.java +++ b/network/src/main/java/io/bitsquare/p2p/network/Connection.java @@ -1,5 +1,6 @@ package io.bitsquare.p2p.network; +import com.google.common.util.concurrent.CycleDetectingLockFactory; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import io.bitsquare.app.Log; @@ -68,6 +69,8 @@ public class Connection implements MessageListener { return MAX_MSG_SIZE; } + private static final CycleDetectingLockFactory cycleDetectingLockFactory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW); + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -79,7 +82,7 @@ public class Connection implements MessageListener { private final String portInfo; private final String uid; private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); - private final ReentrantLock objectOutputStreamLock = new ReentrantLock(true); + private final ReentrantLock objectOutputStreamLock = cycleDetectingLockFactory.newReentrantLock("objectOutputStreamLock"); // holder of state shared between InputHandler and Connection private final SharedModel sharedModel; private final Statistic statistic; @@ -104,7 +107,6 @@ public class Connection implements MessageListener { Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, @Nullable NodeAddress peersNodeAddress) { this.socket = socket; - //this.messageListener = messageListener; this.connectionListener = connectionListener; uid = UUID.randomUUID().toString(); statistic = new Statistic(); diff --git a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java index 8f26991c25..bb78ea938d 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/PeerManager.java @@ -47,7 +47,7 @@ public class PeerManager implements ConnectionListener { } static { - setMaxConnections(10); + setMaxConnections(12); } private static final int MAX_REPORTED_PEERS = 1000; diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index b7f4beb05c..a81c1020af 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -157,7 +157,7 @@ public class RequestDataHandler implements MessageListener { "at that moment"); ((GetDataResponse) message).dataSet.stream() .forEach(protectedData -> dataStorage.add(protectedData, - connection.getPeersNodeAddressOptional().get(), null, false, false)); + connection.getPeersNodeAddressOptional().get(), null, false)); cleanup(); listener.onComplete(); diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index de965d3d3a..f7fa061604 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -113,7 +113,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection); connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { if (message instanceof AddDataMessage) { - add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false, false); + add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false); } else if (message instanceof RemoveDataMessage) { remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress, false); } else if (message instanceof RemoveMailboxDataMessage) { @@ -136,7 +136,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { @Override public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - if (connection.getPeersNodeAddressOptional().isPresent() && !closeConnectionReason.isIntended) { + if (connection.hasPeersNodeAddress() && !closeConnectionReason.isIntended) { map.values().stream() .forEach(protectedData -> { ExpirablePayload expirablePayload = protectedData.getStoragePayload(); @@ -151,6 +151,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { ByteArray hashOfPayload = getHashAsByteArray(expirablePayload); boolean containsKey = map.containsKey(hashOfPayload); if (containsKey) { + log.info("We remove the data as the data owner got disconnected with " + + "closeConnectionReason=" + closeConnectionReason); doRemoveProtectedExpirableData(protectedData, hashOfPayload); } else { log.debug("Remove data ignored as we don't have an entry for that data."); @@ -172,13 +174,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { /////////////////////////////////////////////////////////////////////////////////////////// public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, - @Nullable BroadcastHandler.Listener listener, boolean forceBroadcast, boolean isDataOwner) { + @Nullable BroadcastHandler.Listener listener, boolean isDataOwner) { Log.traceCall(); ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload()); + boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload); boolean result = checkPublicKeys(protectedStorageEntry, true) && checkSignature(protectedStorageEntry) - && isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload); + && sequenceNrValid; boolean containsKey = map.containsKey(hashOfPayload); if (containsKey) @@ -197,10 +200,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { log.trace(sb.toString()); log.info("Data set after doAdd: size=" + map.values().size()); - if (!containsKey || forceBroadcast) - broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner); - else - log.trace("Not broadcasting data as we had it already in our map."); + broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner); hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedStorageEntry)); } else { @@ -383,13 +383,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener { private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) { if (sequenceNumberMap.containsKey(hashOfData)) { Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr; - if (newSequenceNumber < storedSequenceNumber) { + if (newSequenceNumber > storedSequenceNumber) { + return true; + } else { log.info("Sequence number is invalid. sequenceNumber = " + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" + "That can happen if the data owner gets an old delayed data storage message."); return false; - } else { - return true; } } else { return true; diff --git a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java index 4ad925bec9..8af9b689f5 100644 --- a/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java +++ b/network/src/test/java/io/bitsquare/p2p/storage/ProtectedDataStorageTest.java @@ -99,7 +99,7 @@ public class ProtectedDataStorageTest { //@Test public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); + Assert.assertTrue(dataStorage1.add(data, null, null, true)); Assert.assertEquals(1, dataStorage1.getMap().size()); int newSequenceNumber = data.sequenceNumber + 1; @@ -115,7 +115,7 @@ public class ProtectedDataStorageTest { mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); log.debug("data.date " + data.timeStamp); - Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); + Assert.assertTrue(dataStorage1.add(data, null, null, true)); log.debug("test 1"); Assert.assertEquals(1, dataStorage1.getMap().size()); @@ -163,7 +163,7 @@ public class ProtectedDataStorageTest { public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); - Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); + Assert.assertTrue(dataStorage1.add(data, null, null, true)); Assert.assertEquals(1, dataStorage1.getMap().size()); Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); log.debug("test 1");