Use ifInPhase APi for testing expected phase(s)

Add ifInPhase method to allow alternative phases. We can have parallel branches how we reach a new state, e.g. receiving the tx from network or receiving it from the peer. no guarantee which will happen first.

Allow phase transition to skip a future phase as we have phases only relevant for one role. This is not good for treading it as a state machine state, we need to redesign the state/phase handling...
This commit is contained in:
chimp1984 2020-09-22 21:09:09 -05:00
parent 48241af316
commit 7c6f0ac9b2
No known key found for this signature in database
GPG Key ID: 9801B4EC591F90E3
5 changed files with 197 additions and 159 deletions

View File

@ -127,7 +127,7 @@ public abstract class Trade implements Tradable, Model {
MAKER_SEND_FAILED_PUBLISH_DEPOSIT_TX_REQUEST(Phase.TAKER_FEE_PUBLISHED),
// taker perspective
TAKER_RECEIVED_PUBLISH_DEPOSIT_TX_REQUEST(Phase.TAKER_FEE_PUBLISHED),
TAKER_RECEIVED_PUBLISH_DEPOSIT_TX_REQUEST(Phase.TAKER_FEE_PUBLISHED), // Not used anymore
// #################### Phase DEPOSIT_PUBLISHED
@ -232,10 +232,11 @@ public abstract class Trade implements Tradable, Model {
return protobuf.Trade.Phase.valueOf(phase.name());
}
// We allow a phase change only if the phase is next phase
// We allow a phase change only if the phase a future phase (we cannot limit it to next phase as we have cases where
// we skip a phase as it is only relevant to one role -> states and phases need a redesign ;-( )
public boolean isValidTransitionTo(Phase newPhase) {
// this is current phase
return newPhase.ordinal() == this.ordinal() + 1;
return newPhase.ordinal() > this.ordinal();
}
}
@ -833,7 +834,8 @@ public abstract class Trade implements Tradable, Model {
if (state.isValidTransitionTo(newState)) {
setState(newState);
} else {
log.warn("State change is not getting applied because it would cause an invalid transition.");
log.warn("State change is not getting applied because it would cause an invalid transition. " +
"Trade state={}, intended state={}", state, newState);
}
}

View File

@ -118,28 +118,31 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
public void handleTakeOfferRequest(InputsForDepositTxRequest tradeMessage,
NodeAddress peerNodeAddress,
ErrorMessageHandler errorMessageHandler) {
Validator.checkTradeId(processModel.getOfferId(), tradeMessage);
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
ifInPhase(Trade.Phase.INIT, tradeMessage)
.run(() -> {
Validator.checkTradeId(processModel.getOfferId(), tradeMessage);
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handleTakeOfferRequest"),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handleTakeOfferRequest"),
errorMessage -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
MakerVerifyTakerFeePayment.class,
MakerSetsLockTime.class,
MakerCreateAndSignContract.class,
BuyerAsMakerCreatesAndSignsDepositTx.class,
BuyerSetupDepositTxListener.class,
BuyerAsMakerSendsInputsForDepositTxResponse.class
);
taskRunner.run();
});
taskRunner.addTasks(
MakerProcessesInputsForDepositTxRequest.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
MakerVerifyTakerFeePayment.class,
MakerSetsLockTime.class,
MakerCreateAndSignContract.class,
BuyerAsMakerCreatesAndSignsDepositTx.class,
BuyerSetupDepositTxListener.class,
BuyerAsMakerSendsInputsForDepositTxResponse.class
);
taskRunner.run();
}
@ -148,19 +151,22 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(DelayedPayoutTxSignatureRequest tradeMessage, NodeAddress peerNodeAddress) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureRequest"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class
);
taskRunner.run();
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureRequest"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class
);
taskRunner.run();
});
}
// The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can
@ -181,7 +187,7 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
return;
}
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage).orInPhase(Trade.Phase.DEPOSIT_PUBLISHED)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
@ -210,28 +216,27 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
checkArgument(!wasDisputed(), "A call to onFiatPaymentStarted is not permitted once a " +
"dispute has been opened.");
if (trade.isDepositConfirmed() && !trade.isFiatSent()) {
buyerAsMakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
MakerVerifyTakerFeePayment.class,
BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class
);
taskRunner.run();
} else {
log.warn("onFiatPaymentStarted called twice. tradeState=" + trade.getState());
}
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED)
.run(() -> {
buyerAsMakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
MakerVerifyTakerFeePayment.class,
BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class
);
taskRunner.run();
});
}
@ -240,17 +245,20 @@ public class BuyerAsMakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage tradeMessage, NodeAddress peerNodeAddress) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
ifInPhase(Trade.Phase.FIAT_SENT, tradeMessage).orInPhase(Trade.Phase.PAYOUT_PUBLISHED)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle PayoutTxPublishedMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsMakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle PayoutTxPublishedMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessPayoutTxPublishedMessage.class
);
taskRunner.run();
taskRunner.addTasks(
BuyerProcessPayoutTxPublishedMessage.class
);
taskRunner.run();
});
}

