Redesigned fluent API again...

Move code duplication to Buyer protocol
This commit is contained in:
chimp1984 2020-09-23 16:21:29 -05:00
parent 54d625e564
commit cdbd6cdfa8
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
6 changed files with 436 additions and 391 deletions

View file

@ -11,7 +11,7 @@
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details. * License for more details.
* *
* You should have received a copy of the GNU Affero General Public License * You should have with a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>. * along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/ */
@ -62,28 +62,7 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
public BuyerAsMakerProtocol(BuyerAsMakerTrade trade) { public BuyerAsMakerProtocol(BuyerAsMakerTrade trade) {
super(trade); super(trade);
Trade.Phase phase = trade.getState().getPhase(); maybeSetupTaskRunners(trade, processModel, this);
if (phase == Trade.Phase.TAKER_FEE_PUBLISHED) {
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(BuyerEvent.STARTUP),
errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage));
taskRunner.addTasks(BuyerSetupDepositTxListener.class);
taskRunner.run();
} else if (trade.isFiatSent() && !trade.isPayoutPublished()) {
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(BuyerEvent.STARTUP),
errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage));
taskRunner.addTasks(BuyerSetupPayoutTxListener.class);
if (trade.getState() == Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG ||
trade.getState() == Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG) {
// In case we have not received an ACK from the CounterCurrencyTransferStartedMessage we re-send it
// periodically in BuyerSendCounterCurrencyTransferStartedMessage
taskRunner.addTasks(BuyerSendCounterCurrencyTransferStartedMessage.class);
}
taskRunner.run();
}
} }
@ -104,24 +83,17 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Start trade // Handle take offer request
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void handleTakeOfferRequest(InputsForDepositTxRequest message, public void handleTakeOfferRequest(InputsForDepositTxRequest message,
NodeAddress peer, NodeAddress peer,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.setTaskRunner(new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(message),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(message, errorMessage);
}))
.addTasks(
MakerProcessesInputsForDepositTxRequest.class, MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class, ApplyFilter.class,
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
@ -130,9 +102,15 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
MakerCreateAndSignContract.class, MakerCreateAndSignContract.class,
BuyerAsMakerCreatesAndSignsDepositTx.class, BuyerAsMakerCreatesAndSignsDepositTx.class,
BuyerSetupDepositTxListener.class, BuyerSetupDepositTxListener.class,
BuyerAsMakerSendsInputsForDepositTxResponse.class BuyerAsMakerSendsInputsForDepositTxResponse.class).
) using(new TradeTaskRunner(trade,
.runTasks(); () -> handleTaskRunnerSuccess(message),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
} }
@ -141,20 +119,20 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) { private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) {
expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class, BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class, BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class, BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class BuyerSendsDelayedPayoutTxSignatureResponse.class)
).runTasks(); .withTimeout(30))
.executeTasks();
} }
// The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can // The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can
// be also received from the network once published. // be also with from the network once published.
// Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol. // Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol.
// We do not support mailbox message handling during the take offer process as it is expected that both peers // We do not support mailbox message handling during the take offer process as it is expected that both peers
// are online. // are online.
@ -162,31 +140,30 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again // mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly. // in the hope to reach the buyer directly.
private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) { private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) {
expectedPhases(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) given(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer)
.preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null, .preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null,
() -> { () -> {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + log.warn("We with a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " + "delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg."); "arrive and the peer repeats sending us the message. We send another ACK msg.");
stopTimeout(); stopTimeout();
sendAckMessage(message, true, null); sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade); processModel.removeMailboxMessageAfterProcessing(trade);
}) }))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(
() -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage)))
.addTasks(
BuyerProcessDepositTxAndDelayedPayoutTxMessage.class, BuyerProcessDepositTxAndDelayedPayoutTxMessage.class,
BuyerVerifiesFinalDelayedPayoutTx.class, BuyerVerifiesFinalDelayedPayoutTx.class,
PublishTradeStatistics.class PublishTradeStatistics.class)
) .using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage))))
.run(() -> processModel.witnessDebugLog(trade)) .run(() -> processModel.witnessDebugLog(trade))
.runTasks(); .executeTasks();
} }
@ -197,27 +174,26 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
@Override @Override
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
BuyerEvent event = BuyerEvent.PAYMENT_SENT; BuyerEvent event = BuyerEvent.PAYMENT_SENT;
expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.on(event) .with(event)
.preCondition(!wasDisputed()) .preCondition(!wasDisputed()))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
}))
.addTasks(
ApplyFilter.class, ApplyFilter.class,
MakerVerifyTakerFeePayment.class, MakerVerifyTakerFeePayment.class,
BuyerSignPayoutTx.class, BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class, BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class BuyerSendCounterCurrencyTransferStartedMessage.class)
) .using(new TradeTaskRunner(trade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED)) .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED))
.runTasks(); .executeTasks();
} }
@ -226,13 +202,11 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage message, NodeAddress peer) { private void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
expectedPhases(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.addTasks( .setup(tasks(BuyerProcessPayoutTxPublishedMessage.class))
BuyerProcessPayoutTxPublishedMessage.class .executeTasks();
)
.runTasks();
} }

