diff --git a/core/src/main/java/bisq/core/offer/OpenOfferManager.java b/core/src/main/java/bisq/core/offer/OpenOfferManager.java index 252724305f..3e16c78402 100644 --- a/core/src/main/java/bisq/core/offer/OpenOfferManager.java +++ b/core/src/main/java/bisq/core/offer/OpenOfferManager.java @@ -49,6 +49,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.P2PService; import bisq.network.p2p.SendDirectMessageListener; +import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; import bisq.common.Timer; @@ -117,6 +118,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe private final RefundAgentManager refundAgentManager; private final DaoFacade daoFacade; private final FilterManager filterManager; + private final Broadcaster broadcaster; private final PersistenceManager> persistenceManager; private final Map offersToBeEdited = new HashMap<>(); private final TradableList openOffers = new TradableList<>(); @@ -148,6 +150,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe RefundAgentManager refundAgentManager, DaoFacade daoFacade, FilterManager filterManager, + Broadcaster broadcaster, PersistenceManager> persistenceManager) { this.createOfferService = createOfferService; this.keyRing = keyRing; @@ -166,6 +169,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe this.refundAgentManager = refundAgentManager; this.daoFacade = daoFacade; this.filterManager = filterManager; + this.broadcaster = broadcaster; this.persistenceManager = persistenceManager; this.persistenceManager.initialize(openOffers, "OpenOffers", PersistenceManager.Source.PRIVATE); @@ -214,10 +218,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe }); } - private void shutDown() { - shutDown(null); - } - public void shutDown(@Nullable Runnable completeHandler) { stopped = true; p2PService.getPeerManager().removeListener(this); @@ -235,6 +235,11 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe UserThread.execute(() -> openOffers.forEach( openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload()) )); + + // Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the + // bundled messages sent out. + broadcaster.flush(); + if (completeHandler != null) { // For typical number of offers we are tolerant with delay to give enough time to broadcast. // If number of offers is very high we limit to 3 sec. to not delay other shutdown routines. @@ -873,41 +878,53 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe /////////////////////////////////////////////////////////////////////////////////////////// private void republishOffers() { - int size = openOffers.size(); - final ArrayList openOffersList = new ArrayList<>(openOffers.getList()); - if (!stopped) { - stopPeriodicRefreshOffersTimer(); - for (int i = 0; i < size; i++) { - // we delay to avoid reaching throttle limits + if (stopped) { + return; + } - long delay = 700; - final long minDelay = (i + 1) * delay; - final long maxDelay = (i + 2) * delay; - final OpenOffer openOffer = openOffersList.get(i); - UserThread.runAfterRandomDelay(() -> { - if (openOffers.contains(openOffer)) { - String id = openOffer.getId(); - if (id != null && !openOffer.isDeactivated()) - republishOffer(openOffer); - } + stopPeriodicRefreshOffersTimer(); - }, minDelay, maxDelay, TimeUnit.MILLISECONDS); - } + List openOffersList = new ArrayList<>(openOffers.getList()); + processListForRepublishOffers(openOffersList); + } + + private void processListForRepublishOffers(List list) { + if (list.isEmpty()) { + return; + } + + OpenOffer openOffer = list.remove(0); + if (openOffers.contains(openOffer) && !openOffer.isDeactivated()) { + // TODO It is not clear yet if it is better for the node and the network to send out all add offer + // messages in one go or to spread it over a delay. With power users who have 100-200 offers that can have + // some significant impact to user experience and the network + republishOffer(openOffer, () -> processListForRepublishOffers(list)); + + /* republishOffer(openOffer, + () -> UserThread.runAfter(() -> processListForRepublishOffers(list), + 30, TimeUnit.MILLISECONDS));*/ } else { - log.debug("We have stopped already. We ignore that republishOffers call."); + // If the offer was removed in the meantime or if its deactivated we skip and call + // processListForRepublishOffers again with the list where we removed the offer already. + processListForRepublishOffers(list); } } private void republishOffer(OpenOffer openOffer) { + republishOffer(openOffer, null); + } + + private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) { offerBookService.addOffer(openOffer.getOffer(), () -> { if (!stopped) { - log.debug("Successfully added offer to P2P network."); // Refresh means we send only the data needed to refresh the TTL (hash, signature and sequence no.) - if (periodicRefreshOffersTimer == null) + if (periodicRefreshOffersTimer == null) { startPeriodicRefreshOffersTimer(); - } else { - log.debug("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call."); + } + if (completeHandler != null) { + completeHandler.run(); + } } }, errorMessage -> { @@ -916,26 +933,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe stopRetryRepublishOffersTimer(); retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, RETRY_REPUBLISH_DELAY_SEC); - } else { - log.debug("We have stopped already. We ignore that offerBookService.republishOffers.onFault call."); + + if (completeHandler != null) { + completeHandler.run(); + } } }); } private void startPeriodicRepublishOffersTimer() { stopped = false; - if (periodicRepublishOffersTimer == null) + if (periodicRepublishOffersTimer == null) { periodicRepublishOffersTimer = UserThread.runPeriodically(() -> { if (!stopped) { republishOffers(); - } else { - log.debug("We have stopped already. We ignore that periodicRepublishOffersTimer.run call."); } }, REPUBLISH_INTERVAL_MS, TimeUnit.MILLISECONDS); - else - log.trace("periodicRepublishOffersTimer already stated"); + } } private void startPeriodicRefreshOffersTimer() { diff --git a/core/src/test/java/bisq/core/offer/OpenOfferManagerTest.java b/core/src/test/java/bisq/core/offer/OpenOfferManagerTest.java index 0fcf60daa4..5cc984687a 100644 --- a/core/src/test/java/bisq/core/offer/OpenOfferManagerTest.java +++ b/core/src/test/java/bisq/core/offer/OpenOfferManagerTest.java @@ -49,7 +49,7 @@ public class OpenOfferManagerTest { final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService, null, null, null, offerBookService, null, null, null, - null, null, null, null, null, null, + null, null, null, null, null, null, null, persistenceManager); AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false); @@ -81,7 +81,7 @@ public class OpenOfferManagerTest { final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService, null, null, null, offerBookService, null, null, null, - null, null, null, null, null, null, + null, null, null, null, null, null, null, persistenceManager); AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false); @@ -106,7 +106,7 @@ public class OpenOfferManagerTest { final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService, null, null, null, offerBookService, null, null, null, - null, null, null, null, null, null, + null, null, null, null, null, null, null, persistenceManager); AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index cdfdde5c46..e48b867571 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -73,6 +73,10 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { } } + public void flush() { + maybeBroadcastBundle(); + } + private void doShutDown() { broadcastHandlers.forEach(BroadcastHandler::cancel); if (timer != null) {