View File

@ -125,23 +125,26 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
@Override
public void takeAvailableOffer() {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess("takeAvailableOffer"),
this::handleTaskRunnerFault);
ifInPhase(Trade.Phase.INIT)
.run(() -> {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess("takeAvailableOffer"),
this::handleTaskRunnerFault);
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class,
BuyerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class
);
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
CreateTakerFeeTx.class,
BuyerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class
);
//TODO if peer does get an error he does not respond and all we get is the timeout now knowing why it failed.
// We should add an error message the peer sends us in such cases.
startTimeout();
taskRunner.run();
//TODO if peer does get an error he does not respond and all we get is the timeout now knowing why it failed.
// We should add an error message the peer sends us in such cases.
startTimeout();
taskRunner.run();
});
}
@ -150,44 +153,50 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
ifInPhase(Trade.Phase.INIT, tradeMessage)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
handleTaskRunnerSuccess(tradeMessage, "handle InputsForDepositTxResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
BuyerAsTakerSignsDepositTx.class,
BuyerSetupDepositTxListener.class,
BuyerAsTakerSendsDepositTxMessage.class
);
taskRunner.run();
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
handleTaskRunnerSuccess(tradeMessage, "handle InputsForDepositTxResponse");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
TakerProcessesInputsForDepositTxResponse.class,
ApplyFilter.class,
VerifyPeersAccountAgeWitness.class,
TakerVerifyAndSignContract.class,
TakerPublishFeeTx.class,
BuyerAsTakerSignsDepositTx.class,
BuyerSetupDepositTxListener.class,
BuyerAsTakerSendsDepositTxMessage.class
);
taskRunner.run();
});
}
private void handle(DelayedPayoutTxSignatureRequest tradeMessage, NodeAddress sender) {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(sender);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
stopTimeout(); // We stop timeout here as last DepositTxAndDelayedPayoutTxMessage is not mandatory
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureRequest");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class
);
taskRunner.run();
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
stopTimeout(); // We stop timeout here as last DepositTxAndDelayedPayoutTxMessage is not mandatory
handleTaskRunnerSuccess(tradeMessage, "handle DelayedPayoutTxSignatureRequest");
},
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessDelayedPayoutTxSignatureRequest.class,
BuyerVerifiesPreparedDelayedPayoutTx.class,
BuyerSignsDelayedPayoutTx.class,
BuyerSendsDelayedPayoutTxSignatureResponse.class
);
taskRunner.run();
});
}
// The DepositTxAndDelayedPayoutTxMessage is a mailbox message as earlier we use only the deposit tx which can
@ -208,7 +217,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
return;
}
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage)
ifInPhase(Trade.Phase.TAKER_FEE_PUBLISHED, tradeMessage).orInPhase(Trade.Phase.DEPOSIT_PUBLISHED)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
@ -238,29 +247,27 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
checkArgument(!wasDisputed(), "A call to onFiatPaymentStarted is not permitted once a " +
"dispute has been opened.");
if (!trade.isFiatSent()) {
buyerAsTakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class
);
taskRunner.run();
} else {
log.warn("onFiatPaymentStarted called twice. tradeState=" + trade.getState());
}
ifInPhase(Trade.Phase.DEPOSIT_CONFIRMED)
.run(() -> {
buyerAsTakerTrade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
ApplyFilter.class,
TakerVerifyMakerFeePayment.class,
BuyerSignPayoutTx.class,
BuyerSetupPayoutTxListener.class,
BuyerSendCounterCurrencyTransferStartedMessage.class
);
taskRunner.run();
});
}
@ -269,18 +276,20 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage tradeMessage, NodeAddress peerNodeAddress) {
log.debug("handle PayoutTxPublishedMessage called");
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
ifInPhase(Trade.Phase.FIAT_SENT, tradeMessage).orInPhase(Trade.Phase.PAYOUT_PUBLISHED)
.run(() -> {
processModel.setTradeMessage(tradeMessage);
processModel.setTempTradingPeerNodeAddress(peerNodeAddress);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle PayoutTxPublishedMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess(tradeMessage, "handle PayoutTxPublishedMessage"),
errorMessage -> handleTaskRunnerFault(tradeMessage, errorMessage));
taskRunner.addTasks(
BuyerProcessPayoutTxPublishedMessage.class
);
taskRunner.run();
taskRunner.addTasks(
BuyerProcessPayoutTxPublishedMessage.class
);
taskRunner.run();
});
}