View file

@ -11,14 +11,13 @@
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details. * License for more details.
* *
* You should have received a copy of the GNU Affero General Public License * You should have with a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>. * along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/ */
package bisq.core.trade.protocol; package bisq.core.trade.protocol;
import bisq.core.offer.Offer;
import bisq.core.trade.BuyerAsTakerTrade; import bisq.core.trade.BuyerAsTakerTrade;
import bisq.core.trade.Trade; import bisq.core.trade.Trade;
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest; import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
@ -57,8 +56,6 @@ import bisq.common.handlers.ResultHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j @Slf4j
public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol, TakerProtocol { public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol, TakerProtocol {
@ -69,32 +66,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
public BuyerAsTakerProtocol(BuyerAsTakerTrade trade) { public BuyerAsTakerProtocol(BuyerAsTakerTrade trade) {
super(trade); super(trade);
Offer offer = checkNotNull(trade.getOffer()); maybeSetupTaskRunners(trade, processModel, this);
processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing());
Trade.Phase phase = trade.getState().getPhase();
if (phase == Trade.Phase.TAKER_FEE_PUBLISHED) {
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(BuyerEvent.STARTUP),
errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage));
taskRunner.addTasks(BuyerSetupDepositTxListener.class);
taskRunner.run();
} else if (trade.isFiatSent() && !trade.isPayoutPublished()) {
TradeTaskRunner taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(BuyerEvent.STARTUP),
errorMessage -> handleTaskRunnerFault(BuyerEvent.STARTUP, errorMessage));
taskRunner.addTasks(BuyerSetupPayoutTxListener.class);
if (trade.getState() == Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG ||
trade.getState() == Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG) {
// In case we have not received an ACK from the CounterCurrencyTransferStartedMessage we re-send it
// periodically in BuyerSendCounterCurrencyTransferStartedMessage
taskRunner.addTasks(BuyerSendCounterCurrencyTransferStartedMessage.class);
}
taskRunner.run();
}
} }
@ -103,33 +75,34 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) { public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peer) {
super.doApplyMailboxTradeMessage(message, peerNodeAddress); super.doApplyMailboxTradeMessage(message, peer);
if (message instanceof DepositTxAndDelayedPayoutTxMessage) { if (message instanceof DepositTxAndDelayedPayoutTxMessage) {
handle((DepositTxAndDelayedPayoutTxMessage) message, peerNodeAddress); handle((DepositTxAndDelayedPayoutTxMessage) message, peer);
} else if (message instanceof PayoutTxPublishedMessage) { } else if (message instanceof PayoutTxPublishedMessage) {
handle((PayoutTxPublishedMessage) message, peerNodeAddress); handle((PayoutTxPublishedMessage) message, peer);
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Start trade // Take offer
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void takeAvailableOffer() { public void takeAvailableOffer() {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(TakerEvent.TAKE_OFFER) .with(TakerEvent.TAKE_OFFER))
.withTimeout(30) .setup(tasks(
.addTasks(
ApplyFilter.class, ApplyFilter.class,
TakerVerifyMakerFeePayment.class, TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class, CreateTakerFeeTx.class,
BuyerAsTakerCreatesDepositTxInputs.class, BuyerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class TakerSendInputsForDepositTxRequest.class)
).runTasks(); .withTimeout(30))
.run(() -> processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress()))
.executeTasks();
} }
@ -138,11 +111,10 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse message, NodeAddress peer) { private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(TakerProcessesInputsForDepositTxResponse.class,
.addTasks(TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class, ApplyFilter.class,
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
TakerVerifyAndSignContract.class, TakerVerifyAndSignContract.class,
@ -150,25 +122,25 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
BuyerAsTakerSignsDepositTx.class, BuyerAsTakerSignsDepositTx.class,
BuyerSetupDepositTxListener.class, BuyerSetupDepositTxListener.class,
BuyerAsTakerSendsDepositTxMessage.class) BuyerAsTakerSendsDepositTxMessage.class)
.runTasks(); .withTimeout(30))
.executeTasks();
} }
private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) { private void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) {
expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class, BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class, BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class, BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class BuyerSendsDelayedPayoutTxSignatureResponse.class)
) .withTimeout(30))
.runTasks(); .executeTasks();
} }
// The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can // The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can
// be also received from the network once published. // be also with from the network once published.
// Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol. // Now we send the delayed payout tx as well and with that this message is mandatory for continuing the protocol.
// We do not support mailbox message handling during the take offer process as it is expected that both peers // We do not support mailbox message handling during the take offer process as it is expected that both peers
// are online. // are online.
@ -176,29 +148,29 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again // mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly. // in the hope to reach the buyer directly.
private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) { private void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) {
expectedPhases(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED) given(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer)
.preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null, .preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null,
() -> { () -> {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " + log.warn("We with a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " + "delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg."); "arrive and the peer repeats sending us the message. We send another ACK msg.");
stopTimeout(); stopTimeout();
sendAckMessage(message, true, null); sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade); processModel.removeMailboxMessageAfterProcessing(trade);
}) }))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class,
() -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage)))
.addTasks(BuyerProcessDepositTxAndDelayedPayoutTxMessage.class,
BuyerVerifiesFinalDelayedPayoutTx.class, BuyerVerifiesFinalDelayedPayoutTx.class,
PublishTradeStatistics.class) PublishTradeStatistics.class)
.runTasks(); .using(new TradeTaskRunner(trade,
//processModel.witnessDebugLog(buyerAsTakerTrade); () -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage))))
.run(() -> processModel.witnessDebugLog(trade))
.executeTasks();
} }
@ -210,26 +182,25 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
@Override @Override
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
BuyerEvent event = BuyerEvent.PAYMENT_SENT; BuyerEvent event = BuyerEvent.PAYMENT_SENT;
expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.on(event) .with(event)
.preCondition(!wasDisputed()) .preCondition(!wasDisputed()))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(ApplyFilter.class,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
}))
.addTasks(ApplyFilter.class,
TakerVerifyMakerFeePayment.class, TakerVerifyMakerFeePayment.class,
BuyerSignPayoutTx.class, BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class, BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class BuyerSendCounterCurrencyTransferStartedMessage.class)
) .using(new TradeTaskRunner(trade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED)) .run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED))
.runTasks(); .executeTasks();
} }
@ -238,13 +209,11 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage message, NodeAddress peer) { private void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
expectedPhases(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED) given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.addTasks( .setup(tasks(BuyerProcessPayoutTxPublishedMessage.class))
BuyerProcessPayoutTxPublishedMessage.class .executeTasks();
)
.runTasks();
} }

