Use fluent interface for checking state and conditions

This commit is contained in:
chimp1984 2020-09-23 01:48:02 -05:00
parent 7c6f0ac9b2
commit 6fa2225b65
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
11 changed files with 345 additions and 309 deletions

View File

@ -53,8 +53,6 @@ import bisq.common.handlers.ResultHandler;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol, MakerProtocol {
private final BuyerAsMakerTrade buyerAsMakerTrade;
@ -118,8 +116,9 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
public void handleTakeOfferRequest(InputsForDepositTxRequest tradeMessage,
NodeAddress peerNodeAddress,
ErrorMessageHandler errorMessageHandler) {
ifInPhase(Trade.Phase.INIT, tradeMessage)
.run(() -> {
from(Trade.Phase.INIT)
.onMessage(tradeMessage)
.process(() -> {
Validator.checkTradeId(processModel.getOfferId(), tradeMessage);
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
@ -151,8 +150,9 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(DelayedPayoutTxSignatureRequest tradeMessage, NodeAddress peerNodeAddress) {
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
.run(() -> {
from(Trade.Phase.TAKER_FEE_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
@ -178,17 +178,17 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly.
private void handle(DepositTxAndDelayedPayoutTxMessage tradeMessage, NodeAddress peerNodeAddress) {
if (trade.getDepositTx() != null && trade.getDelayedPayoutTx() != null) {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
return;
}
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage).orInPhase(Trade.Phase.DEPOSIT_PUBLISHED)
.run(() -> {
fromAny(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
.onMessage(tradeMessage)
.condition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null,
() -> {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
})
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
@ -213,11 +213,10 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
@Override
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkArgument(!wasDisputed(), "A call to onFiatPaymentStarted is not permitted once a " +
"dispute has been opened.");
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED)
.run(() -> {
from(Trade.Phase.DEPOSIT_CONFIRMED)
.onEvent(BuyerEvent.PAYMENT_SENT)
.condition(!wasDisputed())
.process(() -> {
buyerAsMakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> {
@ -245,8 +244,9 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage tradeMessage, NodeAddress peerNodeAddress) {
ifInPhase(Trade.Phase.FIAT_SENT, tradeMessage).orInPhase(Trade.Phase.PAYOUT_PUBLISHED)
.run(() -> {
fromAny(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);

View File

@ -57,7 +57,6 @@ import bisq.common.handlers.ResultHandler;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
@ -125,8 +124,9 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
@Override
public void takeAvailableOffer() {
ifInPhase(Trade.Phase.INIT)
.run(() -> {
from(Trade.Phase.INIT)
.onEvent(TakerEvent.TAKE_OFFER)
.process(() -> {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess("takeAvailableOffer"),
@ -153,8 +153,9 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse tradeMessage, NodeAddress sender) {
ifInPhase(Trade.Phase.INIT, tradeMessage)
.run(() -> {
from(Trade.Phase.INIT)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
@ -178,8 +179,9 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
}
private void handle(DelayedPayoutTxSignatureRequest tradeMessage, NodeAddress sender) {
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
.run(() -> {
from(Trade.Phase.TAKER_FEE_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
@ -208,17 +210,17 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly.
private void handle(DepositTxAndDelayedPayoutTxMessage tradeMessage, NodeAddress peerNodeAddress) {
if (trade.getDepositTx() != null && trade.getDelayedPayoutTx() != null) {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
return;
}
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage).orInPhase(Trade.Phase.DEPOSIT_PUBLISHED)
.run(() -> {
fromAny(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
.onMessage(tradeMessage)
.condition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null,
() -> {
log.warn("We received a DepositTxAndDelayedPayoutTxMessage but we have already processed the deposit and " +
"delayed payout tx so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
})
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
@ -231,9 +233,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
);
taskRunner.run();
processModel.witnessDebugLog(buyerAsTakerTrade);
}).otherWise(() -> {
log.warn("");
});
});
}
@ -244,11 +244,10 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
// User clicked the "bank transfer started" button
@Override
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkArgument(!wasDisputed(), "A call to onFiatPaymentStarted is not permitted once a " +
"dispute has been opened.");
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED)
.run(() -> {
from(Trade.Phase.DEPOSIT_CONFIRMED)
.onEvent(BuyerEvent.PAYMENT_SENT)
.condition(!wasDisputed())
.process(() -> {
buyerAsTakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
@ -276,8 +275,9 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage tradeMessage, NodeAddress peerNodeAddress) {
ifInPhase(Trade.Phase.FIAT_SENT, tradeMessage).orInPhase(Trade.Phase.PAYOUT_PUBLISHED)
.run(() -> {
fromAny(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);

View File

@ -22,4 +22,8 @@ import bisq.common.handlers.ResultHandler;
public interface BuyerProtocol {
void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler);
enum BuyerEvent implements TradeProtocol.Event {
PAYMENT_SENT
}
}

View File

@ -25,5 +25,7 @@ import bisq.network.p2p.NodeAddress;
import bisq.common.handlers.ErrorMessageHandler;
public interface MakerProtocol {
void handleTakeOfferRequest(InputsForDepositTxRequest message, NodeAddress taker, ErrorMessageHandler errorMessageHandler);
void handleTakeOfferRequest(InputsForDepositTxRequest message,
NodeAddress taker,
ErrorMessageHandler errorMessageHandler);
}

View File

@ -56,8 +56,6 @@ import bisq.common.handlers.ResultHandler;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtocol, MakerProtocol {
private final SellerAsMakerTrade sellerAsMakerTrade;
@ -96,29 +94,33 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
public void handleTakeOfferRequest(InputsForDepositTxRequest tradeMessage,
NodeAddress sender,
ErrorMessageHandler errorMessageHandler) {
Validator.checkTradeId(processModel.getOfferId(), tradeMessage);
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
from(Trade.Phase.INIT)
.onMessage(tradeMessage)
.process(() -> {
Validator.checkTradeId(processModel.getOfferId(), tradeMessage);
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handleTakeOfferRequest"),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(tradeMessage, errorMessage);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handleTakeOfferRequest"),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(tradeMessage, errorMessage);
});
taskRunner.addTasks(
MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
MakerVerifyTakerFeePayment.class,
MakerSetsLockTime.class,
MakerCreateAndSignContract.class,
SellerAsMakerCreatesUnsignedDepositTx.class,
SellerAsMakerSendsInputsForDepositTxResponse.class
);
taskRunner.run();
});
taskRunner.addTasks(
MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
MakerVerifyTakerFeePayment.class,
MakerSetsLockTime.class,
MakerCreateAndSignContract.class,
SellerAsMakerCreatesUnsignedDepositTx.class,
SellerAsMakerSendsInputsForDepositTxResponse.class
);
taskRunner.run();
}
@ -127,43 +129,51 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DepositTxMessage tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
from(Trade.Phase.TAKER_FEE_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle DepositTxMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle DepositTxMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
SellerAsMakerProcessDepositTxMessage.class,
SellerAsMakerFinalizesDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class
);
taskRunner.run();
taskRunner.addTasks(
SellerAsMakerProcessDepositTxMessage.class,
SellerAsMakerFinalizesDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class
);
taskRunner.run();
});
}
private void handle(DelayedPayoutTxSignatureResponse tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
from(Trade.Phase.TAKER_FEE_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishesDepositTx.class,
PublishTradeStatistics.class
);
taskRunner.run();
processModel.witnessDebugLog(sellerAsMakerTrade);
taskRunner.addTasks(
SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishesDepositTx.class,
PublishTradeStatistics.class
);
taskRunner.run();
processModel.witnessDebugLog(sellerAsMakerTrade);
});
}
@ -172,17 +182,17 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(CounterCurrencyTransferStartedMessage tradeMessage, NodeAddress sender) {
if (trade.getPayoutTx() != null) {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
return;
}
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED, tradeMessage)
.run(() -> {
from(Trade.Phase.DEPOSIT_CONFIRMED)
.onMessage(tradeMessage)
.condition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
})
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
@ -206,52 +216,30 @@ public class SellerAsMakerProtocol extends TradeProtocol implements SellerProtoc
@Override
public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkArgument(!wasDisputed(), "A call to onFiatPaymentReceived is not permitted once a " +
"dispute has been opened.");
from(Trade.Phase.FIAT_SENT)
.onEvent(SellerEvent.PAYMENT_RECEIVED)
.condition(!wasDisputed())
.process(() -> {
sellerAsMakerTrade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
if (trade.getPayoutTx() == null) {
sellerAsMakerTrade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived 1");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
MakerVerifyTakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class
);
taskRunner.run();
} else {
// we don't set the state as we have already a later phase reached
log.info("onFiatPaymentReceived called twice. " +
"That can happen if message did not arrive the first time and we send msg again.\n" +
"state=" + sellerAsMakerTrade.getState());
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsMakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived 2");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
MakerVerifyTakerFeePayment.class,
SellerSendPayoutTxPublishedMessage.class
);
taskRunner.run();
}
taskRunner.addTasks(
ApplyFilter.class,
MakerVerifyTakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class
);
taskRunner.run();
});
}

View File

@ -55,7 +55,6 @@ import bisq.common.handlers.ResultHandler;
import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
@ -97,21 +96,25 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
@Override
public void takeAvailableOffer() {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> handleTaskRunnerSuccess("takeAvailableOffer"),
this::handleTaskRunnerFault);
from(Trade.Phase.INIT)
.onEvent(TakerEvent.TAKE_OFFER)
.process(() -> {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> handleTaskRunnerSuccess("takeAvailableOffer"),
this::handleTaskRunnerFault);
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class,
SellerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class
);
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class, //
SellerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class
);
startTimeout();
taskRunner.run();
startTimeout();
taskRunner.run();
});
}
@ -120,49 +123,57 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
from(Trade.Phase.INIT)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
handleTaskRunnerSuccess(tradeMessage, "handle InputsForDepositTxResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
handleTaskRunnerSuccess(tradeMessage, "handle InputsForDepositTxResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
SellerAsTakerSignsDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class
);
taskRunner.run();
taskRunner.addTasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
SellerAsTakerSignsDepositTx.class,
SellerCreatesDelayedPayoutTx.class,
SellerSendDelayedPayoutTxSignatureRequest.class
);
taskRunner.run();
});
}
private void handle(DelayedPayoutTxSignatureResponse tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
from(Trade.Phase.TAKER_FEE_PUBLISHED)
.onMessage(tradeMessage)
.process(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
stopTimeout();
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class,
SellerPublishesDepositTx.class,
PublishTradeStatistics.class
);
taskRunner.run();
processModel.witnessDebugLog(sellerAsTakerTrade);
taskRunner.addTasks(
SellerProcessDelayedPayoutTxSignatureResponse.class,
SellerSignsDelayedPayoutTx.class,
SellerFinalizesDelayedPayoutTx.class,
SellerSendsDepositTxAndDelayedPayoutTxMessage.class, // SEND_MSG(DepositTxAndDelayedPayoutTxMessage)
SellerPublishesDepositTx.class, // PUBLISH_DEPOSIT_TX
PublishTradeStatistics.class
);
taskRunner.run();
processModel.witnessDebugLog(sellerAsTakerTrade);
});
}
@ -171,17 +182,26 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(CounterCurrencyTransferStartedMessage tradeMessage, NodeAddress sender) {
if (trade.getPayoutTx() != null) {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
return;
}
from(Trade.Phase.DEPOSIT_CONFIRMED)
.onMessage(tradeMessage)
.condition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
})
.process(() -> {
if (trade.getPayoutTx() != null) {
log.warn("We received a CounterCurrencyTransferStartedMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(tradeMessage, true, null);
processModel.removeMailboxMessageAfterProcessing(trade);
return;
}
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED, tradeMessage)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
@ -205,52 +225,30 @@ public class SellerAsTakerProtocol extends TradeProtocol implements SellerProtoc
@Override
public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkArgument(!wasDisputed(), "A call to onFiatPaymentReceived is not permitted once a " +
"dispute has been opened.");
from(Trade.Phase.FIAT_SENT)
.onEvent(SellerEvent.PAYMENT_RECEIVED)
.condition(!wasDisputed())
.process(() -> {
sellerAsTakerTrade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
if (trade.getPayoutTx() == null) {
sellerAsTakerTrade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived 1");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class
);
taskRunner.run();
} else {
// we don't set the state as we have already a higher phase reached
log.info("onFiatPaymentReceived called twice. " +
"That can happen if message did not arrive the first time and we send msg again.\n" +
"state=" + sellerAsTakerTrade.getState());
TradeTaskRunner taskRunner = new TradeTaskRunner(sellerAsTakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentReceived 2");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
SellerSendPayoutTxPublishedMessage.class
);
taskRunner.run();
}
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
SellerSignAndFinalizePayoutTx.class,
SellerBroadcastPayoutTx.class,
SellerSendPayoutTxPublishedMessage.class //TODO add repeated msg send, check UI
);
taskRunner.run();
});
}

View File

@ -22,4 +22,8 @@ import bisq.common.handlers.ResultHandler;
public interface SellerProtocol {
void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler);
enum SellerEvent implements TradeProtocol.Event {
PAYMENT_RECEIVED
}
}

View File

@ -19,4 +19,8 @@ package bisq.core.trade.protocol;
public interface TakerProtocol {
void takeAvailableOffer();
enum TakerEvent implements TradeProtocol.Event {
TAKE_OFFER
}
}

View File

@ -68,6 +68,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public abstract class TradeProtocol {
interface Event {
String name();
}
private static final long TIMEOUT = 180;
protected final ProcessModel processModel;
@ -412,64 +416,98 @@ public abstract class TradeProtocol {
}
}
protected TradeStateValidation ifInPhase(Trade.Phase phase) {
return new TradeStateValidation(trade, phase, null);
///////////////////////////////////////////////////////////////////////////////////////////
// FluentProcess
///////////////////////////////////////////////////////////////////////////////////////////
protected FluentProcess from(Trade.Phase phase) {
return new FluentProcess(trade, phase);
}
protected TradeStateValidation ifInPhase(Trade.Phase phase,
@Nullable TradeMessage tradeMessage) {
return new TradeStateValidation(trade, phase, tradeMessage);
protected FluentProcess fromAny(Trade.Phase... phase) {
return new FluentProcess(trade, phase);
}
static class TradeStateValidation {
static class FluentProcess {
private final Trade trade;
private final Trade.Phase expectedPhase;
@Nullable
private final TradeMessage tradeMessage;
private Set<Trade.Phase> alternativePhase = new HashSet<>();
private TradeMessage tradeMessage;
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
@Nullable
private Event event;
private boolean condition = true;
private Runnable conditionFailedHandler;
protected TradeStateValidation run(Runnable runnable) {
if (isValidPhase()) {
protected FluentProcess process(Runnable runnable) {
if (isPhaseValid() && condition) {
runnable.run();
}
if (!condition && conditionFailedHandler != null) {
conditionFailedHandler.run();
}
return this;
}
protected void otherWise(Runnable runnable) {
runnable.run();
}
public TradeStateValidation(Trade trade,
Trade.Phase expectedPhase,
@Nullable TradeMessage tradeMessage) {
public FluentProcess(Trade trade,
Trade.Phase expectedPhase) {
this.trade = trade;
this.expectedPhase = expectedPhase;
this.tradeMessage = tradeMessage;
this.expectedPhases.add(expectedPhase);
}
public boolean isValidPhase() {
boolean isValidPhase = trade.getPhase() == expectedPhase ||
(alternativePhase.stream().anyMatch(e -> e == trade.getPhase()));
public FluentProcess(Trade trade,
Trade.Phase... expectedPhases) {
this.trade = trade;
this.expectedPhases.addAll(Set.of(expectedPhases));
}
if (!isValidPhase) {
if (tradeMessage != null) {
log.error("We received a {} but we are not in the correct phase. Expected phase={}, Trade phase={}, Trade state= {} ",
tradeMessage.getClass().getSimpleName(),
expectedPhase,
trade.getPhase(),
trade.getState());
} else {
log.error("We are not in the correct phase. Expected phase={}, Trade phase={}, Trade state= {} ",
expectedPhase,
trade.getPhase(),
trade.getState());
}
private boolean isPhaseValid() {
boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase());
String trigger = tradeMessage != null ?
tradeMessage.getClass().getSimpleName() :
event != null ?
event.name() + " event" :
"";
if (isPhaseValid) {
log.info("We received {} at phase {} and state {}",
trigger,
trade.getPhase(),
trade.getState());
} else {
log.error("We received {} but we are are not in the correct phase. Expected phases={}, " +
"Trade phase={}, Trade state= {} ",
trigger,
expectedPhases,
trade.getPhase(),
trade.getState());
}
return isValidPhase;
return isPhaseValid;
}
public TradeStateValidation orInPhase(Trade.Phase phase) {
alternativePhase.add(phase);
public FluentProcess orInPhase(Trade.Phase phase) {
expectedPhases.add(phase);
return this;
}
public FluentProcess onEvent(Event event) {
this.event = event;
return this;
}
public FluentProcess onMessage(TradeMessage tradeMessage) {
this.tradeMessage = tradeMessage;
return this;
}
public FluentProcess condition(boolean condition) {
this.condition = condition;
return this;
}
public FluentProcess condition(boolean condition, Runnable conditionFailedHandler) {
this.condition = condition;
this.conditionFailedHandler = conditionFailedHandler;
return this;
}
}

View File

@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;
//TODO add repeated msg send
@EqualsAndHashCode(callSuper = true)
@Slf4j
public class SellerSendPayoutTxPublishedMessage extends SendMailboxMessageTask {

View File

@ -45,7 +45,6 @@ import bisq.core.payment.payload.SepaInstantAccountPayload;
import bisq.core.payment.payload.USPostalMoneyOrderAccountPayload;
import bisq.core.payment.payload.WesternUnionAccountPayload;
import bisq.core.trade.Contract;
import bisq.core.trade.Trade;
import bisq.core.trade.txproof.AssetTxProofResult;
import bisq.core.user.DontShowAgainLookup;
@ -458,8 +457,6 @@ public class SellerStep3View extends TradeStepView {
log.info("User pressed the [Confirm payment receipt] button for Trade {}", trade.getShortId());
busyAnimation.play();
statusLabel.setText(Res.get("shared.sendingConfirmation"));
if (!trade.isPayoutPublished())
trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
model.dataModel.onFiatPaymentReceived(() -> {
// In case the first send failed we got the support button displayed.