Use same model as in SellerSendsDepositTxAndDelayedPayoutTxMessage

Reason: the other model was already tested quite a lot and seems to work correctly.
We gain a lot of resiliance with min. costs (repeated mailbox messages - as they are the same
they will not cause higher number of msg, just a bit more traffic)
This commit is contained in:
chimp1984 2020-09-22 19:34:14 -05:00
parent 96221317e2
commit c7e0c51875
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3

View file

@ -19,7 +19,6 @@ package bisq.core.trade.protocol.tasks.buyer;
import bisq.core.btc.model.AddressEntry; import bisq.core.btc.model.AddressEntry;
import bisq.core.network.MessageState; import bisq.core.network.MessageState;
import bisq.core.payment.payload.PaymentMethod;
import bisq.core.trade.Trade; import bisq.core.trade.Trade;
import bisq.core.trade.messages.CounterCurrencyTransferStartedMessage; import bisq.core.trade.messages.CounterCurrencyTransferStartedMessage;
import bisq.core.trade.messages.TradeMessage; import bisq.core.trade.messages.TradeMessage;
@ -35,15 +34,23 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull; /**
* We send the seller the BuyerSendCounterCurrencyTransferStartedMessage.
* We wait to receive a ACK message back and resend the message
* in case that does not happen in 10 minutes or if the message was stored in mailbox or failed. We keep repeating that
* with doubling the interval each time and until the MAX_RESEND_ATTEMPTS is reached.
* If never successful we give up and complete. It might be a valid case that the peer was not online for an extended
* time but we can be very sure that our message was stored as mailbox message in the network and one the peer goes
* online he will process it.
*/
@Slf4j @Slf4j
public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxMessageTask { public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxMessageTask {
private static final long MAX_REFRESH_INTERVAL = TimeUnit.HOURS.toMillis(4); private static final int MAX_RESEND_ATTEMPTS = 10;
private int delayInMin = 15;
private int resendCounter = 0;
private CounterCurrencyTransferStartedMessage message;
private ChangeListener<MessageState> listener; private ChangeListener<MessageState> listener;
private Timer timer; private Timer timer;
private CounterCurrencyTransferStartedMessage counterCurrencyTransferStartedMessage;
public BuyerSendCounterCurrencyTransferStartedMessage(TaskRunner<Trade> taskHandler, Trade trade) { public BuyerSendCounterCurrencyTransferStartedMessage(TaskRunner<Trade> taskHandler, Trade trade) {
super(taskHandler, trade); super(taskHandler, trade);
@ -51,7 +58,7 @@ public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxM
@Override @Override
protected TradeMessage getMessage(String tradeId) { protected TradeMessage getMessage(String tradeId) {
if (counterCurrencyTransferStartedMessage == null) { if (message == null) {
AddressEntry payoutAddressEntry = processModel.getBtcWalletService().getOrCreateAddressEntry(tradeId, AddressEntry payoutAddressEntry = processModel.getBtcWalletService().getOrCreateAddressEntry(tradeId,
AddressEntry.Context.TRADE_PAYOUT); AddressEntry.Context.TRADE_PAYOUT);
@ -60,7 +67,7 @@ public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxM
// messages where only the one which gets processed by the peer would be removed we use the same uid. All // messages where only the one which gets processed by the peer would be removed we use the same uid. All
// other data stays the same when we re-send the message at any time later. // other data stays the same when we re-send the message at any time later.
String deterministicId = tradeId + processModel.getMyNodeAddress().getFullAddress(); String deterministicId = tradeId + processModel.getMyNodeAddress().getFullAddress();
counterCurrencyTransferStartedMessage = new CounterCurrencyTransferStartedMessage( message = new CounterCurrencyTransferStartedMessage(
tradeId, tradeId,
payoutAddressEntry.getAddressString(), payoutAddressEntry.getAddressString(),
processModel.getMyNodeAddress(), processModel.getMyNodeAddress(),
@ -70,30 +77,47 @@ public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxM
deterministicId deterministicId
); );
} }
return counterCurrencyTransferStartedMessage; return message;
} }
@Override @Override
protected void setStateSent() { protected void setStateSent() {
trade.setState(Trade.State.BUYER_SENT_FIAT_PAYMENT_INITIATED_MSG); trade.setStateIfValidTransitionTo(Trade.State.BUYER_SENT_FIAT_PAYMENT_INITIATED_MSG);
} }
@Override @Override
protected void setStateArrived() { protected void setStateArrived() {
trade.setState(Trade.State.BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG); trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG);
stop(); cleanup();
// Complete is called in base class
}
// We override the default behaviour for onStoredInMailbox and do not call complete
@Override
protected void onStoredInMailbox() {
setStateStoredInMailbox();
} }
@Override @Override
protected void setStateStoredInMailbox() { protected void setStateStoredInMailbox() {
trade.setState(Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG); trade.setStateIfValidTransitionTo(Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG);
start(); if (!trade.isPayoutPublished()) {
tryToSendAgainLater();
}
}
// We override the default behaviour for onFault and do not call appendToErrorMessage and failed
@Override
protected void onFault(String errorMessage, TradeMessage message) {
setStateFault();
} }
@Override @Override
protected void setStateFault() { protected void setStateFault() {
trade.setState(Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG); trade.setStateIfValidTransitionTo(Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG);
start(); if (!trade.isPayoutPublished()) {
tryToSendAgainLater();
}
} }
@Override @Override
@ -104,38 +128,53 @@ public class BuyerSendCounterCurrencyTransferStartedMessage extends SendMailboxM
super.run(); super.run();
} catch (Throwable t) { } catch (Throwable t) {
failed(t); failed(t);
} finally {
cleanup();
} }
} }
private void stop() { private void cleanup() {
if (timer != null) { if (timer != null) {
timer.stop(); timer.stop();
}
if (listener != null) {
processModel.getPaymentStartedMessageStateProperty().removeListener(listener); processModel.getPaymentStartedMessageStateProperty().removeListener(listener);
} }
} }
// The listeners ensure we don't get GCed even we have completed the task. private void tryToSendAgainLater() {
private void start() { if (resendCounter >= MAX_RESEND_ATTEMPTS) {
if (timer != null) { cleanup();
log.warn("We never received an ACK message when sending the CounterCurrencyTransferStartedMessage to the peer. " +
"We stop now and complete the protocol task.");
complete();
return; return;
} }
PaymentMethod paymentMethod = checkNotNull(trade.getOffer()).getPaymentMethod(); log.info("We send the message again to the peer after a delay of {} min.", delayInMin);
// For instant trades with 1 hour we want a short interval, otherwise a few hours should be ok. if (timer != null) {
long interval = Math.min(paymentMethod.getMaxTradePeriod() / 5, MAX_REFRESH_INTERVAL); timer.stop();
timer = UserThread.runPeriodically(this::run, interval, TimeUnit.MILLISECONDS); }
timer = UserThread.runAfter(this::run, delayInMin, TimeUnit.MINUTES);
listener = (observable, oldValue, newValue) -> { if (resendCounter == 0) {
// Once we receive an ACK from our msg we know the peer has received the msg and we stop. // We want to register listener only once
if (newValue == MessageState.ACKNOWLEDGED) { listener = (observable, oldValue, newValue) -> onMessageStateChange(newValue);
// We treat a ACK like BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG processModel.getPaymentStartedMessageStateProperty().addListener(listener);
if (trade.getState().getPhase() == Trade.Phase.FIAT_SENT) { onMessageStateChange(processModel.getPaymentStartedMessageStateProperty().get());
trade.setState(Trade.State.BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG); }
}
// Ensure listener construction is completed before remove call delayInMin = delayInMin * 2;
UserThread.execute(this::stop); resendCounter++;
} }
};
processModel.getPaymentStartedMessageStateProperty().addListener(listener); private void onMessageStateChange(MessageState newValue) {
// Once we receive an ACK from our msg we know the peer has received the msg and we stop.
if (newValue == MessageState.ACKNOWLEDGED) {
// We treat a ACK like BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG
trade.setStateIfValidTransitionTo(Trade.State.BUYER_SAW_ARRIVED_FIAT_PAYMENT_INITIATED_MSG);
cleanup();
complete();
}
} }
} }