View file

@ -17,9 +17,17 @@
package bisq.core.trade.protocol; package bisq.core.trade.protocol;
import bisq.core.offer.Offer;
import bisq.core.trade.Trade;
import bisq.core.trade.protocol.tasks.buyer.BuyerSendCounterCurrencyTransferStartedMessage;
import bisq.core.trade.protocol.tasks.buyer.BuyerSetupDepositTxListener;
import bisq.core.trade.protocol.tasks.buyer.BuyerSetupPayoutTxListener;
import bisq.common.handlers.ErrorMessageHandler; import bisq.common.handlers.ErrorMessageHandler;
import bisq.common.handlers.ResultHandler; import bisq.common.handlers.ResultHandler;
import static com.google.common.base.Preconditions.checkNotNull;
public interface BuyerProtocol { public interface BuyerProtocol {
void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler); void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler);
@ -27,4 +35,27 @@ public interface BuyerProtocol {
STARTUP, STARTUP,
PAYMENT_SENT PAYMENT_SENT
} }
default void maybeSetupTaskRunners(Trade trade, ProcessModel processModel, TradeProtocol tradeProtocol) {
Offer offer = checkNotNull(trade.getOffer());
processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing());
tradeProtocol.given(tradeProtocol.phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(BuyerEvent.STARTUP))
.setup(tradeProtocol.tasks(BuyerSetupDepositTxListener.class))
.executeTasks();
tradeProtocol.given(tradeProtocol.anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED)
.with(BuyerEvent.STARTUP))
.setup(tradeProtocol.tasks(BuyerSetupPayoutTxListener.class))
.executeTasks();
tradeProtocol.given(tradeProtocol.anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.FIAT_RECEIVED)
.anyState(Trade.State.BUYER_STORED_IN_MAILBOX_FIAT_PAYMENT_INITIATED_MSG,
Trade.State.BUYER_SEND_FAILED_FIAT_PAYMENT_INITIATED_MSG)
.with(BuyerEvent.STARTUP))
.setup(tradeProtocol.tasks(BuyerSendCounterCurrencyTransferStartedMessage.class))
.executeTasks();
}
} }

