From cdbd6cdfa8189a58f74518505209da447801f57e Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Wed, 23 Sep 2020 16:21:29 -0500 Subject: [PATCH] Redesigned fluent API again... Move code duplication to Buyer protocol --- .../trade/protocol/BuyerAsMakerProtocol.java | 140 ++++---- .../trade/protocol/BuyerAsTakerProtocol.java | 153 ++++----- .../core/trade/protocol/BuyerProtocol.java | 31 ++ .../trade/protocol/SellerAsMakerProtocol.java | 115 ++++--- .../trade/protocol/SellerAsTakerProtocol.java | 89 +++--- .../core/trade/protocol/TradeProtocol.java | 299 +++++++++++------- 6 files changed, 436 insertions(+), 391 deletions(-) diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java index 28f4985835..1e015215e9 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerAsMakerProtocol.java @@ -11,7 +11,7 @@ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public * License for more details. * - * You should have received a copy of the GNU Affero General Public License + * You should have with a copy of the GNU Affero General Public License * along with Bisq. If not, see . */ @@ -62,28 +62,7 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol public BuyerAsMakerProtocol(BuyerAsMakerTrade trade) { super(trade); - Trade.Phase phase = trade.getState().getPhase(); - if (phase == Trade.Phase.TAKER_FEE_PUBLISHED) { - TradeTaskRunner taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(BuyerEvent.STARTUP), - errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage)); - - taskRunner.addTasks(BuyerSetupDepositTxListener.class); - taskRunner.run(); - } else if (trade.isFiatSent() && !trade.isPayoutPublished()) { - TradeTaskRunner taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(BuyerEvent.STARTUP), - errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage)); - - taskRunner.addTasks(BuyerSetupPayoutTxListener.class); - if (trade.getState() == Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG || - trade.getState() == Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG) { - // In case we have not received an ACK from the CounterCurrencyTransferStartedMessage we re-send it - // periodically in BuyerSendCounterCurrencyTransferStartedMessage - taskRunner.addTasks(BuyerSendCounterCurrencyTransferStartedMessage.class); - } - taskRunner.run(); - } + maybeSetupTaskRunners(trade, processModel, this); } @@ -104,24 +83,17 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// - // Start trade + // Handle take offer request /////////////////////////////////////////////////////////////////////////////////////////// @Override public void handleTakeOfferRequest(InputsForDepositTxRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { - expectedPhase(Trade.Phase.INIT) - .on(message) - .from(peer) - .withTimeout(30) - .setTaskRunner(new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(message), - errorMessage -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(message, errorMessage); - })) - .addTasks( + given(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( MakerProcessesInputsForDepositTxRequest.class, ApplyFilter.class, VerifyPeersAccountAgeWitness.class, @@ -130,9 +102,15 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol MakerCreateAndSignContract.class, BuyerAsMakerCreatesAndSignsDepositTx.class, BuyerSetupDepositTxListener.class, - BuyerAsMakerSendsInputsForDepositTxResponse.class - ) - .runTasks(); + BuyerAsMakerSendsInputsForDepositTxResponse.class). + using(new TradeTaskRunner(trade, + () -> handleTaskRunnerSuccess(message), + errorMessage -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(message, errorMessage); + })) + .withTimeout(30)) + .executeTasks(); } @@ -141,20 +119,20 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) { - expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) - .on(message) - .from(peer) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks( BuyerProcessDelayedPayoutTxSignatureRequest.class, BuyerVerifiesPreparedDelayedPayoutTx.class, BuyerSignsDelayedPayoutTx.class, - BuyerSendsDelayedPayoutTxSignatureResponse.class - ).runTasks(); + BuyerSendsDelayedPayoutTxSignatureResponse.class) + .withTimeout(30)) + .executeTasks(); } // The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can - // be also received from the network once published. + // be also with from the network once published. // Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol. // We do not support mailbox message handling during the take offer process as it is expected that both peers // are online. @@ -162,31 +140,30 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol // mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again // in the hope to reach the buyer directly. private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) { - expectedPhases(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) - .on(message) + given(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) + .with(message) .from(peer) .preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null, () -> { - log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + + log.warn("We with a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + "delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " + "arrive and the peer repeats sending us the message. We send another ACK msg."); stopTimeout(); sendAckMessage(message, true, null); processModel.removeMailboxMessageAfterProcessing(trade); - }) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - handleTaskRunnerSuccess(message); - }, - errorMessage -> handleTaskRunnerFault(message, errorMessage))) - .addTasks( + })) + .setup(tasks( BuyerProcessDepositTxAndDelayedPayoutTxMessage.class, BuyerVerifiesFinalDelayedPayoutTx.class, - PublishTradeStatistics.class - ) + PublishTradeStatistics.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + handleTaskRunnerSuccess(message); + }, + errorMessage -> handleTaskRunnerFault(message, errorMessage)))) .run(() -> processModel.witnessDebugLog(trade)) - .runTasks(); + .executeTasks(); } @@ -197,27 +174,26 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol @Override public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { BuyerEvent event = BuyerEvent.PAYMENT_SENT; - expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) - .on(event) - .preCondition(!wasDisputed()) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - resultHandler.handleResult(); - handleTaskRunnerSuccess(event); - }, - (errorMessage) -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(event, errorMessage); - })) - .addTasks( + given(phase(Trade.Phase.DEPOSIT_CONFIRMED) + .with(event) + .preCondition(!wasDisputed())) + .setup(tasks( ApplyFilter.class, MakerVerifyTakerFeePayment.class, BuyerSignPayoutTx.class, BuyerSetupPayoutTxListener.class, - BuyerSendCounterCurrencyTransferStartedMessage.class - ) + BuyerSendCounterCurrencyTransferStartedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + resultHandler.handleResult(); + handleTaskRunnerSuccess(event); + }, + (errorMessage) -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(event, errorMessage); + }))) .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED)) - .runTasks(); + .executeTasks(); } @@ -226,13 +202,11 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// private void handle(PayoutTxPublishedMessage message, NodeAddress peer) { - expectedPhases(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) - .on(message) - .from(peer) - .addTasks( - BuyerProcessPayoutTxPublishedMessage.class - ) - .runTasks(); + given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks(BuyerProcessPayoutTxPublishedMessage.class)) + .executeTasks(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java index 5b142ed670..f4e042a335 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerAsTakerProtocol.java @@ -11,14 +11,13 @@ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public * License for more details. * - * You should have received a copy of the GNU Affero General Public License + * You should have with a copy of the GNU Affero General Public License * along with Bisq. If not, see . */ package bisq.core.trade.protocol; -import bisq.core.offer.Offer; import bisq.core.trade.BuyerAsTakerTrade; import bisq.core.trade.Trade; import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest; @@ -57,8 +56,6 @@ import bisq.common.handlers.ResultHandler; import lombok.extern.slf4j.Slf4j; -import static com.google.common.base.Preconditions.checkNotNull; - @Slf4j public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol, TakerProtocol { @@ -69,32 +66,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol public BuyerAsTakerProtocol(BuyerAsTakerTrade trade) { super(trade); - Offer offer = checkNotNull(trade.getOffer()); - processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing()); - - Trade.Phase phase = trade.getState().getPhase(); - if (phase == Trade.Phase.TAKER_FEE_PUBLISHED) { - TradeTaskRunner taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(BuyerEvent.STARTUP), - errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage)); - - taskRunner.addTasks(BuyerSetupDepositTxListener.class); - taskRunner.run(); - } else if (trade.isFiatSent() && !trade.isPayoutPublished()) { - TradeTaskRunner taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(BuyerEvent.STARTUP), - errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage)); - - taskRunner.addTasks(BuyerSetupPayoutTxListener.class); - - if (trade.getState() == Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG || - trade.getState() == Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG) { - // In case we have not received an ACK from the CounterCurrencyTransferStartedMessage we re-send it - // periodically in BuyerSendCounterCurrencyTransferStartedMessage - taskRunner.addTasks(BuyerSendCounterCurrencyTransferStartedMessage.class); - } - taskRunner.run(); - } + maybeSetupTaskRunners(trade, processModel, this); } @@ -103,33 +75,34 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { - super.doApplyMailboxTradeMessage(message, peerNodeAddress); + public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peer) { + super.doApplyMailboxTradeMessage(message, peer); if (message instanceof DepositTxAndDelayedPayoutTxMessage) { - handle((DepositTxAndDelayedPayoutTxMessage) message, peerNodeAddress); + handle((DepositTxAndDelayedPayoutTxMessage) message, peer); } else if (message instanceof PayoutTxPublishedMessage) { - handle((PayoutTxPublishedMessage) message, peerNodeAddress); + handle((PayoutTxPublishedMessage) message, peer); } } /////////////////////////////////////////////////////////////////////////////////////////// - // Start trade + // Take offer /////////////////////////////////////////////////////////////////////////////////////////// @Override public void takeAvailableOffer() { - expectedPhase(Trade.Phase.INIT) - .on(TakerEvent.TAKE_OFFER) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.INIT) + .with(TakerEvent.TAKE_OFFER)) + .setup(tasks( ApplyFilter.class, TakerVerifyMakerFeePayment.class, CreateTakerFeeTx.class, BuyerAsTakerCreatesDepositTxInputs.class, - TakerSendInputsForDepositTxRequest.class - ).runTasks(); + TakerSendInputsForDepositTxRequest.class) + .withTimeout(30)) + .run(() -> processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress())) + .executeTasks(); } @@ -138,11 +111,10 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// private void handle(InputsForDepositTxResponse message, NodeAddress peer) { - expectedPhase(Trade.Phase.INIT) - .on(message) - .from(peer) - .withTimeout(30) - .addTasks(TakerProcessesInputsForDepositTxResponse.class, + given(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks(TakerProcessesInputsForDepositTxResponse.class, ApplyFilter.class, VerifyPeersAccountAgeWitness.class, TakerVerifyAndSignContract.class, @@ -150,25 +122,25 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol BuyerAsTakerSignsDepositTx.class, BuyerSetupDepositTxListener.class, BuyerAsTakerSendsDepositTxMessage.class) - .runTasks(); + .withTimeout(30)) + .executeTasks(); } private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) { - expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) - .on(message) - .from(peer) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks( BuyerProcessDelayedPayoutTxSignatureRequest.class, BuyerVerifiesPreparedDelayedPayoutTx.class, BuyerSignsDelayedPayoutTx.class, - BuyerSendsDelayedPayoutTxSignatureResponse.class - ) - .runTasks(); + BuyerSendsDelayedPayoutTxSignatureResponse.class) + .withTimeout(30)) + .executeTasks(); } // The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can - // be also received from the network once published. + // be also with from the network once published. // Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol. // We do not support mailbox message handling during the take offer process as it is expected that both peers // are online. @@ -176,29 +148,29 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol // mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again // in the hope to reach the buyer directly. private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) { - expectedPhases(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) - .on(message) + given(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) + .with(message) .from(peer) .preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null, () -> { - log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + + log.warn("We with a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + "delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " + "arrive and the peer repeats sending us the message. We send another ACK msg."); stopTimeout(); sendAckMessage(message, true, null); processModel.removeMailboxMessageAfterProcessing(trade); - }) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - handleTaskRunnerSuccess(message); - }, - errorMessage -> handleTaskRunnerFault(message, errorMessage))) - .addTasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class, + })) + .setup(tasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class, BuyerVerifiesFinalDelayedPayoutTx.class, PublishTradeStatistics.class) - .runTasks(); - //processModel.witnessDebugLog(buyerAsTakerTrade); + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + handleTaskRunnerSuccess(message); + }, + errorMessage -> handleTaskRunnerFault(message, errorMessage)))) + .run(() -> processModel.witnessDebugLog(trade)) + .executeTasks(); } @@ -210,26 +182,25 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol @Override public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { BuyerEvent event = BuyerEvent.PAYMENT_SENT; - expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) - .on(event) - .preCondition(!wasDisputed()) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - resultHandler.handleResult(); - handleTaskRunnerSuccess(event); - }, - (errorMessage) -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(event, errorMessage); - })) - .addTasks(ApplyFilter.class, + given(phase(Trade.Phase.DEPOSIT_CONFIRMED) + .with(event) + .preCondition(!wasDisputed())) + .setup(tasks(ApplyFilter.class, TakerVerifyMakerFeePayment.class, BuyerSignPayoutTx.class, BuyerSetupPayoutTxListener.class, - BuyerSendCounterCurrencyTransferStartedMessage.class - ) + BuyerSendCounterCurrencyTransferStartedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + resultHandler.handleResult(); + handleTaskRunnerSuccess(event); + }, + (errorMessage) -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(event, errorMessage); + }))) .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED)) - .runTasks(); + .executeTasks(); } @@ -238,13 +209,11 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol /////////////////////////////////////////////////////////////////////////////////////////// private void handle(PayoutTxPublishedMessage message, NodeAddress peer) { - expectedPhases(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) - .on(message) - .from(peer) - .addTasks( - BuyerProcessPayoutTxPublishedMessage.class - ) - .runTasks(); + given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks(BuyerProcessPayoutTxPublishedMessage.class)) + .executeTasks(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java index 19c195a2ef..da37e2755f 100644 --- a/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/BuyerProtocol.java @@ -17,9 +17,17 @@ package bisq.core.trade.protocol; +import bisq.core.offer.Offer; +import bisq.core.trade.Trade; +import bisq.core.trade.protocol.tasks.buyer.BuyerSendCounterCurrencyTransferStartedMessage; +import bisq.core.trade.protocol.tasks.buyer.BuyerSetupDepositTxListener; +import bisq.core.trade.protocol.tasks.buyer.BuyerSetupPayoutTxListener; + import bisq.common.handlers.ErrorMessageHandler; import bisq.common.handlers.ResultHandler; +import static com.google.common.base.Preconditions.checkNotNull; + public interface BuyerProtocol { void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler); @@ -27,4 +35,27 @@ public interface BuyerProtocol { STARTUP, PAYMENT_SENT } + + default void maybeSetupTaskRunners(Trade trade, ProcessModel processModel, TradeProtocol tradeProtocol) { + Offer offer = checkNotNull(trade.getOffer()); + processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing()); + + + tradeProtocol.given(tradeProtocol.phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(BuyerEvent.STARTUP)) + .setup(tradeProtocol.tasks(BuyerSetupDepositTxListener.class)) + .executeTasks(); + + tradeProtocol.given(tradeProtocol.anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED) + .with(BuyerEvent.STARTUP)) + .setup(tradeProtocol.tasks(BuyerSetupPayoutTxListener.class)) + .executeTasks(); + + tradeProtocol.given(tradeProtocol.anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED) + .anyState(Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG, + Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG) + .with(BuyerEvent.STARTUP)) + .setup(tradeProtocol.tasks(BuyerSendCounterCurrencyTransferStartedMessage.class)) + .executeTasks(); + } } diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java index 3124b2ccf8..752c242096 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerAsMakerProtocol.java @@ -82,24 +82,17 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// - // Start trade + // Handle take offer request /////////////////////////////////////////////////////////////////////////////////////////// @Override public void handleTakeOfferRequest(InputsForDepositTxRequest message, NodeAddress peer, ErrorMessageHandler errorMessageHandler) { - expectedPhase(Trade.Phase.INIT) - .on(message) - .from(peer) - .withTimeout(30) - .setTaskRunner(new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(message), - errorMessage -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(message, errorMessage); - })) - .addTasks( + given(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( MakerProcessesInputsForDepositTxRequest.class, ApplyFilter.class, VerifyPeersAccountAgeWitness.class, @@ -107,9 +100,15 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc MakerSetsLockTime.class, MakerCreateAndSignContract.class, SellerAsMakerCreatesUnsignedDepositTx.class, - SellerAsMakerSendsInputsForDepositTxResponse.class - ) - .runTasks(); + SellerAsMakerSendsInputsForDepositTxResponse.class) + .using(new TradeTaskRunner(trade, + () -> handleTaskRunnerSuccess(message), + errorMessage -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(message, errorMessage); + })) + .withTimeout(30)) + .executeTasks(); } @@ -118,39 +117,37 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// protected void handle(DepositTxMessage message, NodeAddress peer) { - expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) - .on(message) - .from(peer) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks( SellerAsMakerProcessDepositTxMessage.class, SellerAsMakerFinalizesDepositTx.class, SellerCreatesDelayedPayoutTx.class, - SellerSendDelayedPayoutTxSignatureRequest.class - ) - .runTasks(); + SellerSendDelayedPayoutTxSignatureRequest.class) + .withTimeout(30)) + .executeTasks(); } private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) { - expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) - .on(message) - .from(peer) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - stopTimeout(); - handleTaskRunnerSuccess(message); - }, - errorMessage -> handleTaskRunnerFault(message, errorMessage))) - .addTasks( + given(phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks( SellerProcessDelayedPayoutTxSignatureResponse.class, SellerSignsDelayedPayoutTx.class, SellerFinalizesDelayedPayoutTx.class, SellerSendsDepositTxAndDelayedPayoutTxMessage.class, SellerPublishesDepositTx.class, - PublishTradeStatistics.class - ) + PublishTradeStatistics.class) + .using(new TradeTaskRunner(trade, + () -> { + stopTimeout(); + handleTaskRunnerSuccess(message); + }, + errorMessage -> handleTaskRunnerFault(message, errorMessage)))) .run(() -> processModel.witnessDebugLog(trade)) //TODO still needed? If so move to witness domain - .runTasks(); + .executeTasks(); } @@ -159,23 +156,22 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) { - expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) - .on(message) + given(phase(Trade.Phase.DEPOSIT_CONFIRMED) + .with(message) .from(peer) .preCondition(trade.getPayoutTx() == null, () -> { - log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " + + log.warn("We with a CounterCurrencyTransferStartedMessage but we have already created the payout tx " + "so we ignore the message. This can happen if the ACK message to the peer did not " + "arrive and the peer repeats sending us the message. We send another ACK msg."); sendAckMessage(message, true, null); processModel.removeMailboxMessageAfterProcessing(trade); - }) - .addTasks( + })) + .setup(tasks( SellerProcessCounterCurrencyTransferStartedMessage.class, ApplyFilter.class, - MakerVerifyTakerFeePayment.class - ) - .runTasks(); + MakerVerifyTakerFeePayment.class)) + .executeTasks(); } @@ -186,27 +182,26 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc @Override public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { SellerEvent event = SellerEvent.PAYMENT_RECEIVED; - expectedPhase(Trade.Phase.FIAT_SENT) - .on(event) - .preCondition(!wasDisputed()) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - resultHandler.handleResult(); - handleTaskRunnerSuccess(event); - }, - (errorMessage) -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(event, errorMessage); - })) - .addTasks( + given(phase(Trade.Phase.FIAT_SENT) + .with(event) + .preCondition(!wasDisputed())) + .setup(tasks( ApplyFilter.class, MakerVerifyTakerFeePayment.class, SellerSignAndFinalizePayoutTx.class, SellerBroadcastPayoutTx.class, - SellerSendPayoutTxPublishedMessage.class - ) + SellerSendPayoutTxPublishedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + resultHandler.handleResult(); + handleTaskRunnerSuccess(event); + }, + (errorMessage) -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(event, errorMessage); + }))) .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT)) - .runTasks(); + .executeTasks(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java b/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java index baf59f61a9..3a5090f06a 100644 --- a/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/SellerAsTakerProtocol.java @@ -87,21 +87,22 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// - // Start trade + // Take offer /////////////////////////////////////////////////////////////////////////////////////////// @Override public void takeAvailableOffer() { - expectedPhase(Trade.Phase.INIT) - .on(TakerEvent.TAKE_OFFER) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.INIT) + .with(TakerEvent.TAKE_OFFER)) + .setup(tasks( ApplyFilter.class, TakerVerifyMakerFeePayment.class, CreateTakerFeeTx.class, // SellerAsTakerCreatesDepositTxInputs.class, - TakerSendInputsForDepositTxRequest.class - ).runTasks(); + TakerSendInputsForDepositTxRequest.class) + .withTimeout(30)) + .run(() -> processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress())) + .executeTasks(); } @@ -110,11 +111,10 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// private void handle(InputsForDepositTxResponse message, NodeAddress peer) { - expectedPhase(Trade.Phase.INIT) - .on(message) - .from(peer) - .withTimeout(30) - .addTasks( + given(phase(Trade.Phase.INIT) + .with(message) + .from(peer)) + .setup(tasks( TakerProcessesInputsForDepositTxResponse.class, ApplyFilter.class, VerifyPeersAccountAgeWitness.class, @@ -122,23 +122,21 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc TakerPublishFeeTx.class, SellerAsTakerSignsDepositTx.class, SellerCreatesDelayedPayoutTx.class, - SellerSendDelayedPayoutTxSignatureRequest.class - ) - .runTasks(); + SellerSendDelayedPayoutTxSignatureRequest.class) + .withTimeout(30)) + .executeTasks(); } private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) { - expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) - .on(message) - .from(peer) - .addTasks( - SellerProcessDelayedPayoutTxSignatureResponse.class, + given(phase(Trade.Phase.TAKER_FEE_PUBLISHED) + .with(message) + .from(peer)) + .setup(tasks(SellerProcessDelayedPayoutTxSignatureResponse.class, SellerSignsDelayedPayoutTx.class, SellerFinalizesDelayedPayoutTx.class, SellerSendsDepositTxAndDelayedPayoutTxMessage.class, SellerPublishesDepositTx.class, - PublishTradeStatistics.class - ) + PublishTradeStatistics.class)) .run(() -> { // We stop timeout here and don't start a new one as the // SellerSendsDepositTxAndDelayedPayoutTxMessage repeats the send the message and has it's own @@ -148,7 +146,7 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc //TODO still needed? If so move to witness domain processModel.witnessDebugLog(trade); }) - .runTasks(); + .executeTasks(); } @@ -157,8 +155,8 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc /////////////////////////////////////////////////////////////////////////////////////////// private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) { - expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) - .on(message) + given(phase(Trade.Phase.DEPOSIT_CONFIRMED) + .with(message) .from(peer) .preCondition(trade.getPayoutTx() == null, () -> { @@ -167,13 +165,12 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc "arrive and the peer repeats sending us the message. We send another ACK msg."); sendAckMessage(message, true, null); processModel.removeMailboxMessageAfterProcessing(trade); - }) - .addTasks( + })) + .setup(tasks( SellerProcessCounterCurrencyTransferStartedMessage.class, ApplyFilter.class, - TakerVerifyMakerFeePayment.class - ) - .runTasks(); + TakerVerifyMakerFeePayment.class)) + .executeTasks(); } @@ -184,27 +181,27 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc @Override public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { SellerEvent event = SellerEvent.PAYMENT_RECEIVED; - expectedPhase(Trade.Phase.FIAT_SENT) - .on(event) - .preCondition(!wasDisputed()) - .setTaskRunner(new TradeTaskRunner(trade, - () -> { - resultHandler.handleResult(); - handleTaskRunnerSuccess(event); - }, - (errorMessage) -> { - errorMessageHandler.handleErrorMessage(errorMessage); - handleTaskRunnerFault(event, errorMessage); - })) - .addTasks( + given(phase(Trade.Phase.FIAT_SENT) + .with(event) + .preCondition(!wasDisputed())) + .setup(tasks( ApplyFilter.class, TakerVerifyMakerFeePayment.class, SellerSignAndFinalizePayoutTx.class, SellerBroadcastPayoutTx.class, - SellerSendPayoutTxPublishedMessage.class //TODO add repeated msg send, check UI - ) + //TODO add repeated msg send, check UI + SellerSendPayoutTxPublishedMessage.class) + .using(new TradeTaskRunner(trade, + () -> { + resultHandler.handleResult(); + handleTaskRunnerSuccess(event); + }, + (errorMessage) -> { + errorMessageHandler.handleErrorMessage(errorMessage); + handleTaskRunnerFault(event, errorMessage); + }))) .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT)) - .runTasks(); + .executeTasks(); } diff --git a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java index 8b36a2b05b..06f09b58a5 100644 --- a/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java +++ b/core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java @@ -60,12 +60,14 @@ import java.security.PublicKey; import java.util.HashSet; import java.util.Set; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; import static bisq.core.util.Validator.isTradeIdValid; import static bisq.core.util.Validator.nonEmptyStringOf; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @Slf4j @@ -442,109 +444,171 @@ public abstract class TradeProtocol { /////////////////////////////////////////////////////////////////////////////////////////// - // FluentProcess + // FluentProtocol /////////////////////////////////////////////////////////////////////////////////////////// - protected FluentProcess expectedPhase(Trade.Phase phase) { - return new FluentProcess(trade, phase); + protected FluentProtocol given(Condition condition) { + return new FluentProtocol(condition); } - protected FluentProcess expectedPhases(Trade.Phase... phase) { - return new FluentProcess(trade, phase); + protected Condition phase(Trade.Phase expectedPhase) { + return new Condition(trade, expectedPhase); } - class FluentProcess { - private final Trade trade; - @Nullable - private TradeMessage message; - private final Set expectedPhases = new HashSet<>(); - private final Set preConditions = new HashSet<>(); - @Nullable - private Event event; - private Runnable preConditionFailedHandler; - private int timeoutSec; - private NodeAddress peersNodeAddress; - private TradeTaskRunner taskRunner; + protected Condition anyPhase(Trade.Phase... expectedPhases) { + return new Condition(trade, expectedPhases); + } - public FluentProcess(Trade trade, - Trade.Phase expectedPhase) { - this.trade = trade; - this.expectedPhases.add(expectedPhase); + @SafeVarargs + public final Setup tasks(Class>... tasks) { + return new Setup(trade, tasks); + } + + // Main class. Contains the condition and setup, if condition is valid it will execute the + // taskRunner and the optional runnable. + class FluentProtocol { + private final Condition condition; + private Setup setup; + + public FluentProtocol(Condition condition) { + this.condition = condition; } - public FluentProcess(Trade trade, - Trade.Phase... expectedPhases) { - this.trade = trade; - this.expectedPhases.addAll(Set.of(expectedPhases)); + protected FluentProtocol setup(Setup setup) { + this.setup = setup; + return this; } - public FluentProcess run(Runnable runnable) { - if (isValid()) { + // Can be used before or after executeTasks + public FluentProtocol run(Runnable runnable) { + if (condition.isValid()) { runnable.run(); } return this; } - public FluentProcess runTasks() { - if (isValid()) { - if (timeoutSec > 0) { - startTimeout(timeoutSec); + public FluentProtocol executeTasks() { + if (condition.isValid()) { + if (setup.getTimeoutSec() > 0) { + startTimeout(setup.getTimeoutSec()); } - if (peersNodeAddress != null) { - processModel.setTempTradingPeerNodeAddress(peersNodeAddress); + NodeAddress peer = condition.getPeersNodeAddress(); + if (peer != null) { + processModel.setTempTradingPeerNodeAddress(peer); } + TradeMessage message = condition.getMessage(); if (message != null) { processModel.setTradeMessage(message); } - + TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent()); + taskRunner.addTasks(setup.getTasks()); taskRunner.run(); } + return this; + } + } + // + static class Condition { + private final Trade trade; + @Nullable + @Getter + private TradeMessage message; + private final Set expectedPhases = new HashSet<>(); + private final Set expectedStates = new HashSet<>(); + private final Set preConditions = new HashSet<>(); + @Nullable + @Getter + private Event event; + @Getter + private NodeAddress peersNodeAddress; + private boolean isValid; + private boolean isValidated; + private Runnable preConditionFailedHandler; + + public Condition(Trade trade, Trade.Phase expectedPhase) { + this.expectedPhases.add(expectedPhase); + this.trade = trade; + } + + public Condition(Trade trade, Trade.Phase... expectedPhases) { + this.expectedPhases.addAll(Set.of(expectedPhases)); + this.trade = trade; + } + + public Condition state(Trade.State state) { + this.expectedStates.add(state); + return this; + } + + public Condition anyState(Trade.State... states) { + this.expectedStates.addAll(Set.of(states)); + return this; + } + + public Condition with(Event event) { + checkArgument(!isValidated); + this.event = event; + return this; + } + + public Condition with(TradeMessage tradeMessage) { + checkArgument(!isValidated); + this.message = tradeMessage; + return this; + } + + public Condition from(NodeAddress peersNodeAddress) { + checkArgument(!isValidated); + this.peersNodeAddress = peersNodeAddress; + return this; + } + + public Condition preCondition(boolean preCondition) { + checkArgument(!isValidated); + preConditions.add(preCondition); + return this; + } + + public Condition preCondition(boolean preCondition, Runnable conditionFailedHandler) { + checkArgument(!isValidated); + preConditions.add(preCondition); + this.preConditionFailedHandler = conditionFailedHandler; return this; } private boolean isValid() { - boolean isPhaseValid = isPhaseValid(); - boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e); - boolean isTradeIdValid = message == null || isTradeIdValid(processModel.getOfferId(), message); + if (!isValidated) { + boolean isPhaseValid = isPhaseValid(); + boolean isStateValid = isStateValid(); - if (!allPreConditionsMet) { - log.error("PreConditions not met. preConditions={}, this={}", preConditions, this); - if (preConditionFailedHandler != null) { - preConditionFailedHandler.run(); + boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e); + boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message); + + if (!allPreConditionsMet) { + log.error("PreConditions not met. preConditions={}, this={}", preConditions, this); + if (preConditionFailedHandler != null) { + preConditionFailedHandler.run(); + } } - } - if (!isTradeIdValid) { - log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}", - trade.getId(), message.getTradeId()); - } - - return isPhaseValid && allPreConditionsMet && isTradeIdValid; - } - - @SafeVarargs - public final FluentProcess addTasks(Class>... tasks) { - if (taskRunner == null) { - if (message != null) { - taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(message), - errorMessage -> handleTaskRunnerFault(message, errorMessage)); - } else if (event != null) { - taskRunner = new TradeTaskRunner(trade, - () -> handleTaskRunnerSuccess(event), - errorMessage -> handleTaskRunnerFault(event, errorMessage)); - } else { - throw new IllegalStateException("addTasks must not be called without message or event " + - "set in case no taskRunner has been created yet"); + if (!isTradeIdValid) { + log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}", + trade.getId(), message.getTradeId()); } + + isValid = isPhaseValid && isStateValid && allPreConditionsMet && isTradeIdValid; + isValidated = true; } - taskRunner.addTasks(tasks); - return this; + return isValid; } private boolean isPhaseValid() { + if (expectedPhases.isEmpty()) { + return true; + } + boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase()); String trigger = message != null ? message.getClass().getSimpleName() : @@ -568,60 +632,75 @@ public abstract class TradeProtocol { return isPhaseValid; } - public FluentProcess orInPhase(Trade.Phase phase) { - expectedPhases.add(phase); - return this; + private boolean isStateValid() { + if (expectedStates.isEmpty()) { + return true; + } + + boolean isStateValid = expectedStates.stream().anyMatch(e -> e == trade.getState()); + String trigger = message != null ? + message.getClass().getSimpleName() : + event != null ? + event.name() + " event" : + ""; + if (isStateValid) { + log.info("We received {} at state {}", + trigger, + trade.getState()); + } else { + log.error("We received {} but we are are not in the correct state. Expected states={}, " + + "Trade state= {} ", + trigger, + expectedStates, + trade.getState()); + } + + return isStateValid; + } + } + + // Setup for task runner + class Setup { + private final Trade trade; + @Getter + private final Class>[] tasks; + @Getter + private int timeoutSec; + @Nullable + private TradeTaskRunner taskRunner; + + @SafeVarargs + public Setup(Trade trade, Class>... tasks) { + this.trade = trade; + this.tasks = tasks; } - public FluentProcess on(Event event) { - this.event = event; - return this; - } - - public FluentProcess on(TradeMessage tradeMessage) { - this.message = tradeMessage; - return this; - } - - public FluentProcess preCondition(boolean preCondition) { - preConditions.add(preCondition); - return this; - } - - public FluentProcess preCondition(boolean preCondition, Runnable conditionFailedHandler) { - preConditions.add(preCondition); - this.preConditionFailedHandler = conditionFailedHandler; - return this; - } - - public FluentProcess withTimeout(int timeoutSec) { + public Setup withTimeout(int timeoutSec) { this.timeoutSec = timeoutSec; return this; } - public FluentProcess from(NodeAddress peersNodeAddress) { - this.peersNodeAddress = peersNodeAddress; - return this; - } - - public FluentProcess setTaskRunner(TradeTaskRunner taskRunner) { + public Setup using(TradeTaskRunner taskRunner) { this.taskRunner = taskRunner; return this; } - @Override - public String toString() { - return "FluentProcess{" + - "\n trade=" + trade + - ",\n message=" + message + - ",\n expectedPhases=" + expectedPhases + - ",\n preConditions=" + preConditions + - ",\n event=" + event + - ",\n preConditionFailedHandler=" + preConditionFailedHandler + - ",\n timeoutSec=" + timeoutSec + - ",\n peersNodeAddress=" + peersNodeAddress + - ",\n taskRunner=" + taskRunner + - "\n}"; + private TradeTaskRunner getTaskRunner(@Nullable TradeMessage message, @Nullable Event event) { + if (taskRunner == null) { + if (message != null) { + taskRunner = new TradeTaskRunner(trade, + () -> handleTaskRunnerSuccess(message), + errorMessage -> handleTaskRunnerFault(message, errorMessage)); + } else if (event != null) { + taskRunner = new TradeTaskRunner(trade, + () -> handleTaskRunnerSuccess(event), + errorMessage -> handleTaskRunnerFault(event, errorMessage)); + } else { + throw new IllegalStateException("addTasks must not be called without message or event " + + "set in case no taskRunner has been created yet"); + } + } + return taskRunner; } } }