Merge pull request #4950 from chimp1984/improve-offer-publishing

Improve offer publishing
This commit is contained in:
Christoph Atteneder 2020-12-29 19:51:21 +01:00 committed by GitHub
commit 94cf02303e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 37 deletions

View File

@ -49,6 +49,7 @@ import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.SendDirectMessageListener;
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;
import bisq.common.Timer;
@ -117,6 +118,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
private final RefundAgentManager refundAgentManager;
private final DaoFacade daoFacade;
private final FilterManager filterManager;
private final Broadcaster broadcaster;
private final PersistenceManager<TradableList<OpenOffer>> persistenceManager;
private final Map<String, OpenOffer> offersToBeEdited = new HashMap<>();
private final TradableList<OpenOffer> openOffers = new TradableList<>();
@ -148,6 +150,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
RefundAgentManager refundAgentManager,
DaoFacade daoFacade,
FilterManager filterManager,
Broadcaster broadcaster,
PersistenceManager<TradableList<OpenOffer>> persistenceManager) {
this.createOfferService = createOfferService;
this.keyRing = keyRing;
@ -166,6 +169,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
this.refundAgentManager = refundAgentManager;
this.daoFacade = daoFacade;
this.filterManager = filterManager;
this.broadcaster = broadcaster;
this.persistenceManager = persistenceManager;
this.persistenceManager.initialize(openOffers, "OpenOffers", PersistenceManager.Source.PRIVATE);
@ -214,10 +218,6 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
});
}
private void shutDown() {
shutDown(null);
}
public void shutDown(@Nullable Runnable completeHandler) {
stopped = true;
p2PService.getPeerManager().removeListener(this);
@ -235,6 +235,11 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
UserThread.execute(() -> openOffers.forEach(
openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())
));
// Force broadcaster to send out immediately, otherwise we could have a 2 sec delay until the
// bundled messages sent out.
broadcaster.flush();
if (completeHandler != null) {
// For typical number of offers we are tolerant with delay to give enough time to broadcast.
// If number of offers is very high we limit to 3 sec. to not delay other shutdown routines.
@ -873,41 +878,53 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
///////////////////////////////////////////////////////////////////////////////////////////
private void republishOffers() {
int size = openOffers.size();
final ArrayList<OpenOffer> openOffersList = new ArrayList<>(openOffers.getList());
if (!stopped) {
stopPeriodicRefreshOffersTimer();
for (int i = 0; i < size; i++) {
// we delay to avoid reaching throttle limits
if (stopped) {
return;
}
long delay = 700;
final long minDelay = (i + 1) * delay;
final long maxDelay = (i + 2) * delay;
final OpenOffer openOffer = openOffersList.get(i);
UserThread.runAfterRandomDelay(() -> {
if (openOffers.contains(openOffer)) {
String id = openOffer.getId();
if (id != null && !openOffer.isDeactivated())
republishOffer(openOffer);
}
stopPeriodicRefreshOffersTimer();
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
List<OpenOffer> openOffersList = new ArrayList<>(openOffers.getList());
processListForRepublishOffers(openOffersList);
}
private void processListForRepublishOffers(List<OpenOffer> list) {
if (list.isEmpty()) {
return;
}
OpenOffer openOffer = list.remove(0);
if (openOffers.contains(openOffer) && !openOffer.isDeactivated()) {
// TODO It is not clear yet if it is better for the node and the network to send out all add offer
// messages in one go or to spread it over a delay. With power users who have 100-200 offers that can have
// some significant impact to user experience and the network
republishOffer(openOffer, () -> processListForRepublishOffers(list));
/* republishOffer(openOffer,
() -> UserThread.runAfter(() -> processListForRepublishOffers(list),
30, TimeUnit.MILLISECONDS));*/
} else {
log.debug("We have stopped already. We ignore that republishOffers call.");
// If the offer was removed in the meantime or if its deactivated we skip and call
// processListForRepublishOffers again with the list where we removed the offer already.
processListForRepublishOffers(list);
}
}
private void republishOffer(OpenOffer openOffer) {
republishOffer(openOffer, null);
}
private void republishOffer(OpenOffer openOffer, @Nullable Runnable completeHandler) {
offerBookService.addOffer(openOffer.getOffer(),
() -> {
if (!stopped) {
log.debug("Successfully added offer to P2P network.");
// Refresh means we send only the data needed to refresh the TTL (hash, signature and sequence no.)
if (periodicRefreshOffersTimer == null)
if (periodicRefreshOffersTimer == null) {
startPeriodicRefreshOffersTimer();
} else {
log.debug("We have stopped already. We ignore that offerBookService.republishOffers.onSuccess call.");
}
if (completeHandler != null) {
completeHandler.run();
}
}
},
errorMessage -> {
@ -916,26 +933,25 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
stopRetryRepublishOffersTimer();
retryRepublishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers,
RETRY_REPUBLISH_DELAY_SEC);
} else {
log.debug("We have stopped already. We ignore that offerBookService.republishOffers.onFault call.");
if (completeHandler != null) {
completeHandler.run();
}
}
});
}
private void startPeriodicRepublishOffersTimer() {
stopped = false;
if (periodicRepublishOffersTimer == null)
if (periodicRepublishOffersTimer == null) {
periodicRepublishOffersTimer = UserThread.runPeriodically(() -> {
if (!stopped) {
republishOffers();
} else {
log.debug("We have stopped already. We ignore that periodicRepublishOffersTimer.run call.");
}
},
REPUBLISH_INTERVAL_MS,
TimeUnit.MILLISECONDS);
else
log.trace("periodicRepublishOffersTimer already stated");
}
}
private void startPeriodicRefreshOffersTimer() {

View File

@ -49,7 +49,7 @@ public class OpenOfferManagerTest {
final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService,
null, null, null, offerBookService,
null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null,
persistenceManager);
AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false);
@ -81,7 +81,7 @@ public class OpenOfferManagerTest {
final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService,
null, null, null, offerBookService,
null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null,
persistenceManager);
AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false);
@ -106,7 +106,7 @@ public class OpenOfferManagerTest {
final OpenOfferManager manager = new OpenOfferManager(null, null, null, p2PService,
null, null, null, offerBookService,
null, null, null,
null, null, null, null, null, null,
null, null, null, null, null, null, null,
persistenceManager);
AtomicBoolean startEditOfferSuccessful = new AtomicBoolean(false);

View File

@ -73,6 +73,10 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
}
}
public void flush() {
maybeBroadcastBundle();
}
private void doShutDown() {
broadcastHandlers.forEach(BroadcastHandler::cancel);
if (timer != null) {