View file

@ -82,24 +82,17 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Start trade // Handle take offer request
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void handleTakeOfferRequest(InputsForDepositTxRequest message, public void handleTakeOfferRequest(InputsForDepositTxRequest message,
NodeAddress peer, NodeAddress peer,
ErrorMessageHandler errorMessageHandler) { ErrorMessageHandler errorMessageHandler) {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.setTaskRunner(new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(message),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(message, errorMessage);
}))
.addTasks(
MakerProcessesInputsForDepositTxRequest.class, MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class, ApplyFilter.class,
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
@ -107,9 +100,15 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
MakerSetsLockTime.class, MakerSetsLockTime.class,
MakerCreateAndSignContract.class, MakerCreateAndSignContract.class,
SellerAsMakerCreatesUnsignedDepositTx.class, SellerAsMakerCreatesUnsignedDepositTx.class,
SellerAsMakerSendsInputsForDepositTxResponse.class SellerAsMakerSendsInputsForDepositTxResponse.class)
) .using(new TradeTaskRunner(trade,
.runTasks(); () -> handleTaskRunnerSuccess(message),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(message, errorMessage);
}))
.withTimeout(30))
.executeTasks();
} }
@ -118,39 +117,37 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DepositTxMessage message, NodeAddress peer) { protected void handle(DepositTxMessage message, NodeAddress peer) {
expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.addTasks(
SellerAsMakerProcessDepositTxMessage.class, SellerAsMakerProcessDepositTxMessage.class,
SellerAsMakerFinalizesDepositTx.class, SellerAsMakerFinalizesDepositTx.class,
SellerCreatesDelayedPayoutTx.class, SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class SellerSendDelayedPayoutTxSignatureRequest.class)
) .withTimeout(30))
.runTasks(); .executeTasks();
} }
private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) { private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) {
expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(
() -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage)))
.addTasks(
SellerProcessDelayedPayoutTxSignatureResponse.class, SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class, SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class, SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class, SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishesDepositTx.class, SellerPublishesDepositTx.class,
PublishTradeStatistics.class PublishTradeStatistics.class)
) .using(new TradeTaskRunner(trade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(message);
},
errorMessage -> handleTaskRunnerFault(message, errorMessage))))
.run(() -> processModel.witnessDebugLog(trade)) //TODO still needed? If so move to witness domain .run(() -> processModel.witnessDebugLog(trade)) //TODO still needed? If so move to witness domain
.runTasks(); .executeTasks();
} }
@ -159,23 +156,22 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) { private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.on(message) .with(message)
.from(peer) .from(peer)
.preCondition(trade.getPayoutTx() == null, .preCondition(trade.getPayoutTx() == null,
() -> { () -> {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " + log.warn("We with a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " + "so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg."); "arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(message, true, null); sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade); processModel.removeMailboxMessageAfterProcessing(trade);
}) }))
.addTasks( .setup(tasks(
SellerProcessCounterCurrencyTransferStartedMessage.class, SellerProcessCounterCurrencyTransferStartedMessage.class,
ApplyFilter.class, ApplyFilter.class,
MakerVerifyTakerFeePayment.class MakerVerifyTakerFeePayment.class))
) .executeTasks();
.runTasks();
} }
@ -186,27 +182,26 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
@Override @Override
public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
SellerEvent event = SellerEvent.PAYMENT_RECEIVED; SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
expectedPhase(Trade.Phase.FIAT_SENT) given(phase(Trade.Phase.FIAT_SENT)
.on(event) .with(event)
.preCondition(!wasDisputed()) .preCondition(!wasDisputed()))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
}))
.addTasks(
ApplyFilter.class, ApplyFilter.class,
MakerVerifyTakerFeePayment.class, MakerVerifyTakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class, SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class, SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class SellerSendPayoutTxPublishedMessage.class)
) .using(new TradeTaskRunner(trade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT)) .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT))
.runTasks(); .executeTasks();
} }

