Rename given method to expect to make it more clear that it is mandatory that the condition is met. Also added a protocol error handler if the condition is not met.

For more tolerant usage we added the given method, which does not log errors and does not call a error handler.
This commit is contained in:
chimp1984 2020-09-25 21:10:20 -05:00
parent dceba6d350
commit 98dec09c18
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
9 changed files with 141 additions and 62 deletions

View file

@ -730,8 +730,9 @@ public abstract class Trade implements Tradable, Model {
@Nullable
public Transaction getDepositTx() {
if (depositTx == null)
if (depositTx == null) {
depositTx = depositTxId != null ? btcWalletService.getTransaction(depositTxId) : null;
}
return depositTx;
}
@ -985,9 +986,9 @@ public abstract class Trade implements Tradable, Model {
}
private long getTradeStartTime() {
final long now = System.currentTimeMillis();
long now = System.currentTimeMillis();
long startTime;
final Transaction depositTx = getDepositTx();
Transaction depositTx = getDepositTx();
if (depositTx != null && getTakeOfferDate() != null) {
if (depositTx.getConfidence().getDepthInBlocks() > 0) {
final long tradeTime = getTakeOfferDate().getTime();

View file

@ -66,7 +66,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
public void handleTakeOfferRequest(InputsForDepositTxRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
@ -95,7 +95,7 @@ public class BuyerAsMakerProtocol extends BuyerProtocol implements MakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) {
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(

View file

@ -74,7 +74,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
@Override
public void onTakeOffer() {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER))
.setup(tasks(
ApplyFilter.class,
@ -93,7 +93,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(TakerProcessesInputsForDepositTxResponse.class,
@ -109,7 +109,7 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
}
protected void handle(DelayedPayoutTxSignatureRequest message, NodeAddress peer) {
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(

View file

@ -55,6 +55,8 @@ public abstract class BuyerProtocol extends MediationProtocol {
public BuyerProtocol(BuyerTrade trade) {
super(trade);
// We get called the constructor with any possible state and phase. As we don't want to log an error for such
// cases we use the alternative 'given' method instead of 'expect'.
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(BuyerEvent.STARTUP))
.setup(tasks(BuyerSetupDepositTxListener.class))
@ -95,7 +97,7 @@ public abstract class BuyerProtocol extends MediationProtocol {
// mailbox message but the stored in mailbox case is not expected and the seller would try to send the message again
// in the hope to reach the buyer directly.
protected void handle(DepositTxAndDelayedPayoutTxMessage message, NodeAddress peer) {
given(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
expect(anyPhase(Trade.Phase.TAKER_FEE_PUBLISHED, Trade.Phase.DEPOSIT_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getDepositTx() == null || trade.getDelayedPayoutTx() == null,
@ -126,7 +128,7 @@ public abstract class BuyerProtocol extends MediationProtocol {
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
BuyerEvent event = BuyerEvent.PAYMENT_SENT;
given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
expect(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.with(event)
.preCondition(!wasDisputed()))
.setup(tasks(ApplyFilter.class,
@ -152,7 +154,7 @@ public abstract class BuyerProtocol extends MediationProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(PayoutTxPublishedMessage message, NodeAddress peer) {
given(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
expect(anyPhase(Trade.Phase.FIAT_SENT, Trade.Phase.PAYOUT_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(BuyerProcessPayoutTxPublishedMessage.class))

View file

@ -24,8 +24,11 @@ import bisq.network.p2p.NodeAddress;
import bisq.common.taskrunner.Task;
import java.text.MessageFormat;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -38,6 +41,8 @@ import static com.google.common.base.Preconditions.checkArgument;
// Main class. Contains the condition and setup, if condition is valid it will execute the
// taskRunner and the optional runnable.
public class FluentProtocol {
interface Event {
String name();
}
@ -45,6 +50,7 @@ public class FluentProtocol {
private final TradeProtocol tradeProtocol;
private Condition condition;
private Setup setup;
private Consumer<Condition.Result> resultHandler;
public FluentProtocol(TradeProtocol tradeProtocol) {
this.tradeProtocol = tradeProtocol;
@ -60,16 +66,26 @@ public class FluentProtocol {
return this;
}
public FluentProtocol resultHandler(Consumer<Condition.Result> resultHandler) {
this.resultHandler = resultHandler;
return this;
}
// Can be used before or after executeTasks
public FluentProtocol run(Runnable runnable) {
if (condition.isValid()) {
Condition.Result result = condition.getResult();
if (result.isValid) {
runnable.run();
} else if (resultHandler != null) {
resultHandler.accept(result);
}
return this;
}
public FluentProtocol executeTasks() {
if (condition.isValid()) {
Condition.Result result = condition.getResult();
if (result.isValid) {
if (setup.getTimeoutSec() > 0) {
tradeProtocol.startTimeout(setup.getTimeoutSec());
}
@ -87,6 +103,8 @@ public class FluentProtocol {
TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent());
taskRunner.addTasks(setup.getTasks());
taskRunner.run();
} else if (resultHandler != null) {
resultHandler.accept(result);
}
return this;
}
@ -98,10 +116,37 @@ public class FluentProtocol {
@Slf4j
public static class Condition {
enum Result {
VALID(true),
INVALID_PHASE,
INVALID_STATE,
INVALID_PRE_CONDITION,
INVALID_TRADE_ID;
@Getter
private boolean isValid;
@Getter
private String info;
Result() {
}
Result(boolean isValid) {
this.isValid = isValid;
}
public Result info(String info) {
this.info = info;
return this;
}
}
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
private final Set<Trade.State> expectedStates = new HashSet<>();
private final Set<Boolean> preConditions = new HashSet<>();
private final Trade trade;
@Nullable
private Result result;
@Nullable
@Getter
@ -115,97 +160,110 @@ public class FluentProtocol {
@Nullable
private Runnable preConditionFailedHandler;
private boolean isValid;
private boolean isValidated; // We validate only once
public Condition(Trade trade) {
this.trade = trade;
}
public Condition phase(Trade.Phase expectedPhase) {
checkArgument(!isValidated);
checkArgument(result == null);
this.expectedPhases.add(expectedPhase);
return this;
}
public Condition anyPhase(Trade.Phase... expectedPhases) {
checkArgument(!isValidated);
checkArgument(result == null);
this.expectedPhases.addAll(Set.of(expectedPhases));
return this;
}
public Condition state(Trade.State state) {
checkArgument(!isValidated);
checkArgument(result == null);
this.expectedStates.add(state);
return this;
}
public Condition anyState(Trade.State... states) {
checkArgument(!isValidated);
checkArgument(result == null);
this.expectedStates.addAll(Set.of(states));
return this;
}
public Condition with(TradeMessage message) {
checkArgument(!isValidated);
checkArgument(result == null);
this.message = message;
return this;
}
public Condition with(Event event) {
checkArgument(!isValidated);
checkArgument(result == null);
this.event = event;
return this;
}
public Condition from(NodeAddress peer) {
checkArgument(!isValidated);
checkArgument(result == null);
this.peer = peer;
return this;
}
public Condition preCondition(boolean preCondition) {
checkArgument(!isValidated);
checkArgument(result == null);
preConditions.add(preCondition);
return this;
}
public Condition preCondition(boolean preCondition, Runnable conditionFailedHandler) {
checkArgument(!isValidated);
checkArgument(result == null);
preCondition(preCondition);
this.preConditionFailedHandler = conditionFailedHandler;
return this;
}
public boolean isValid() {
if (!isValidated) {
boolean isPhaseValid = isPhaseValid();
boolean isStateValid = isStateValid();
public Result getResult() {
if (result == null) {
boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message);
if (!isTradeIdValid) {
String info = MessageFormat.format("TradeId does not match tradeId in message, TradeId={0}, tradeId in message={1}",
trade.getId(), message.getTradeId());
result = Result.INVALID_TRADE_ID.info(info);
return result;
}
Result phaseValidationResult = getPhaseResult();
if (!phaseValidationResult.isValid) {
result = phaseValidationResult;
return result;
}
Result stateResult = getStateResult();
if (!stateResult.isValid) {
result = stateResult;
return result;
}
boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e);
boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message);
if (!allPreConditionsMet) {
log.error("PreConditions not met. preConditions={}, this={}, tradeId={}", preConditions, this, trade.getId());
String info = MessageFormat.format("PreConditions not met. preConditions={0}, this={1}, tradeId={2}",
preConditions, this, trade.getId());
result = Result.INVALID_PRE_CONDITION.info(info);
if (preConditionFailedHandler != null) {
preConditionFailedHandler.run();
}
}
if (!isTradeIdValid) {
log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}",
trade.getId(), message.getTradeId());
return result;
}
isValid = isPhaseValid && isStateValid && allPreConditionsMet && isTradeIdValid;
isValidated = true;
result = Result.VALID;
}
return isValid;
return result;
}
private boolean isPhaseValid() {
private Result getPhaseResult() {
if (expectedPhases.isEmpty()) {
return true;
return Result.VALID;
}
boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase());
@ -215,27 +273,28 @@ public class FluentProtocol {
event.name() + " event" :
"";
if (isPhaseValid) {
log.info("We received {} at phase {} and state {}, tradeId={}",
String info = MessageFormat.format("We received {0} at phase {1} and state {2}, tradeId={3}",
trigger,
trade.getPhase(),
trade.getState(),
trade.getId());
log.info(info);
return Result.VALID.info(info);
} else {
log.error("We received {} but we are are not in the expected phase. Expected phases={}, " +
"Trade phase={}, Trade state= {}, tradeId={}",
String info = MessageFormat.format("We received {0} but we are are not in the expected phase. " +
"Expected phases={1}, Trade phase={2}, Trade state= {3}, tradeId={4}",
trigger,
expectedPhases,
trade.getPhase(),
trade.getState(),
trade.getId());
return Result.INVALID_PHASE.info(info);
}
}
return isPhaseValid;
}
private boolean isStateValid() {
private Result getStateResult() {
if (expectedStates.isEmpty()) {
return true;
return Result.VALID;
}
boolean isStateValid = expectedStates.stream().anyMatch(e -> e == trade.getState());
@ -245,20 +304,21 @@ public class FluentProtocol {
event.name() + " event" :
"";
if (isStateValid) {
log.info("We received {} at state {}, tradeId={}",
String info = MessageFormat.format("We received {0} at state {1}, tradeId={2}",
trigger,
trade.getState(),
trade.getId());
log.info(info);
return Result.VALID.info(info);
} else {
log.error("We received {} but we are are not in the expected state. Expected states={}, " +
"Trade state= {}, tradeId={}",
String info = MessageFormat.format("We received {0} but we are are not in the expected state. " +
"Expected states={1}, Trade state= {2}, tradeId={3}",
trigger,
expectedStates,
trade.getState(),
trade.getId());
return Result.INVALID_STATE.info(info);
}
return isStateValid;
}
}

View file

@ -67,7 +67,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
public void handleTakeOfferRequest(InputsForDepositTxRequest message,
NodeAddress peer,
ErrorMessageHandler errorMessageHandler) {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(
@ -95,7 +95,7 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DepositTxMessage message, NodeAddress peer) {
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(

View file

@ -69,7 +69,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
@Override
public void onTakeOffer() {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(TakerEvent.TAKE_OFFER)
.from(trade.getTradingPeerNodeAddress()))
.setup(tasks(
@ -88,7 +88,7 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(InputsForDepositTxResponse message, NodeAddress peer) {
given(phase(Trade.Phase.INIT)
expect(phase(Trade.Phase.INIT)
.with(message)
.from(peer))
.setup(tasks(

View file

@ -71,7 +71,7 @@ public abstract class SellerProtocol extends MediationProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer) {
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
expect(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
.with(message)
.from(peer))
.setup(tasks(SellerProcessDelayedPayoutTxSignatureResponse.class,
@ -97,7 +97,7 @@ public abstract class SellerProtocol extends MediationProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
given(phase(Trade.Phase.DEPOSIT_CONFIRMED)
expect(phase(Trade.Phase.DEPOSIT_CONFIRMED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
@ -121,7 +121,7 @@ public abstract class SellerProtocol extends MediationProtocol {
public void onFiatPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
SellerEvent event = SellerEvent.PAYMENT_RECEIVED;
given(phase(Trade.Phase.FIAT_SENT)
expect(phase(Trade.Phase.FIAT_SENT)
.with(event)
.preCondition(!wasDisputed()))
.setup(tasks(

View file

@ -117,8 +117,24 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
// FluentProtocol
///////////////////////////////////////////////////////////////////////////////////////////
// We log an error if condition is not met and call the protocol error handler
protected FluentProtocol expect(FluentProtocol.Condition condition) {
return new FluentProtocol(this)
.condition(condition)
.resultHandler(result -> {
if (!result.isValid()) {
log.error(result.getInfo());
handleTaskRunnerFault(null,
result.name(),
result.getInfo());
}
});
}
// We execute only if condition is met but do not log an error.
protected FluentProtocol given(FluentProtocol.Condition condition) {
return new FluentProtocol(this).condition(condition);
return new FluentProtocol(this)
.condition(condition);
}
protected FluentProtocol.Condition phase(Trade.Phase expectedPhase) {
@ -277,13 +293,13 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener {
///////////////////////////////////////////////////////////////////////////////////////////
private void handleTaskRunnerSuccess(@Nullable TradeMessage message, String source) {
log.info("TaskRunner successfully completed. Triggered from {}", source);
log.info("TaskRunner successfully completed. Triggered from {}, tradeId={}", source, trade.getId());
if (message != null) {
sendAckMessage(message, true, null);
}
}
private void handleTaskRunnerFault(@Nullable TradeMessage message, String source, String errorMessage) {
void handleTaskRunnerFault(@Nullable TradeMessage message, String source, String errorMessage) {
log.error("Task runner failed with error {}. Triggered from {}", errorMessage, source);
if (message != null) {