View File

@ -56,7 +56,9 @@ import javafx.beans.value.ChangeListener;
import java.security.PublicKey;
import lombok.Getter;
import java.util.HashSet;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@ -420,11 +422,14 @@ public abstract class TradeProtocol {
}
static class TradeStateValidation {
@Getter
private final boolean isValidPhase;
private final Trade trade;
private final Trade.Phase expectedPhase;
@Nullable
private final TradeMessage tradeMessage;
private Set<Trade.Phase> alternativePhase = new HashSet<>();
protected TradeStateValidation run(Runnable runnable) {
if (isValidPhase) {
if (isValidPhase()) {
runnable.run();
}
return this;
@ -437,20 +442,35 @@ public abstract class TradeProtocol {
public TradeStateValidation(Trade trade,
Trade.Phase expectedPhase,
@Nullable TradeMessage tradeMessage) {
isValidPhase = trade.getPhase() == expectedPhase;
this.trade = trade;
this.expectedPhase = expectedPhase;
this.tradeMessage = tradeMessage;
}
public boolean isValidPhase() {
boolean isValidPhase = trade.getPhase() == expectedPhase ||
(alternativePhase.stream().anyMatch(e -> e == trade.getPhase()));
if (!isValidPhase) {
if (tradeMessage != null) {
log.error("We received a {} but we are not in the correct phase. Phase={}, State= {} ",
log.error("We received a {} but we are not in the correct phase. Expected phase={}, Trade phase={}, Trade state= {} ",
tradeMessage.getClass().getSimpleName(),
expectedPhase,
trade.getPhase(),
trade.getState());
} else {
log.error("We are not in the correct phase. Phase={}, State= {} ",
log.error("We are not in the correct phase. Expected phase={}, Trade phase={}, Trade state= {} ",
expectedPhase,
trade.getPhase(),
trade.getState());
}
}
return isValidPhase;
}
public TradeStateValidation orInPhase(Trade.Phase phase) {
alternativePhase.add(phase);
return this;
}
}
}

View File

@ -81,7 +81,6 @@ public class TakerProcessesInputsForDepositTxResponse extends TradeTask {
// update to the latest peer address of our peer if the message is correct
trade.setTradingPeerNodeAddress(processModel.getTempTradingPeerNodeAddress());
trade.setState(Trade.State.TAKER_RECEIVED_PUBLISH_DEPOSIT_TX_REQUEST);
complete();
} catch (Throwable t) {