View file

@ -87,21 +87,22 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Start trade // Take offer
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@Override @Override
public void takeAvailableOffer() { public void takeAvailableOffer() {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(TakerEvent.TAKE_OFFER) .with(TakerEvent.TAKE_OFFER))
.withTimeout(30) .setup(tasks(
.addTasks(
ApplyFilter.class, ApplyFilter.class,
TakerVerifyMakerFeePayment.class, TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class, // CreateTakerFeeTx.class, //
SellerAsTakerCreatesDepositTxInputs.class, SellerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class TakerSendInputsForDepositTxRequest.class)
).runTasks(); .withTimeout(30))
.run(() -> processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress()))
.executeTasks();
} }
@ -110,11 +111,10 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse message, NodeAddress peer) { private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
expectedPhase(Trade.Phase.INIT) given(phase(Trade.Phase.INIT)
.on(message) .with(message)
.from(peer) .from(peer))
.withTimeout(30) .setup(tasks(
.addTasks(
TakerProcessesInputsForDepositTxResponse.class, TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class, ApplyFilter.class,
VerifyPeersAccountAgeWitness.class, VerifyPeersAccountAgeWitness.class,
@ -122,23 +122,21 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
TakerPublishFeeTx.class, TakerPublishFeeTx.class,
SellerAsTakerSignsDepositTx.class, SellerAsTakerSignsDepositTx.class,
SellerCreatesDelayedPayoutTx.class, SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class SellerSendDelayedPayoutTxSignatureRequest.class)
) .withTimeout(30))
.runTasks(); .executeTasks();
} }
private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) { private void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) {
expectedPhase(Trade.Phase.TAKER_FEE_PUBLISHED) given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.on(message) .with(message)
.from(peer) .from(peer))
.addTasks( .setup(tasks(SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class, SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class, SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class, SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishesDepositTx.class, SellerPublishesDepositTx.class,
PublishTradeStatistics.class PublishTradeStatistics.class))
)
.run(() -> { .run(() -> {
// We stop timeout here and don't start a new one as the // We stop timeout here and don't start a new one as the
// SellerSendsDepositTxAndDelayedPayoutTxMessage repeats the send the message and has it's own // SellerSendsDepositTxAndDelayedPayoutTxMessage repeats the send the message and has it's own
@ -148,7 +146,7 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
//TODO still needed? If so move to witness domain //TODO still needed? If so move to witness domain
processModel.witnessDebugLog(trade); processModel.witnessDebugLog(trade);
}) })
.runTasks(); .executeTasks();
} }
@ -157,8 +155,8 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) { private void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
expectedPhase(Trade.Phase.DEPOSIT_CONFIRMED) given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.on(message) .with(message)
.from(peer) .from(peer)
.preCondition(trade.getPayoutTx() == null, .preCondition(trade.getPayoutTx() == null,
() -> { () -> {
@ -167,13 +165,12 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
"arrive and the peer repeats sending us the message. We send another ACK msg."); "arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(message, true, null); sendAckMessage(message, true, null);
processModel.removeMailboxMessageAfterProcessing(trade); processModel.removeMailboxMessageAfterProcessing(trade);
}) }))
.addTasks( .setup(tasks(
SellerProcessCounterCurrencyTransferStartedMessage.class, SellerProcessCounterCurrencyTransferStartedMessage.class,
ApplyFilter.class, ApplyFilter.class,
TakerVerifyMakerFeePayment.class TakerVerifyMakerFeePayment.class))
) .executeTasks();
.runTasks();
} }
@ -184,27 +181,27 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
@Override @Override
public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
SellerEvent event = SellerEvent.PAYMENT_RECEIVED; SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
expectedPhase(Trade.Phase.FIAT_SENT) given(phase(Trade.Phase.FIAT_SENT)
.on(event) .with(event)
.preCondition(!wasDisputed()) .preCondition(!wasDisputed()))
.setTaskRunner(new TradeTaskRunner(trade, .setup(tasks(
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
}))
.addTasks(
ApplyFilter.class, ApplyFilter.class,
TakerVerifyMakerFeePayment.class, TakerVerifyMakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class, SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class, SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class //TODO add repeated msg send, check UI //TODO add repeated msg send, check UI
) SellerSendPayoutTxPublishedMessage.class)
.using(new TradeTaskRunner(trade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess(event);
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT)) .run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT))
.runTasks(); .executeTasks();
} }

