Add reset method to trade protocol.

Add map for maintaining pending protocols by offerID.

Fixes a bug when the same taker got an early failure in a take offer attempt before the maker removes the offer and the taker did not persist the failed trade, thus the protection to not be permitted to take the same offer after a failure does not prevent another take offer attempt of the same taker. In such a case the maker has the old pending protocol listening for the trade messages and both the old and new protocol instance process those messages. In case the model data has changes (e.g. diff. inputs) this can cause a failed trade.
We do not remove the message listeners on failures to tolerate minor failures which can be recovered by resend routines.
There could be more solid fixes for that issue but this approach seems to carry less risks.
This commit is contained in:
HenrikJannsen 2022-12-26 12:29:33 -05:00
parent 72b55f429a
commit 3f4a069202
No known key found for this signature in database
GPG key ID: 02AA2BAE387C8307
2 changed files with 32 additions and 5 deletions

View file

@ -156,7 +156,18 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
private final Provider provider; private final Provider provider;
private final ClockWatcher clockWatcher; private final ClockWatcher clockWatcher;
private final Map<String, TradeProtocol> tradeProtocolByTradeId = new HashMap<>(); // We use uid for that map not the trade ID
private final Map<String, TradeProtocol> tradeProtocolByTradeUid = new HashMap<>();
// We maintain a map with trade (offer) ID to reset a pending trade protocol for the same offer.
// Pending trade protocol could happen in edge cases when an early error did not cause a removal of the
// offer and the same peer takes the offer later again. Usually it is prevented for the taker to take again after a
// failure but that is only based on failed trades state and it can be that either the taker deletes the failed trades
// file or it was not persisted. Such rare cases could lead to a pending protocol and when taker takes again the
// offer the message listener from the old pending protocol gets invoked and processes the messages based on
// potentially outdated model data (e.g. old inputs).
private final Map<String, TradeProtocol> pendingTradeProtocolByTradeId = new HashMap<>();
private final PersistenceManager<TradableList<Trade>> persistenceManager; private final PersistenceManager<TradableList<Trade>> persistenceManager;
private final TradableList<Trade> tradableList = new TradableList<>(); private final TradableList<Trade> tradableList = new TradableList<>();
@Getter @Getter
@ -408,15 +419,20 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
public TradeProtocol getTradeProtocol(TradeModel trade) { public TradeProtocol getTradeProtocol(TradeModel trade) {
String uid = trade.getUid(); String uid = trade.getUid();
if (tradeProtocolByTradeId.containsKey(uid)) { if (tradeProtocolByTradeUid.containsKey(uid)) {
return tradeProtocolByTradeId.get(uid); return tradeProtocolByTradeUid.get(uid);
} else { } else {
TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(trade); TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(trade);
TradeProtocol prev = tradeProtocolByTradeId.put(uid, tradeProtocol); TradeProtocol prev = tradeProtocolByTradeUid.put(uid, tradeProtocol);
if (prev != null) { if (prev != null) {
log.error("We had already an entry with uid {}", trade.getUid()); log.error("We had already an entry with uid {}", trade.getUid());
} }
TradeProtocol pending = pendingTradeProtocolByTradeId.put(trade.getId(), tradeProtocol);
if (pending != null) {
pending.reset();
}
return tradeProtocol; return tradeProtocol;
} }
} }
@ -618,7 +634,7 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
private TradeProtocol createTradeProtocol(TradeModel tradeModel) { private TradeProtocol createTradeProtocol(TradeModel tradeModel) {
TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(tradeModel); TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(tradeModel);
TradeProtocol prev = tradeProtocolByTradeId.put(tradeModel.getUid(), tradeProtocol); TradeProtocol prev = tradeProtocolByTradeUid.put(tradeModel.getUid(), tradeProtocol);
if (prev != null) { if (prev != null) {
log.error("We had already an entry with uid {}", tradeModel.getUid()); log.error("We had already an entry with uid {}", tradeModel.getUid());
} }
@ -626,6 +642,11 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
tradableList.add((Trade) tradeModel); tradableList.add((Trade) tradeModel);
} }
TradeProtocol pending = pendingTradeProtocolByTradeId.put(tradeModel.getId(), tradeProtocol);
if (pending != null) {
pending.reset();
}
// For BsqTrades we only store the trade at completion // For BsqTrades we only store the trade at completion
return tradeProtocol; return tradeProtocol;

View file

@ -96,6 +96,12 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D
cleanup(); cleanup();
} }
// Resets a potentially pending protocol
public void reset() {
tradeModel.setErrorMessage("Outdated pending protocol got reset.");
protocolModel.getP2PService().removeDecryptedDirectMessageListener(this);
}
protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) { protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}", log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid()); message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());