mirror of
https://github.com/bisq-network/bisq.git
synced 2025-02-24 23:18:17 +01:00
Fix potential bug with peers pub key check. Use new isPubKeyValid method to avoid code duplication.
Before we did not apply messages if the peers key was null which can be the case at protocol start for maker side (takers key is not set yet). Makers key is in offer and available from the start. Fix incorrect setPubKeyRing call in BuyerAsTakerProtocol and BuyerProtocol. Only taker can set it from offer. This bug was introduce in past commits of this branch. Move FluentProtocol to own class file Close open offer in task instead at state listener Remove state listener Remove default timeout as not used anymore Add onWithdrawCompleted method to clean up when trade completed Rearrange code in TradeProtocol Rename doHandleDecryptedMessage to onTradeMessage Rename doApplyMailboxTradeMessage to onMailboxMessage
This commit is contained in:
parent
f059f08b27
commit
03023567a8
12 changed files with 510 additions and 444 deletions
|
@ -703,7 +703,7 @@ public abstract class Trade implements Tradable, Model {
|
|||
// or async calls there.
|
||||
// Clone to avoid ConcurrentModificationException. We remove items at the applyMailboxMessage call...
|
||||
HashSet<DecryptedMessageWithPubKey> set = new HashSet<>(decryptedMessageWithPubKeySet);
|
||||
set.forEach(msg -> tradeProtocol.applyMailboxMessage(msg, this));
|
||||
set.forEach(msg -> tradeProtocol.applyMailboxMessage(msg));
|
||||
}
|
||||
|
||||
|
||||
|
@ -773,7 +773,7 @@ public abstract class Trade implements Tradable, Model {
|
|||
// removeDecryptedMsgWithPubKey will be called synchronous after apply. We don't have threaded context
|
||||
// or async calls there.
|
||||
if (tradeProtocol != null)
|
||||
tradeProtocol.applyMailboxMessage(decryptedMessageWithPubKey, this);
|
||||
tradeProtocol.applyMailboxMessage(decryptedMessageWithPubKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -862,9 +862,6 @@ public abstract class Trade implements Tradable, Model {
|
|||
stateProperty.set(state);
|
||||
statePhaseProperty.set(state.getPhase());
|
||||
|
||||
if (state == State.WITHDRAW_COMPLETED && tradeProtocol != null)
|
||||
tradeProtocol.completed();
|
||||
|
||||
if (changed)
|
||||
persist();
|
||||
}
|
||||
|
|
|
@ -583,6 +583,7 @@ public class TradeManager implements PersistedDataHost {
|
|||
log.debug("onWithdraw onSuccess tx ID:" + transaction.getTxId().toString());
|
||||
addTradeToClosedTrades(trade);
|
||||
trade.setState(Trade.State.WITHDRAW_COMPLETED);
|
||||
trade.getTradeProtocol().onWithdrawCompleted();
|
||||
resultHandler.handleResult();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package bisq.core.trade.protocol;
|
||||
|
||||
|
||||
import bisq.core.offer.Offer;
|
||||
import bisq.core.trade.BuyerAsTakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
|
||||
|
@ -46,6 +47,8 @@ import bisq.common.handlers.ResultHandler;
|
|||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
@Slf4j
|
||||
public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol {
|
||||
|
||||
|
@ -55,6 +58,9 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
|||
|
||||
public BuyerAsTakerProtocol(BuyerAsTakerTrade trade) {
|
||||
super(trade);
|
||||
|
||||
Offer offer = checkNotNull(trade.getOffer());
|
||||
processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing());
|
||||
}
|
||||
|
||||
|
||||
|
@ -138,8 +144,8 @@ public class BuyerAsTakerProtocol extends BuyerProtocol implements TakerProtocol
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doHandleDecryptedMessage(message, peer);
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
||||
if (message instanceof InputsForDepositTxResponse) {
|
||||
handle((InputsForDepositTxResponse) message, peer);
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package bisq.core.trade.protocol;
|
||||
|
||||
import bisq.core.offer.Offer;
|
||||
import bisq.core.trade.BuyerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.messages.DelayedPayoutTxSignatureRequest;
|
||||
|
@ -46,11 +45,9 @@ import bisq.common.handlers.ResultHandler;
|
|||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BuyerProtocol extends MediationProtocol {
|
||||
enum BuyerEvent implements TradeProtocol.Event {
|
||||
enum BuyerEvent implements FluentProtocol.Event {
|
||||
STARTUP,
|
||||
PAYMENT_SENT
|
||||
}
|
||||
|
@ -62,10 +59,6 @@ public abstract class BuyerProtocol extends MediationProtocol {
|
|||
public BuyerProtocol(BuyerTrade trade) {
|
||||
super(trade);
|
||||
|
||||
Offer offer = checkNotNull(trade.getOffer());
|
||||
processModel.getTradingPeer().setPubKeyRing(offer.getPubKeyRing());
|
||||
|
||||
|
||||
given(phase(Trade.Phase.TAKER_FEE_PUBLISHED)
|
||||
.with(BuyerEvent.STARTUP))
|
||||
.setup(tasks(BuyerSetupDepositTxListener.class))
|
||||
|
@ -85,8 +78,8 @@ public abstract class BuyerProtocol extends MediationProtocol {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doApplyMailboxTradeMessage(message, peer);
|
||||
public void onMailboxMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onMailboxMessage(message, peer);
|
||||
|
||||
if (message instanceof DepositTxAndDelayedPayoutTxMessage) {
|
||||
handle((DepositTxAndDelayedPayoutTxMessage) message, peer);
|
||||
|
@ -188,8 +181,8 @@ public abstract class BuyerProtocol extends MediationProtocol {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doHandleDecryptedMessage(message, peer);
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
||||
log.info("Received {} from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peer, message.getTradeId(), message.getUid());
|
||||
|
|
316
core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java
Normal file
316
core/src/main/java/bisq/core/trade/protocol/FluentProtocol.java
Normal file
|
@ -0,0 +1,316 @@
|
|||
/*
|
||||
* This file is part of Bisq.
|
||||
*
|
||||
* Bisq is free software: you can redistribute it and/or modify it
|
||||
* under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or (at
|
||||
* your option) any later version.
|
||||
*
|
||||
* Bisq is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
|
||||
* License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package bisq.core.trade.protocol;
|
||||
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.messages.TradeMessage;
|
||||
|
||||
import bisq.network.p2p.NodeAddress;
|
||||
|
||||
import bisq.common.taskrunner.Task;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static bisq.core.util.Validator.isTradeIdValid;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
// Main class. Contains the condition and setup, if condition is valid it will execute the
|
||||
// taskRunner and the optional runnable.
|
||||
public class FluentProtocol {
|
||||
interface Event {
|
||||
String name();
|
||||
}
|
||||
|
||||
private final TradeProtocol tradeProtocol;
|
||||
private Condition condition;
|
||||
private Setup setup;
|
||||
|
||||
public FluentProtocol(TradeProtocol tradeProtocol) {
|
||||
this.tradeProtocol = tradeProtocol;
|
||||
}
|
||||
|
||||
protected FluentProtocol condition(Condition condition) {
|
||||
this.condition = condition;
|
||||
return this;
|
||||
}
|
||||
|
||||
protected FluentProtocol setup(Setup setup) {
|
||||
this.setup = setup;
|
||||
return this;
|
||||
}
|
||||
|
||||
// Can be used before or after executeTasks
|
||||
public FluentProtocol run(Runnable runnable) {
|
||||
if (condition.isValid()) {
|
||||
runnable.run();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public FluentProtocol executeTasks() {
|
||||
if (condition.isValid()) {
|
||||
if (setup.getTimeoutSec() > 0) {
|
||||
tradeProtocol.startTimeout(setup.getTimeoutSec());
|
||||
}
|
||||
|
||||
NodeAddress peer = condition.getPeer();
|
||||
if (peer != null) {
|
||||
tradeProtocol.processModel.setTempTradingPeerNodeAddress(peer);
|
||||
}
|
||||
|
||||
TradeMessage message = condition.getMessage();
|
||||
if (message != null) {
|
||||
tradeProtocol.processModel.setTradeMessage(message);
|
||||
}
|
||||
|
||||
TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent());
|
||||
taskRunner.addTasks(setup.getTasks());
|
||||
taskRunner.run();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Condition class
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Slf4j
|
||||
public static class Condition {
|
||||
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
|
||||
private final Set<Trade.State> expectedStates = new HashSet<>();
|
||||
private final Set<Boolean> preConditions = new HashSet<>();
|
||||
private final Trade trade;
|
||||
|
||||
@Nullable
|
||||
@Getter
|
||||
private TradeMessage message;
|
||||
@Nullable
|
||||
@Getter
|
||||
private Event event;
|
||||
@Nullable
|
||||
@Getter
|
||||
private NodeAddress peer;
|
||||
@Nullable
|
||||
private Runnable preConditionFailedHandler;
|
||||
|
||||
private boolean isValid;
|
||||
private boolean isValidated; // We validate only once
|
||||
|
||||
public Condition(Trade trade) {
|
||||
this.trade = trade;
|
||||
}
|
||||
|
||||
public Condition phase(Trade.Phase expectedPhase) {
|
||||
checkArgument(!isValidated);
|
||||
this.expectedPhases.add(expectedPhase);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition anyPhase(Trade.Phase... expectedPhases) {
|
||||
checkArgument(!isValidated);
|
||||
this.expectedPhases.addAll(Set.of(expectedPhases));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition state(Trade.State state) {
|
||||
checkArgument(!isValidated);
|
||||
this.expectedStates.add(state);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition anyState(Trade.State... states) {
|
||||
checkArgument(!isValidated);
|
||||
this.expectedStates.addAll(Set.of(states));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition with(TradeMessage message) {
|
||||
checkArgument(!isValidated);
|
||||
this.message = message;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition with(Event event) {
|
||||
checkArgument(!isValidated);
|
||||
this.event = event;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition from(NodeAddress peer) {
|
||||
checkArgument(!isValidated);
|
||||
this.peer = peer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition preCondition(boolean preCondition) {
|
||||
checkArgument(!isValidated);
|
||||
preConditions.add(preCondition);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition preCondition(boolean preCondition, Runnable conditionFailedHandler) {
|
||||
checkArgument(!isValidated);
|
||||
preCondition(preCondition);
|
||||
|
||||
this.preConditionFailedHandler = conditionFailedHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
if (!isValidated) {
|
||||
boolean isPhaseValid = isPhaseValid();
|
||||
boolean isStateValid = isStateValid();
|
||||
|
||||
boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e);
|
||||
boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message);
|
||||
|
||||
if (!allPreConditionsMet) {
|
||||
log.error("PreConditions not met. preConditions={}, this={}", preConditions, this);
|
||||
if (preConditionFailedHandler != null) {
|
||||
preConditionFailedHandler.run();
|
||||
}
|
||||
}
|
||||
if (!isTradeIdValid) {
|
||||
log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}",
|
||||
trade.getId(), message.getTradeId());
|
||||
}
|
||||
|
||||
isValid = isPhaseValid && isStateValid && allPreConditionsMet && isTradeIdValid;
|
||||
isValidated = true;
|
||||
}
|
||||
return isValid;
|
||||
}
|
||||
|
||||
private boolean isPhaseValid() {
|
||||
if (expectedPhases.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase());
|
||||
String trigger = message != null ?
|
||||
message.getClass().getSimpleName() :
|
||||
event != null ?
|
||||
event.name() + " event" :
|
||||
"";
|
||||
if (isPhaseValid) {
|
||||
log.info("We received {} at phase {} and state {}",
|
||||
trigger,
|
||||
trade.getPhase(),
|
||||
trade.getState());
|
||||
} else {
|
||||
log.error("We received {} but we are are not in the correct phase. Expected phases={}, " +
|
||||
"Trade phase={}, Trade state= {} ",
|
||||
trigger,
|
||||
expectedPhases,
|
||||
trade.getPhase(),
|
||||
trade.getState());
|
||||
}
|
||||
|
||||
return isPhaseValid;
|
||||
}
|
||||
|
||||
private boolean isStateValid() {
|
||||
if (expectedStates.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean isStateValid = expectedStates.stream().anyMatch(e -> e == trade.getState());
|
||||
String trigger = message != null ?
|
||||
message.getClass().getSimpleName() :
|
||||
event != null ?
|
||||
event.name() + " event" :
|
||||
"";
|
||||
if (isStateValid) {
|
||||
log.info("We received {} at state {}",
|
||||
trigger,
|
||||
trade.getState());
|
||||
} else {
|
||||
log.error("We received {} but we are are not in the correct state. Expected states={}, " +
|
||||
"Trade state= {} ",
|
||||
trigger,
|
||||
expectedStates,
|
||||
trade.getState());
|
||||
}
|
||||
|
||||
return isStateValid;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Setup class
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Slf4j
|
||||
public static class Setup {
|
||||
private final TradeProtocol tradeProtocol;
|
||||
private final Trade trade;
|
||||
@Getter
|
||||
private Class<? extends Task<Trade>>[] tasks;
|
||||
@Getter
|
||||
private int timeoutSec;
|
||||
@Nullable
|
||||
private TradeTaskRunner taskRunner;
|
||||
|
||||
public Setup(TradeProtocol tradeProtocol, Trade trade) {
|
||||
this.tradeProtocol = tradeProtocol;
|
||||
this.trade = trade;
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public final Setup tasks(Class<? extends Task<Trade>>... tasks) {
|
||||
this.tasks = tasks;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Setup withTimeout(int timeoutSec) {
|
||||
this.timeoutSec = timeoutSec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Setup using(TradeTaskRunner taskRunner) {
|
||||
this.taskRunner = taskRunner;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TradeTaskRunner getTaskRunner(@Nullable TradeMessage message, @Nullable Event event) {
|
||||
if (taskRunner == null) {
|
||||
if (message != null) {
|
||||
taskRunner = new TradeTaskRunner(trade,
|
||||
() -> tradeProtocol.handleTaskRunnerSuccess(message),
|
||||
errorMessage -> tradeProtocol.handleTaskRunnerFault(message, errorMessage));
|
||||
} else if (event != null) {
|
||||
taskRunner = new TradeTaskRunner(trade,
|
||||
() -> tradeProtocol.handleTaskRunnerSuccess(event),
|
||||
errorMessage -> tradeProtocol.handleTaskRunnerFault(event, errorMessage));
|
||||
} else {
|
||||
throw new IllegalStateException("addTasks must not be called without message or event " +
|
||||
"set in case no taskRunner has been created yet");
|
||||
}
|
||||
}
|
||||
return taskRunner;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -43,7 +43,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
@Slf4j
|
||||
public class MediationProtocol extends TradeProtocol {
|
||||
|
||||
enum DisputeEvent implements TradeProtocol.Event {
|
||||
enum DisputeEvent implements FluentProtocol.Event {
|
||||
MEDIATION_RESULT_ACCEPTED,
|
||||
MEDIATION_RESULT_REJECTED
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ public class MediationProtocol extends TradeProtocol {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
if (message instanceof MediatedPayoutTxSignatureMessage) {
|
||||
handle((MediatedPayoutTxSignatureMessage) message, peer);
|
||||
} else if (message instanceof MediatedPayoutTxPublishedMessage) {
|
||||
|
@ -182,13 +182,14 @@ public class MediationProtocol extends TradeProtocol {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doApplyMailboxTradeMessage(TradeMessage tradeMessage, NodeAddress peerNodeAddress) {
|
||||
if (tradeMessage instanceof MediatedPayoutTxSignatureMessage) {
|
||||
handle((MediatedPayoutTxSignatureMessage) tradeMessage, peerNodeAddress);
|
||||
} else if (tradeMessage instanceof MediatedPayoutTxPublishedMessage) {
|
||||
handle((MediatedPayoutTxPublishedMessage) tradeMessage, peerNodeAddress);
|
||||
} else if (tradeMessage instanceof PeerPublishedDelayedPayoutTxMessage) {
|
||||
handle((PeerPublishedDelayedPayoutTxMessage) tradeMessage, peerNodeAddress);
|
||||
protected void onMailboxMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onMailboxMessage(message, peer);
|
||||
if (message instanceof MediatedPayoutTxSignatureMessage) {
|
||||
handle((MediatedPayoutTxSignatureMessage) message, peer);
|
||||
} else if (message instanceof MediatedPayoutTxPublishedMessage) {
|
||||
handle((MediatedPayoutTxPublishedMessage) message, peer);
|
||||
} else if (message instanceof PeerPublishedDelayedPayoutTxMessage) {
|
||||
handle((PeerPublishedDelayedPayoutTxMessage) message, peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,8 +140,8 @@ public class SellerAsMakerProtocol extends SellerProtocol implements MakerProtoc
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doHandleDecryptedMessage(message, peer);
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
||||
log.info("Received {} from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peer, message.getTradeId(), message.getUid());
|
||||
|
|
|
@ -138,8 +138,8 @@ public class SellerAsTakerProtocol extends SellerProtocol implements TakerProtoc
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doHandleDecryptedMessage(message, peer);
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
||||
log.info("Received {} from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peer, message.getTradeId(), message.getUid());
|
||||
|
|
|
@ -45,7 +45,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
|
||||
@Slf4j
|
||||
public abstract class SellerProtocol extends MediationProtocol {
|
||||
enum SellerEvent implements TradeProtocol.Event {
|
||||
enum SellerEvent implements FluentProtocol.Event {
|
||||
PAYMENT_RECEIVED
|
||||
}
|
||||
|
||||
|
@ -59,8 +59,8 @@ public abstract class SellerProtocol extends MediationProtocol {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
|
||||
super.doApplyMailboxTradeMessage(message, peerNodeAddress);
|
||||
public void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
|
||||
super.onMailboxMessage(message, peerNodeAddress);
|
||||
|
||||
if (message instanceof CounterCurrencyTransferStartedMessage) {
|
||||
handle((CounterCurrencyTransferStartedMessage) message, peerNodeAddress);
|
||||
|
@ -146,8 +146,8 @@ public abstract class SellerProtocol extends MediationProtocol {
|
|||
|
||||
|
||||
@Override
|
||||
protected void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.doHandleDecryptedMessage(message, peer);
|
||||
protected void onTradeMessage(TradeMessage message, NodeAddress peer) {
|
||||
super.onTradeMessage(message, peer);
|
||||
|
||||
log.info("Received {} from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peer, message.getTradeId(), message.getUid());
|
||||
|
|
|
@ -20,7 +20,7 @@ package bisq.core.trade.protocol;
|
|||
public interface TakerProtocol {
|
||||
void onTakeOffer();
|
||||
|
||||
enum TakerEvent implements TradeProtocol.Event {
|
||||
enum TakerEvent implements FluentProtocol.Event {
|
||||
TAKE_OFFER
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package bisq.core.trade.protocol;
|
||||
|
||||
import bisq.core.trade.MakerTrade;
|
||||
import bisq.core.trade.Trade;
|
||||
import bisq.core.trade.TradeManager;
|
||||
import bisq.core.trade.messages.CounterCurrencyTransferStartedMessage;
|
||||
|
@ -38,34 +37,14 @@ import bisq.common.crypto.PubKeyRing;
|
|||
import bisq.common.proto.network.NetworkEnvelope;
|
||||
import bisq.common.taskrunner.Task;
|
||||
|
||||
import javafx.beans.value.ChangeListener;
|
||||
|
||||
import java.security.PublicKey;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static bisq.core.util.Validator.isTradeIdValid;
|
||||
import static bisq.core.util.Validator.nonEmptyStringOf;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
@Slf4j
|
||||
public abstract class TradeProtocol {
|
||||
interface Event {
|
||||
String name();
|
||||
}
|
||||
|
||||
private static final long DEFAULT_TIMEOUT_SEC = 180;
|
||||
public abstract class TradeProtocol implements DecryptedDirectMessageListener {
|
||||
|
||||
protected final ProcessModel processModel;
|
||||
private DecryptedDirectMessageListener decryptedDirectMessageListener;
|
||||
private ChangeListener<Trade.State> stateChangeListener;
|
||||
protected Trade trade;
|
||||
private Timer timeoutTimer;
|
||||
|
||||
|
@ -78,31 +57,52 @@ public abstract class TradeProtocol {
|
|||
this.trade = trade;
|
||||
this.processModel = trade.getProcessModel();
|
||||
|
||||
setupListeners();
|
||||
if (!trade.isWithdrawn()) {
|
||||
processModel.getP2PService().addDecryptedDirectMessageListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
public void completed() {
|
||||
|
||||
public void onWithdrawCompleted() {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
public void applyMailboxMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
|
||||
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
|
||||
if (processModel.getTradingPeer().getPubKeyRing() != null &&
|
||||
decryptedMessageWithPubKey.getSignaturePubKey().equals(processModel.getTradingPeer().getPubKeyRing().getSignaturePubKey())) {
|
||||
processModel.setDecryptedMessageWithPubKey(decryptedMessageWithPubKey);
|
||||
|
||||
if (networkEnvelope instanceof MailboxMessage && networkEnvelope instanceof TradeMessage) {
|
||||
this.trade = trade;
|
||||
TradeMessage message = (TradeMessage) networkEnvelope;
|
||||
public void applyMailboxMessage(DecryptedMessageWithPubKey message) {
|
||||
if (isPubKeyValid(message)) {
|
||||
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
|
||||
if (networkEnvelope instanceof MailboxMessage &&
|
||||
networkEnvelope instanceof TradeMessage) {
|
||||
processModel.setDecryptedMessageWithPubKey(message);
|
||||
TradeMessage tradeMessage = (TradeMessage) networkEnvelope;
|
||||
NodeAddress peerNodeAddress = ((MailboxMessage) networkEnvelope).getSenderNodeAddress();
|
||||
doApplyMailboxTradeMessage(message, peerNodeAddress);
|
||||
onMailboxMessage(tradeMessage, peerNodeAddress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
|
||||
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// DecryptedDirectMessageListener
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) {
|
||||
if (isPubKeyValid(message)) {
|
||||
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
|
||||
if (networkEnvelope instanceof TradeMessage) {
|
||||
onTradeMessage((TradeMessage) networkEnvelope, peer);
|
||||
} else if (networkEnvelope instanceof AckMessage) {
|
||||
onAckMessage((AckMessage) networkEnvelope, peer);
|
||||
}
|
||||
} else {
|
||||
log.error("SignaturePubKey in message does not match the SignaturePubKey we have stored to that trading peer.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,59 +110,60 @@ public abstract class TradeProtocol {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Abstract
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
protected abstract void doHandleDecryptedMessage(TradeMessage message, NodeAddress peer);
|
||||
|
||||
protected abstract void onTradeMessage(TradeMessage message, NodeAddress peer);
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Protected
|
||||
// FluentProtocol
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected void doApplyMailboxTradeMessage(TradeMessage message, NodeAddress peerNodeAddress) {
|
||||
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}",
|
||||
message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
|
||||
protected FluentProtocol given(FluentProtocol.Condition condition) {
|
||||
return new FluentProtocol(this).condition(condition);
|
||||
}
|
||||
|
||||
protected void startTimeout() {
|
||||
startTimeout(DEFAULT_TIMEOUT_SEC);
|
||||
protected FluentProtocol.Condition phase(Trade.Phase expectedPhase) {
|
||||
return new FluentProtocol.Condition(trade).phase(expectedPhase);
|
||||
}
|
||||
|
||||
protected void startTimeout(long timeoutSec) {
|
||||
stopTimeout();
|
||||
|
||||
timeoutTimer = UserThread.runAfter(() -> {
|
||||
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}",
|
||||
trade.getId(), trade.stateProperty().get(), timeoutSec);
|
||||
trade.setErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec.");
|
||||
cleanupTradeOnFault();
|
||||
cleanup();
|
||||
}, timeoutSec);
|
||||
protected FluentProtocol.Condition anyPhase(Trade.Phase... expectedPhases) {
|
||||
return new FluentProtocol.Condition(trade).anyPhase(expectedPhases);
|
||||
}
|
||||
|
||||
protected void stopTimeout() {
|
||||
if (timeoutTimer != null) {
|
||||
timeoutTimer.stop();
|
||||
timeoutTimer = null;
|
||||
@SafeVarargs
|
||||
public final FluentProtocol.Setup tasks(Class<? extends Task<Trade>>... tasks) {
|
||||
return new FluentProtocol.Setup(this, trade).tasks(tasks);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// ACK msg
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void onAckMessage(AckMessage ackMessage, NodeAddress peer) {
|
||||
if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
|
||||
ackMessage.getSourceId().equals(trade.getId())) {
|
||||
// We handle the ack for CounterCurrencyTransferStartedMessage and DepositTxAndDelayedPayoutTxMessage
|
||||
// as we support automatic re-send of the msg in case it was not ACKed after a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(CounterCurrencyTransferStartedMessage.class.getSimpleName())) {
|
||||
processModel.setPaymentStartedAckMessage(ackMessage);
|
||||
} else if (ackMessage.getSourceMsgClassName().equals(DepositTxAndDelayedPayoutTxMessage.class.getSimpleName())) {
|
||||
processModel.setDepositTxSentAckMessage(ackMessage);
|
||||
}
|
||||
|
||||
if (ackMessage.isSuccess()) {
|
||||
log.info("Received AckMessage for {} from {} with tradeId {} and uid {}",
|
||||
ackMessage.getSourceMsgClassName(), peer, trade.getId(), ackMessage.getSourceUid());
|
||||
} else {
|
||||
log.warn("Received AckMessage with error state for {} from {} with tradeId {} and errorMessage={}",
|
||||
ackMessage.getSourceMsgClassName(), peer, trade.getId(), ackMessage.getErrorMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerSuccess(TradeMessage message) {
|
||||
handleTaskRunnerSuccess(message, null);
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerSuccess(Event event) {
|
||||
handleTaskRunnerSuccess(null, event.name());
|
||||
}
|
||||
|
||||
protected void sendAckMessage(@Nullable TradeMessage message, boolean result, @Nullable String errorMessage) {
|
||||
// We complete at initial protocol setup with the setup listener tasks.
|
||||
// Other cases are if we start from an UI event the task runner (payment started, confirmed).
|
||||
// In such cases we have not set any message and we ignore the sendAckMessage call.
|
||||
if (message == null)
|
||||
return;
|
||||
|
||||
protected void sendAckMessage(TradeMessage message, boolean result, @Nullable String errorMessage) {
|
||||
String tradeId = message.getTradeId();
|
||||
String sourceUid = message.getUid();
|
||||
|
||||
AckMessage ackMessage = new AckMessage(processModel.getMyNodeAddress(),
|
||||
AckMessageSourceType.TRADE_MESSAGE,
|
||||
message.getClass().getSimpleName(),
|
||||
|
@ -203,98 +204,106 @@ public abstract class TradeProtocol {
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Timeout
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected void startTimeout(long timeoutSec) {
|
||||
stopTimeout();
|
||||
|
||||
timeoutTimer = UserThread.runAfter(() -> {
|
||||
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}",
|
||||
trade.getId(), trade.stateProperty().get(), timeoutSec);
|
||||
trade.setErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec.");
|
||||
cleanupTradeOnFault();
|
||||
}, timeoutSec);
|
||||
}
|
||||
|
||||
protected void stopTimeout() {
|
||||
if (timeoutTimer != null) {
|
||||
timeoutTimer.stop();
|
||||
timeoutTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Task runner
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected void handleTaskRunnerSuccess(TradeMessage message) {
|
||||
handleTaskRunnerSuccess(message, message.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerSuccess(FluentProtocol.Event event) {
|
||||
handleTaskRunnerSuccess(null, event.name());
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerFault(TradeMessage message, String errorMessage) {
|
||||
handleTaskRunnerFault(message, message.getClass().getSimpleName(), errorMessage);
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerFault(FluentProtocol.Event event, String errorMessage) {
|
||||
handleTaskRunnerFault(null, event.name(), errorMessage);
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Validation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private boolean isPubKeyValid(DecryptedMessageWithPubKey message) {
|
||||
// We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer
|
||||
// Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already.
|
||||
PubKeyRing peersPubKeyRing = processModel.getTradingPeer().getPubKeyRing();
|
||||
boolean isValid = true;
|
||||
if (peersPubKeyRing != null &&
|
||||
!message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
|
||||
isValid = false;
|
||||
log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer.");
|
||||
}
|
||||
return isValid;
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Private
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private void setupListeners() {
|
||||
decryptedDirectMessageListener = (decryptedMessageWithPubKey, peer) -> {
|
||||
// We check the sig only as soon we have stored the peers pubKeyRing.
|
||||
PubKeyRing tradingPeerPubKeyRing = processModel.getTradingPeer().getPubKeyRing();
|
||||
PublicKey signaturePubKey = decryptedMessageWithPubKey.getSignaturePubKey();
|
||||
if (tradingPeerPubKeyRing != null && signaturePubKey.equals(tradingPeerPubKeyRing.getSignaturePubKey())) {
|
||||
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
|
||||
if (networkEnvelope instanceof TradeMessage) {
|
||||
TradeMessage message = (TradeMessage) networkEnvelope;
|
||||
nonEmptyStringOf(message.getTradeId());
|
||||
|
||||
if (message.getTradeId().equals(processModel.getOfferId())) {
|
||||
doHandleDecryptedMessage(message, peer);
|
||||
}
|
||||
} else if (networkEnvelope instanceof AckMessage) {
|
||||
AckMessage ackMessage = (AckMessage) networkEnvelope;
|
||||
if (ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
|
||||
ackMessage.getSourceId().equals(trade.getId())) {
|
||||
// We handle the ack for CounterCurrencyTransferStartedMessage and DepositTxAndDelayedPayoutTxMessage
|
||||
// as we support automatic re-send of the msg in case it was not ACKed after a certain time
|
||||
if (ackMessage.getSourceMsgClassName().equals(CounterCurrencyTransferStartedMessage.class.getSimpleName())) {
|
||||
processModel.setPaymentStartedAckMessage(ackMessage);
|
||||
} else if (ackMessage.getSourceMsgClassName().equals(DepositTxAndDelayedPayoutTxMessage.class.getSimpleName())) {
|
||||
processModel.setDepositTxSentAckMessage(ackMessage);
|
||||
}
|
||||
|
||||
if (ackMessage.isSuccess()) {
|
||||
log.info("Received AckMessage for {} from {} with tradeId {} and uid {}",
|
||||
ackMessage.getSourceMsgClassName(), peer, ackMessage.getSourceId(), ackMessage.getSourceUid());
|
||||
} else {
|
||||
log.warn("Received AckMessage with error state for {} from {} with tradeId {} and errorMessage={}",
|
||||
ackMessage.getSourceMsgClassName(), peer, ackMessage.getSourceId(), ackMessage.getErrorMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
processModel.getP2PService().addDecryptedDirectMessageListener(decryptedDirectMessageListener);
|
||||
|
||||
//todo move
|
||||
stateChangeListener = (observable, oldValue, newValue) -> {
|
||||
if (newValue.getPhase() == Trade.Phase.TAKER_FEE_PUBLISHED && trade instanceof MakerTrade)
|
||||
processModel.getOpenOfferManager().closeOpenOffer(checkNotNull(trade.getOffer()));
|
||||
};
|
||||
trade.stateProperty().addListener(stateChangeListener);
|
||||
private void handleTaskRunnerSuccess(@Nullable TradeMessage message, String source) {
|
||||
log.info("TaskRunner successfully completed. Triggered from {}", source);
|
||||
if (message != null) {
|
||||
sendAckMessage(message, true, null);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleTaskRunnerFault(@Nullable TradeMessage message, String source, String errorMessage) {
|
||||
log.error("Task runner failed with error {}. Triggered from {}", errorMessage, source);
|
||||
|
||||
private void handleTaskRunnerSuccess(@Nullable TradeMessage message, @Nullable String trigger) {
|
||||
String triggerEvent = trigger != null ? trigger :
|
||||
message != null ? message.getClass().getSimpleName() : "N/A";
|
||||
log.info("TaskRunner successfully completed. {}", "Triggered from message " + triggerEvent);
|
||||
|
||||
sendAckMessage(message, true, null);
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerFault(@Nullable TradeMessage message, String errorMessage) {
|
||||
log.error("Task runner failed on {} with error {}", message, errorMessage);
|
||||
|
||||
sendAckMessage(message, false, errorMessage);
|
||||
|
||||
if (message != null) {
|
||||
sendAckMessage(message, false, errorMessage);
|
||||
}
|
||||
cleanupTradeOnFault();
|
||||
cleanup();
|
||||
}
|
||||
|
||||
protected void handleTaskRunnerFault(@Nullable Event event, String errorMessage) {
|
||||
log.error("Task runner failed on {} with error {}", event, errorMessage);
|
||||
|
||||
cleanupTradeOnFault();
|
||||
cleanup();
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
stopTimeout();
|
||||
trade.stateProperty().removeListener(stateChangeListener);
|
||||
// We removed that from here earlier as it broke the trade process in some non critical error cases.
|
||||
// But it should be actually removed...
|
||||
processModel.getP2PService().removeDecryptedDirectMessageListener(decryptedDirectMessageListener);
|
||||
processModel.getP2PService().removeDecryptedDirectMessageListener(this);
|
||||
}
|
||||
|
||||
//todo
|
||||
private void cleanupTradeOnFault() {
|
||||
Trade.State state = trade.getState();
|
||||
log.warn("cleanupTradableOnFault tradeState={}", state);
|
||||
cleanup();
|
||||
|
||||
log.warn("cleanupTradableOnFault tradeState={}", trade.getState());
|
||||
TradeManager tradeManager = processModel.getTradeManager();
|
||||
if (trade.isInPreparation()) {
|
||||
// no funds left. we just clean up the trade list
|
||||
tradeManager.removePreparedTrade(trade);
|
||||
} else if (!trade.isFundsLockedIn()) {
|
||||
// No deposit tx published yet
|
||||
if (processModel.getPreparedDepositTx() == null) {
|
||||
if (trade.isTakerFeePublished()) {
|
||||
tradeManager.addTradeToFailedTrades(trade);
|
||||
|
@ -307,266 +316,4 @@ public abstract class TradeProtocol {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// FluentProtocol
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected FluentProtocol given(Condition condition) {
|
||||
return new FluentProtocol(condition);
|
||||
}
|
||||
|
||||
protected Condition phase(Trade.Phase expectedPhase) {
|
||||
return new Condition(trade, expectedPhase);
|
||||
}
|
||||
|
||||
protected Condition anyPhase(Trade.Phase... expectedPhases) {
|
||||
return new Condition(trade, expectedPhases);
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public final Setup tasks(Class<? extends Task<Trade>>... tasks) {
|
||||
return new Setup(trade, tasks);
|
||||
}
|
||||
|
||||
// Main class. Contains the condition and setup, if condition is valid it will execute the
|
||||
// taskRunner and the optional runnable.
|
||||
class FluentProtocol {
|
||||
private final Condition condition;
|
||||
private Setup setup;
|
||||
|
||||
public FluentProtocol(Condition condition) {
|
||||
this.condition = condition;
|
||||
}
|
||||
|
||||
protected FluentProtocol setup(Setup setup) {
|
||||
this.setup = setup;
|
||||
return this;
|
||||
}
|
||||
|
||||
// Can be used before or after executeTasks
|
||||
public FluentProtocol run(Runnable runnable) {
|
||||
if (condition.isValid()) {
|
||||
runnable.run();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public FluentProtocol executeTasks() {
|
||||
if (condition.isValid()) {
|
||||
if (setup.getTimeoutSec() > 0) {
|
||||
startTimeout(setup.getTimeoutSec());
|
||||
}
|
||||
|
||||
NodeAddress peer = condition.getPeer();
|
||||
if (peer != null) {
|
||||
processModel.setTempTradingPeerNodeAddress(peer);
|
||||
}
|
||||
|
||||
TradeMessage message = condition.getMessage();
|
||||
if (message != null) {
|
||||
processModel.setTradeMessage(message);
|
||||
}
|
||||
TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent());
|
||||
taskRunner.addTasks(setup.getTasks());
|
||||
taskRunner.run();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
static class Condition {
|
||||
private final Trade trade;
|
||||
@Nullable
|
||||
@Getter
|
||||
private TradeMessage message;
|
||||
private final Set<Trade.Phase> expectedPhases = new HashSet<>();
|
||||
private final Set<Trade.State> expectedStates = new HashSet<>();
|
||||
private final Set<Boolean> preConditions = new HashSet<>();
|
||||
@Nullable
|
||||
@Getter
|
||||
private Event event;
|
||||
@Getter
|
||||
private NodeAddress peer;
|
||||
private boolean isValid;
|
||||
private boolean isValidated;
|
||||
private Runnable preConditionFailedHandler;
|
||||
|
||||
public Condition(Trade trade, Trade.Phase expectedPhase) {
|
||||
this.expectedPhases.add(expectedPhase);
|
||||
this.trade = trade;
|
||||
}
|
||||
|
||||
public Condition(Trade trade, Trade.Phase... expectedPhases) {
|
||||
this.expectedPhases.addAll(Set.of(expectedPhases));
|
||||
this.trade = trade;
|
||||
}
|
||||
|
||||
public Condition state(Trade.State state) {
|
||||
this.expectedStates.add(state);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition anyState(Trade.State... states) {
|
||||
this.expectedStates.addAll(Set.of(states));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition with(Event event) {
|
||||
checkArgument(!isValidated);
|
||||
this.event = event;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition with(TradeMessage message) {
|
||||
checkArgument(!isValidated);
|
||||
this.message = message;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition from(NodeAddress peer) {
|
||||
checkArgument(!isValidated);
|
||||
this.peer = peer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition preCondition(boolean preCondition) {
|
||||
checkArgument(!isValidated);
|
||||
preConditions.add(preCondition);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Condition preCondition(boolean preCondition, Runnable conditionFailedHandler) {
|
||||
checkArgument(!isValidated);
|
||||
preConditions.add(preCondition);
|
||||
this.preConditionFailedHandler = conditionFailedHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
private boolean isValid() {
|
||||
if (!isValidated) {
|
||||
boolean isPhaseValid = isPhaseValid();
|
||||
boolean isStateValid = isStateValid();
|
||||
|
||||
boolean allPreConditionsMet = preConditions.stream().allMatch(e -> e);
|
||||
boolean isTradeIdValid = message == null || isTradeIdValid(trade.getId(), message);
|
||||
|
||||
if (!allPreConditionsMet) {
|
||||
log.error("PreConditions not met. preConditions={}, this={}", preConditions, this);
|
||||
if (preConditionFailedHandler != null) {
|
||||
preConditionFailedHandler.run();
|
||||
}
|
||||
}
|
||||
if (!isTradeIdValid) {
|
||||
log.error("TradeId does not match tradeId in message, TradeId={}, tradeId in message={}",
|
||||
trade.getId(), message.getTradeId());
|
||||
}
|
||||
|
||||
isValid = isPhaseValid && isStateValid && allPreConditionsMet && isTradeIdValid;
|
||||
isValidated = true;
|
||||
}
|
||||
return isValid;
|
||||
}
|
||||
|
||||
private boolean isPhaseValid() {
|
||||
if (expectedPhases.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean isPhaseValid = expectedPhases.stream().anyMatch(e -> e == trade.getPhase());
|
||||
String trigger = message != null ?
|
||||
message.getClass().getSimpleName() :
|
||||
event != null ?
|
||||
event.name() + " event" :
|
||||
"";
|
||||
if (isPhaseValid) {
|
||||
log.info("We received {} at phase {} and state {}",
|
||||
trigger,
|
||||
trade.getPhase(),
|
||||
trade.getState());
|
||||
} else {
|
||||
log.error("We received {} but we are are not in the correct phase. Expected phases={}, " +
|
||||
"Trade phase={}, Trade state= {} ",
|
||||
trigger,
|
||||
expectedPhases,
|
||||
trade.getPhase(),
|
||||
trade.getState());
|
||||
}
|
||||
|
||||
return isPhaseValid;
|
||||
}
|
||||
|
||||
private boolean isStateValid() {
|
||||
if (expectedStates.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean isStateValid = expectedStates.stream().anyMatch(e -> e == trade.getState());
|
||||
String trigger = message != null ?
|
||||
message.getClass().getSimpleName() :
|
||||
event != null ?
|
||||
event.name() + " event" :
|
||||
"";
|
||||
if (isStateValid) {
|
||||
log.info("We received {} at state {}",
|
||||
trigger,
|
||||
trade.getState());
|
||||
} else {
|
||||
log.error("We received {} but we are are not in the correct state. Expected states={}, " +
|
||||
"Trade state= {} ",
|
||||
trigger,
|
||||
expectedStates,
|
||||
trade.getState());
|
||||
}
|
||||
|
||||
return isStateValid;
|
||||
}
|
||||
}
|
||||
|
||||
// Setup for task runner
|
||||
class Setup {
|
||||
private final Trade trade;
|
||||
@Getter
|
||||
private final Class<? extends Task<Trade>>[] tasks;
|
||||
@Getter
|
||||
private int timeoutSec;
|
||||
@Nullable
|
||||
private TradeTaskRunner taskRunner;
|
||||
|
||||
@SafeVarargs
|
||||
public Setup(Trade trade, Class<? extends Task<Trade>>... tasks) {
|
||||
this.trade = trade;
|
||||
this.tasks = tasks;
|
||||
}
|
||||
|
||||
public Setup withTimeout(int timeoutSec) {
|
||||
this.timeoutSec = timeoutSec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Setup using(TradeTaskRunner taskRunner) {
|
||||
this.taskRunner = taskRunner;
|
||||
return this;
|
||||
}
|
||||
|
||||
private TradeTaskRunner getTaskRunner(@Nullable TradeMessage message, @Nullable Event event) {
|
||||
if (taskRunner == null) {
|
||||
if (message != null) {
|
||||
taskRunner = new TradeTaskRunner(trade,
|
||||
() -> handleTaskRunnerSuccess(message),
|
||||
errorMessage -> handleTaskRunnerFault(message, errorMessage));
|
||||
} else if (event != null) {
|
||||
taskRunner = new TradeTaskRunner(trade,
|
||||
() -> handleTaskRunnerSuccess(event),
|
||||
errorMessage -> handleTaskRunnerFault(event, errorMessage));
|
||||
} else {
|
||||
throw new IllegalStateException("addTasks must not be called without message or event " +
|
||||
"set in case no taskRunner has been created yet");
|
||||
}
|
||||
}
|
||||
return taskRunner;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,6 +90,11 @@ public abstract class MakerSendsInputsForDepositTxResponse extends TradeTask {
|
|||
|
||||
trade.setState(Trade.State.MAKER_SENT_PUBLISH_DEPOSIT_TX_REQUEST);
|
||||
|
||||
// We could consider to remove the offer later once the deposit is published to reduce the risk
|
||||
// of lost maker fee in case of a failed trade. The protocol has not maker specific tasks at that moment on
|
||||
// so it would require a bit of extra work to add that (without using instance of checks...).
|
||||
processModel.getOpenOfferManager().closeOpenOffer(checkNotNull(trade.getOffer()));
|
||||
|
||||
NodeAddress peersNodeAddress = trade.getTradingPeerNodeAddress();
|
||||
log.info("Send {} to peer {}. tradeId={}, uid={}",
|
||||
message.getClass().getSimpleName(), peersNodeAddress, message.getTradeId(), message.getUid());
|
||||
|
|
Loading…
Add table
Reference in a new issue