View file

@ -60,12 +60,14 @@ import java.security.PublicKey;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import static bisq.core.util.Validator.isTradeIdValid; import static bisq.core.util.Validator.isTradeIdValid;
import static bisq.core.util.Validator.nonEmptyStringOf; import static bisq.core.util.Validator.nonEmptyStringOf;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j @Slf4j
@ -442,109 +444,171 @@ public abstract class TradeProtocol {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// FluentProcess // FluentProtocol
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
protected FluentProcess expectedPhase(Trade.Phase phase) { protected FluentProtocol given(Condition condition) {
return new FluentProcess(trade, phase); return new FluentProtocol(condition);
} }
protected FluentProcess expectedPhases(Trade.Phase... phase) { protected Condition phase(Trade.Phase expectedPhase) {
return new FluentProcess(trade, phase); return new Condition(trade, expectedPhase);
} }
class FluentProcess { protected Condition anyPhase(Trade.Phase... expectedPhases) {
private final Trade trade; return new Condition(trade, expectedPhases);
@Nullable }
private TradeMessage message;
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
private final Set<Boolean> preConditions = new HashSet<>();
@Nullable
private Event event;
private Runnable preConditionFailedHandler;
private int timeoutSec;
private NodeAddress peersNodeAddress;
private TradeTaskRunner taskRunner;
public FluentProcess(Trade trade, @SafeVarargs
Trade.Phase expectedPhase) { public final Setup tasks(Class<? extends Task<Trade>>... tasks) {
this.trade = trade; return new Setup(trade, tasks);
this.expectedPhases.add(expectedPhase); }
// Main class. Contains the condition and setup, if condition is valid it will execute the
// taskRunner and the optional runnable.
class FluentProtocol {
private final Condition condition;
private Setup setup;
public FluentProtocol(Condition condition) {
this.condition = condition;
} }
public FluentProcess(Trade trade, protected FluentProtocol setup(Setup setup) {
Trade.Phase... expectedPhases) { this.setup = setup;
this.trade = trade; return this;
this.expectedPhases.addAll(Set.of(expectedPhases));
} }
public FluentProcess run(Runnable runnable) { // Can be used before or after executeTasks
if (isValid()) { public FluentProtocol run(Runnable runnable) {
if (condition.isValid()) {
runnable.run(); runnable.run();
} }
return this; return this;
} }
public FluentProcess runTasks() { public FluentProtocol executeTasks() {
if (isValid()) { if (condition.isValid()) {
if (timeoutSec > 0) { if (setup.getTimeoutSec() > 0) {
startTimeout(timeoutSec); startTimeout(setup.getTimeoutSec());
} }
if (peersNodeAddress != null) { NodeAddress peer = condition.getPeersNodeAddress();
processModel.setTempTradingPeerNodeAddress(peersNodeAddress); if (peer != null) {
processModel.setTempTradingPeerNodeAddress(peer);
} }
TradeMessage message = condition.getMessage();
if (message != null) { if (message != null) {
processModel.setTradeMessage(message); processModel.setTradeMessage(message);
} }
TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent());
taskRunner.addTasks(setup.getTasks());
taskRunner.run(); taskRunner.run();
} }
return this;
}
}
//
static class Condition {
private final Trade trade;
@Nullable
@Getter
private TradeMessage message;
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
private final Set<Trade.State> expectedStates = new HashSet<>();
private final Set<Boolean> preConditions = new HashSet<>();
@Nullable
@Getter
private Event event;
@Getter
private NodeAddress peersNodeAddress;
private boolean isValid;
private boolean isValidated;
private Runnable preConditionFailedHandler;
public Condition(Trade trade, Trade.Phase expectedPhase) {
this.expectedPhases.add(expectedPhase);
this.trade = trade;
}
public Condition(Trade trade, Trade.Phase... expectedPhases) {
this.expectedPhases.addAll(Set.of(expectedPhases));
this.trade = trade;
}
public Condition state(Trade.State state) {
this.expectedStates.add(state);
return this;
}
public Condition anyState(Trade.State... states) {
this.expectedStates.addAll(Set.of(states));
return this;
}
public Condition with(Event event) {
checkArgument(!isValidated);
this.event = event;
return this;
}
public Condition with(TradeMessage tradeMessage) {
checkArgument(!isValidated);
this.message = tradeMessage;
return this;
}
public Condition from(NodeAddress peersNodeAddress) {
checkArgument(!isValidated);
this.peersNodeAddress = peersNodeAddress;
return this;
}
public Condition preCondition(boolean preCondition) {
checkArgument(!isValidated);
preConditions.add(preCondition);
return this;
}
public Condition preCondition(boolean preCondition, Runnable conditionFailedHandler) {
checkArgument(!isValidated);
preConditions.add(preCondition);
this.preConditionFailedHandler = conditionFailedHandler;
return this; return this;
} }
private boolean isValid() { private boolean isValid() {
boolean isPhaseValid = isPhaseValid(); if (!isValidated) {
boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e); boolean isPhaseValid = isPhaseValid();
boolean isTradeIdValid = message == null || isTradeIdValid(processModel.getOfferId(), message); boolean isStateValid = isStateValid();
if (!allPreConditionsMet) { boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e);
log.error("PreConditions not met. preConditions={}, this={}", preConditions, this); boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message);
if (preConditionFailedHandler != null) {
preConditionFailedHandler.run(); if (!allPreConditionsMet) {
log.error("PreConditions not met. preConditions={}, this={}", preConditions, this);
if (preConditionFailedHandler != null) {
preConditionFailedHandler.run();
}
} }
} if (!isTradeIdValid) {
if (!isTradeIdValid) { log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}",
log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}", trade.getId(), message.getTradeId());
trade.getId(), message.getTradeId());
}
return isPhaseValid && allPreConditionsMet && isTradeIdValid;
}
@SafeVarargs
public final FluentProcess addTasks(Class<? extends Task<Trade>>... tasks) {
if (taskRunner == null) {
if (message != null) {
taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(message),
errorMessage -> handleTaskRunnerFault(message, errorMessage));
} else if (event != null) {
taskRunner = new TradeTaskRunner(trade,
() -> handleTaskRunnerSuccess(event),
errorMessage -> handleTaskRunnerFault(event, errorMessage));
} else {
throw new IllegalStateException("addTasks must not be called without message or event " +
"set in case no taskRunner has been created yet");
} }
isValid = isPhaseValid && isStateValid && allPreConditionsMet && isTradeIdValid;
isValidated = true;
} }
taskRunner.addTasks(tasks); return isValid;
return this;
} }
private boolean isPhaseValid() { private boolean isPhaseValid() {
if (expectedPhases.isEmpty()) {
return true;
}
boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase()); boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase());
String trigger = message != null ? String trigger = message != null ?
message.getClass().getSimpleName() : message.getClass().getSimpleName() :
@ -568,60 +632,75 @@ public abstract class TradeProtocol {
return isPhaseValid; return isPhaseValid;
} }
public FluentProcess orInPhase(Trade.Phase phase) { private boolean isStateValid() {
expectedPhases.add(phase); if (expectedStates.isEmpty()) {
return this; return true;
}
boolean isStateValid = expectedStates.stream().anyMatch(e -> e == trade.getState());
String trigger = message != null ?
message.getClass().getSimpleName() :
event != null ?
event.name() + " event" :
"";
if (isStateValid) {
log.info("We received {} at state {}",
trigger,
trade.getState());
} else {
log.error("We received {} but we are are not in the correct state. Expected states={}, " +
"Trade state= {} ",
trigger,
expectedStates,
trade.getState());
}
return isStateValid;
}
}
// Setup for task runner
class Setup {
private final Trade trade;
@Getter
private final Class<? extends Task<Trade>>[] tasks;
@Getter
private int timeoutSec;
@Nullable
private TradeTaskRunner taskRunner;
@SafeVarargs
public Setup(Trade trade, Class<? extends Task<Trade>>... tasks) {
this.trade = trade;
this.tasks = tasks;
} }
public FluentProcess on(Event event) { public Setup withTimeout(int timeoutSec) {
this.event = event;
return this;
}
public FluentProcess on(TradeMessage tradeMessage) {
this.message = tradeMessage;
return this;
}
public FluentProcess preCondition(boolean preCondition) {
preConditions.add(preCondition);
return this;
}
public FluentProcess preCondition(boolean preCondition, Runnable conditionFailedHandler) {
preConditions.add(preCondition);
this.preConditionFailedHandler = conditionFailedHandler;
return this;
}
public FluentProcess withTimeout(int timeoutSec) {
this.timeoutSec = timeoutSec; this.timeoutSec = timeoutSec;
return this; return this;
} }
public FluentProcess from(NodeAddress peersNodeAddress) { public Setup using(TradeTaskRunner taskRunner) {
this.peersNodeAddress = peersNodeAddress;
return this;
}
public FluentProcess setTaskRunner(TradeTaskRunner taskRunner) {
this.taskRunner = taskRunner; this.taskRunner = taskRunner;
return this; return this;
} }
@Override private TradeTaskRunner getTaskRunner(@Nullable TradeMessage message, @Nullable Event event) {
public String toString() { if (taskRunner == null) {
return "FluentProcess{" + if (message != null) {
"\n trade=" + trade + taskRunner = new TradeTaskRunner(trade,
",\n message=" + message + () -> handleTaskRunnerSuccess(message),
",\n expectedPhases=" + expectedPhases + errorMessage -> handleTaskRunnerFault(message, errorMessage));
",\n preConditions=" + preConditions + } else if (event != null) {
",\n event=" + event + taskRunner = new TradeTaskRunner(trade,
",\n preConditionFailedHandler=" + preConditionFailedHandler + () -> handleTaskRunnerSuccess(event),
",\n timeoutSec=" + timeoutSec + errorMessage -> handleTaskRunnerFault(event, errorMessage));
",\n peersNodeAddress=" + peersNodeAddress + } else {
",\n taskRunner=" + taskRunner + throw new IllegalStateException("addTasks must not be called without message or event " +
"\n}"; "set in case no taskRunner has been created yet");
}
}
return taskRunner;
} }
} }
} }