Add timeouts, improve interrupted trade handling

This commit is contained in:
Manfred Karrer 2015-04-04 19:24:53 +02:00
parent 76a4a7f2c3
commit f498d393b0
20 changed files with 185 additions and 102 deletions

View file

@ -40,7 +40,6 @@ import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
// TODO consider to use SealedObject
public class EncryptionService<T> {
private static final Logger log = LoggerFactory.getLogger(EncryptionService.class);
private static final String ALGO_SYM = "AES";
@ -55,15 +54,21 @@ public class EncryptionService<T> {
}
public KeyPair getGeneratedDSAKeyPair() throws NoSuchAlgorithmException {
long ts = System.currentTimeMillis();
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("DSA");
keyPairGenerator.initialize(1024);
return keyPairGenerator.genKeyPair();
KeyPair keyPair = keyPairGenerator.genKeyPair();
log.debug("getGeneratedDSAKeyPair needed {} ms", System.currentTimeMillis() - ts);
return keyPair;
}
public KeyPair getGeneratedRSAKeyPair() throws NoSuchAlgorithmException {
long ts = System.currentTimeMillis();
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(ALGO_ASYM);
keyPairGenerator.initialize(KEY_SIZE_ASYM);
return keyPairGenerator.genKeyPair();
KeyPair keyPair = keyPairGenerator.genKeyPair();
log.debug("getGeneratedRSAKeyPair needed {} ms", System.currentTimeMillis() - ts);
return keyPair;
}
public Bucket encryptObject(PublicKey publicKey, Object object) throws IllegalBlockSizeException, InvalidKeyException, BadPaddingException,
@ -78,6 +83,7 @@ public class EncryptionService<T> {
public Bucket encrypt(PublicKey publicKey, byte[] payload) throws NoSuchAlgorithmException, NoSuchPaddingException, InvalidKeyException,
BadPaddingException, IllegalBlockSizeException {
long ts = System.currentTimeMillis();
// Create symmetric key and
KeyGenerator keyGenerator = KeyGenerator.getInstance(ALGO_SYM);
keyGenerator.init(KEY_SIZE_SYM);
@ -95,11 +101,13 @@ public class EncryptionService<T> {
cipherSym.init(Cipher.ENCRYPT_MODE, keySpec);
log.debug("encrypt payload length: " + payload.length);
byte[] encryptedPayload = cipherSym.doFinal(payload);
log.debug("Encryption needed {} ms", System.currentTimeMillis() - ts);
return new Bucket(encryptedKey, encryptedPayload);
}
public byte[] decrypt(PrivateKey privateKey, Bucket bucket) throws NoSuchPaddingException, NoSuchAlgorithmException,
InvalidKeyException, BadPaddingException, IllegalBlockSizeException {
long ts = System.currentTimeMillis();
byte[] encryptedPayload = bucket.encryptedPayload;
byte[] encryptedKey = bucket.encryptedKey;
@ -114,6 +122,7 @@ public class EncryptionService<T> {
cipherSym.init(Cipher.DECRYPT_MODE, key);
byte[] payload = cipherSym.doFinal(encryptedPayload);
log.debug("decrypt payload length: " + payload.length);
log.debug("Decryption needed {} ms", System.currentTimeMillis() - ts);
return payload;
}
}

View file

@ -74,7 +74,7 @@ class ClosedTradesDataModel implements Activatable, DataModel {
list.addAll(tradeManager.getClosedTrades().stream().map(ClosedTradesListItem::new).collect(Collectors.toList()));
// we sort by date, earliest first
list.sort((o1, o2) -> o2.getTrade().getDate().compareTo(o1.getTrade().getDate()));
list.sort((o1, o2) -> o2.getTrade().getTakeOfferDate().compareTo(o1.getTrade().getTakeOfferDate()));
}
}

View file

@ -65,7 +65,7 @@ class ClosedTradesViewModel extends ActivatableWithDataModel<ClosedTradesDataMod
}
String getDate(ClosedTradesListItem item) {
return formatter.formatDateTime(item.getTrade().getDate());
return formatter.formatDateTime(item.getTrade().getTakeOfferDate());
}
String getState(ClosedTradesListItem item) {

View file

@ -102,7 +102,7 @@ class PendingTradesDataModel implements Activatable, DataModel {
list.addAll(tradeManager.getPendingTrades().stream().map(PendingTradesListItem::new).collect(Collectors.toList()));
// we sort by date, earliest first
list.sort((o1, o2) -> o2.getTrade().getDate().compareTo(o1.getTrade().getDate()));
list.sort((o1, o2) -> o2.getTrade().getTakeOfferDate().compareTo(o1.getTrade().getTakeOfferDate()));
log.debug("onListChanged {}", list.size());
if (list.size() > 0)

View file

@ -65,7 +65,7 @@ public class PendingTradesListItem {
}
public Date getDate() {
return trade.getDate();
return trade.getTakeOfferDate();
}
public String getId() {

View file

@ -54,7 +54,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
SELLER_WAIT_TX_CONF,
SELLER_WAIT_PAYMENT_STARTED,
SELLER_CONFIRM_RECEIVE_PAYMENT,
SELLER_PUBLISH_PAYOUT_TX,
SELLER_SEND_PUBLISHED_MSG,
SELLER_COMPLETED,
@ -290,7 +289,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
break;
case FIAT_PAYMENT_RECEIVED:
viewState.set(ViewState.SELLER_PUBLISH_PAYOUT_TX);
break;
case PAYOUT_PUBLISHED:
viewState.set(ViewState.SELLER_SEND_PUBLISHED_MSG);
@ -302,9 +300,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
case MESSAGE_SENDING_FAILED:
viewState.set(ViewState.MESSAGE_SENDING_FAILED);
break;
case TIMEOUT:
viewState.set(ViewState.TIMEOUT);
break;
case EXCEPTION:
viewState.set(ViewState.EXCEPTION);
break;
@ -335,7 +330,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
break;
case FIAT_PAYMENT_RECEIVED:
viewState.set(ViewState.SELLER_PUBLISH_PAYOUT_TX);
break;
case PAYOUT_PUBLISHED:
viewState.set(ViewState.SELLER_SEND_PUBLISHED_MSG);
@ -347,9 +341,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
case MESSAGE_SENDING_FAILED:
viewState.set(ViewState.MESSAGE_SENDING_FAILED);
break;
case TIMEOUT:
viewState.set(ViewState.TIMEOUT);
break;
case EXCEPTION:
viewState.set(ViewState.EXCEPTION);
break;
@ -434,9 +425,6 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
case MESSAGE_SENDING_FAILED:
viewState.set(ViewState.MESSAGE_SENDING_FAILED);
break;
case TIMEOUT:
viewState.set(ViewState.TIMEOUT);
break;
case EXCEPTION:
viewState.set(ViewState.EXCEPTION);
break;

View file

@ -123,12 +123,15 @@ public class SellerSubView extends TradeSubView {
"your security deposit and the Bitcoin buyer receive the Bitcoin amount you sold.",
model.getCurrencyCode()));
break;
case SELLER_PUBLISH_PAYOUT_TX:
((ConfirmFiatReceivedView) tradeStepDetailsView).setStatusText("Publishing transaction...");
break;
case SELLER_SEND_PUBLISHED_MSG:
((ConfirmFiatReceivedView) tradeStepDetailsView).setStatusText("Sending message to trading partner...");
if (confirmFiatReceived == null) {
waitTxInBlockchain.done();
waitFiatStarted.done();
showItem(confirmFiatReceived);
}
((ConfirmFiatReceivedView) tradeStepDetailsView).setStatusText("Sending message to trading peer transaction...");
break;
case SELLER_COMPLETED:
waitTxInBlockchain.done();

View file

@ -24,6 +24,7 @@ import io.bitsquare.gui.util.Layout;
import javafx.beans.value.ChangeListener;
import javafx.event.ActionEvent;
import javafx.scene.*;
import javafx.scene.control.*;
import org.slf4j.Logger;
@ -41,6 +42,7 @@ public class ConfirmFiatReceivedView extends TradeStepDetailsView {
private Button confirmFiatReceivedButton;
private Label statusLabel;
private ProgressIndicator statusProgressIndicator;
private Parent root;
///////////////////////////////////////////////////////////////////////////////////////////
@ -67,6 +69,9 @@ public class ConfirmFiatReceivedView extends TradeStepDetailsView {
model.txId.removeListener(txIdChangeListener);
txIdTextField.cleanup();
statusLabel.setText("Publishing transaction...");
if (root != null)
root.setMouseTransparent(false);
}
@ -81,6 +86,8 @@ public class ConfirmFiatReceivedView extends TradeStepDetailsView {
confirmFiatReceivedButton.setDisable(true);
statusProgressIndicator.setVisible(true);
statusProgressIndicator.setProgress(-1);
root = statusProgressIndicator.getScene().getRoot();
root.setMouseTransparent(true);
}

View file

@ -26,6 +26,7 @@ import io.bitsquare.locale.BSResources;
import javafx.beans.value.ChangeListener;
import javafx.event.ActionEvent;
import javafx.scene.*;
import javafx.scene.control.*;
import org.slf4j.Logger;
@ -48,6 +49,7 @@ public class StartFiatView extends TradeStepDetailsView {
private final ChangeListener<String> txIdChangeListener;
private ProgressIndicator statusProgressIndicator;
private Parent root;
///////////////////////////////////////////////////////////////////////////////////////////
@ -94,6 +96,8 @@ public class StartFiatView extends TradeStepDetailsView {
model.txId.removeListener(txIdChangeListener);
txIdTextField.cleanup();
if (root != null)
root.setMouseTransparent(false);
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -107,6 +111,8 @@ public class StartFiatView extends TradeStepDetailsView {
statusProgressIndicator.setVisible(true);
statusProgressIndicator.setProgress(-1);
statusLabel.setText("Sending message to trading partner...");
root = statusProgressIndicator.getScene().getRoot();
root.setMouseTransparent(true);
}

View file

@ -148,13 +148,13 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
@Override
public void addMessageHandler(MessageHandler listener) {
if (!messageHandlers.add(listener))
throw new IllegalArgumentException("Add listener did not change list. Probably listener has been already added.");
log.error("Add listener did not change list. Probably listener has been already added.");
}
@Override
public void removeMessageHandler(MessageHandler listener) {
if (!messageHandlers.remove(listener))
throw new IllegalArgumentException("Try to remove listener which was never added.");
log.error("Try to remove listener which was never added.");
}
@ -171,10 +171,10 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
if (message instanceof Message)
executor.execute(() -> messageHandlers.stream().forEach(e -> e.handleMessage((Message) message, new TomP2PPeer(sender))));
else
throw new RuntimeException("We got an object which is not type of Message. That must never happen. Request object = " + message);
log.error("We got an object which is not type of Message. That must never happen. Request object = " + message);
}
else {
throw new RuntimeException("Received msg from myself. That must never happen.");
log.error("Received msg from myself. That must never happen.");
}
return true;

View file

@ -90,10 +90,10 @@ abstract public class Trade implements Model, Serializable {
// Immutable
private final Offer offer;
private final Date date;
private final ProcessModel processModel;
// Mutable
private Date takeOfferDate;
protected TradeState.ProcessState processState;
protected TradeState.LifeCycleState lifeCycleState;
private MailboxMessage mailboxMessage;
@ -118,7 +118,6 @@ abstract public class Trade implements Model, Serializable {
this.offer = offer;
this.storage = storage;
date = new Date();
processModel = new ProcessModel();
tradeVolumeProperty = new SimpleObjectProperty<>();
tradeAmountProperty = new SimpleObjectProperty<>();
@ -165,8 +164,11 @@ abstract public class Trade implements Model, Serializable {
createProtocol();
if (mailboxMessage != null)
tradeProtocol.setMailboxMessage(mailboxMessage);
if (mailboxMessage != null) {
tradeProtocol.applyMailboxMessage(mailboxMessage);
// After applied to protocol we remove it
mailboxMessage = null;
}
}
protected void initStateProperties() {
@ -210,10 +212,6 @@ abstract public class Trade implements Model, Serializable {
public void setMailboxMessage(MailboxMessage mailboxMessage) {
this.mailboxMessage = mailboxMessage;
if (tradeProtocol != null)
tradeProtocol.setMailboxMessage(mailboxMessage);
storage.queueUpForSave();
}
public void setStorage(Storage<? extends TradeList> storage) {
@ -253,10 +251,6 @@ abstract public class Trade implements Model, Serializable {
// Getter only
///////////////////////////////////////////////////////////////////////////////////////////
public Date getDate() {
return date;
}
public String getId() {
return offer.getId();
}
@ -312,6 +306,14 @@ abstract public class Trade implements Model, Serializable {
// Getter/Setter for Mutable objects
///////////////////////////////////////////////////////////////////////////////////////////
public Date getTakeOfferDate() {
return takeOfferDate;
}
public void setTakeOfferDate(Date takeOfferDate) {
this.takeOfferDate = takeOfferDate;
}
public void setTradingPeer(Peer tradingPeer) {
this.tradingPeer = tradingPeer;
}
@ -439,7 +441,7 @@ abstract public class Trade implements Model, Serializable {
", storage=" + storage +
", tradeProtocol=" + tradeProtocol +
", offer=" + offer +
", date=" + date +
", date=" + takeOfferDate +
", processModel=" + processModel +
", processState=" + processState +
", lifeCycleState=" + lifeCycleState +

View file

@ -59,6 +59,7 @@ import com.google.common.util.concurrent.FutureCallback;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -152,6 +153,7 @@ public class TradeManager {
// When all services are initialized we create the protocols for our open offers and persisted pendingTrades
// OffererAsBuyerProtocol listens for take offer requests, so we need to instantiate it early.
public void onAllServicesInitialized() {
log.trace("onAllServicesInitialized");
for (Trade trade : openOfferTrades) {
Offer offer = trade.getOffer();
// We add own offers to offerbook when we go online again
@ -161,8 +163,48 @@ public class TradeManager {
setupDepositPublishedListener(trade);
trade.setStorage(openOfferTradesStorage);
initTrade(trade);
}
// If there are messages in our mailbox we apply it and remove them from the DHT
// We run that before initializing the pending trades to be sure the state is correct
mailboxService.getAllMessages(user.getP2pSigPubKey(),
(encryptedMailboxMessages) -> {
log.trace("mailboxService.getAllMessages success");
setMailboxMessagesToTrades(encryptedMailboxMessages);
emptyMailbox();
initPendingTrades();
});
}
private void setMailboxMessagesToTrades(List<EncryptedMailboxMessage> encryptedMailboxMessages) {
log.trace("applyMailboxMessage encryptedMailboxMessage.size=" + encryptedMailboxMessages.size());
for (EncryptedMailboxMessage encrypted : encryptedMailboxMessages) {
try {
MailboxMessage mailboxMessage = encryptionService.decryptToObject(user.getP2pEncryptPrivateKey(), encrypted.getBucket());
if (mailboxMessage instanceof TradeMessage) {
String tradeId = ((TradeMessage) mailboxMessage).tradeId;
Optional<Trade> tradeOptional = pendingTrades.stream().filter(e -> e.getId().equals(tradeId)).findAny();
if (tradeOptional.isPresent())
tradeOptional.get().setMailboxMessage(mailboxMessage);
}
} catch (Throwable e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
private void emptyMailbox() {
mailboxService.removeAllMessages(user.getP2pSigPubKey(),
() -> log.debug("All mailbox entries removed"),
(errorMessage, fault) -> {
log.error(errorMessage);
log.error(fault.getMessage());
});
}
private void initPendingTrades() {
log.trace("initPendingTrades");
List<Trade> failedTrades = new ArrayList<>();
for (Trade trade : pendingTrades) {
// We continue an interrupted trade.
@ -188,42 +230,8 @@ public class TradeManager {
pendingTrades.remove(trade);
closedTrades.add(trade);
}
// if there are messages in our mailbox we apply it and remove them from the DHT
mailboxService.getAllMessages(user.getP2pSigPubKey(),
(encryptedMailboxMessages) -> {
setMailboxMessagesToTrades(encryptedMailboxMessages);
emptyMailbox();
});
}
private void setMailboxMessagesToTrades(List<EncryptedMailboxMessage> encryptedMailboxMessages) {
log.trace("applyMailboxMessage encryptedMailboxMessage.size=" + encryptedMailboxMessages.size());
for (EncryptedMailboxMessage encrypted : encryptedMailboxMessages) {
try {
MailboxMessage mailboxMessage = encryptionService.decryptToObject(user.getP2pEncryptPrivateKey(), encrypted.getBucket());
if (mailboxMessage instanceof TradeMessage) {
String tradeId = ((TradeMessage) mailboxMessage).tradeId;
Optional<Trade> tradeOptional = pendingTrades.stream().filter(e -> e.getId().equals(tradeId)).findAny();
if (tradeOptional.isPresent())
tradeOptional.get().setMailboxMessage(mailboxMessage);
}
} catch (Throwable e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
private void emptyMailbox() {
mailboxService.removeAllMessages(user.getP2pSigPubKey(),
() -> log.debug("All mailbox entries removed"),
(errorMessage, fault) -> {
log.error(errorMessage);
log.error(fault.getMessage());
});
}
public void shutDown() {
if (!shutDownRequested) {
@ -298,6 +306,7 @@ public class TradeManager {
() -> log.debug("remove offer was successful"),
log::error,
false);
trade.setTakeOfferDate(new Date());
pendingTrades.add(trade);
trade.setStorage(pendingTradesStorage);
}
@ -383,6 +392,7 @@ public class TradeManager {
else
trade = new BuyerAsTakerTrade(offer, amount, model.getPeer(), pendingTradesStorage);
trade.setTakeOfferDate(new Date());
initTrade(trade);
pendingTrades.add(trade);
if (trade instanceof TakerTrade)

View file

@ -29,16 +29,24 @@ import io.bitsquare.trade.protocol.availability.tasks.GetPeerAddress;
import io.bitsquare.trade.protocol.availability.tasks.ProcessReportOfferAvailabilityMessage;
import io.bitsquare.trade.protocol.availability.tasks.RequestIsOfferAvailable;
import java.util.Timer;
import java.util.TimerTask;
import javafx.application.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CheckOfferAvailabilityProtocol {
private static final Logger log = LoggerFactory.getLogger(CheckOfferAvailabilityProtocol.class);
private static final long TIMEOUT = 10000;
private final CheckOfferAvailabilityModel model;
private final ResultHandler resultHandler;
private final ErrorMessageHandler errorMessageHandler;
private final MessageHandler messageHandler;
private Timer timeoutTimer;
private boolean isCanceled;
private TaskRunner<CheckOfferAvailabilityModel> taskRunner;
@ -56,6 +64,7 @@ public class CheckOfferAvailabilityProtocol {
}
private void cleanup() {
stopTimeout();
model.messageService.removeMessageHandler(messageHandler);
}
@ -78,6 +87,7 @@ public class CheckOfferAvailabilityProtocol {
GetPeerAddress.class,
RequestIsOfferAvailable.class
);
startTimeout();
taskRunner.run();
}
@ -95,11 +105,12 @@ public class CheckOfferAvailabilityProtocol {
private void handleMessage(Message message, @SuppressWarnings("UnusedParameters") Peer sender) {
if (!isCanceled) {
if (message instanceof ReportOfferAvailabilityMessage && model.offer.getId().equals(((ReportOfferAvailabilityMessage) message).offerId))
handleReportOfferAvailabilityMessage((ReportOfferAvailabilityMessage) message);
handle((ReportOfferAvailabilityMessage) message);
}
}
private void handleReportOfferAvailabilityMessage(ReportOfferAvailabilityMessage message) {
private void handle(ReportOfferAvailabilityMessage message) {
stopTimeout();
model.setMessage(message);
taskRunner = new TaskRunner<>(model,
@ -115,4 +126,30 @@ public class CheckOfferAvailabilityProtocol {
taskRunner.addTasks(ProcessReportOfferAvailabilityMessage.class);
taskRunner.run();
}
protected void startTimeout() {
log.debug("startTimeout");
stopTimeout();
timeoutTimer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
Platform.runLater(() -> {
log.debug("Timeout reached");
errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
});
}
};
timeoutTimer.schedule(task, TIMEOUT);
}
protected void stopTimeout() {
log.debug("stopTimeout");
if (timeoutTimer != null) {
timeoutTimer.cancel();
timeoutTimer = null;
}
}
}

View file

@ -74,11 +74,11 @@ public class BuyerAsOffererProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void setMailboxMessage(MailboxMessage mailboxMessage) {
public void applyMailboxMessage(MailboxMessage mailboxMessage) {
log.debug("setMailboxMessage " + mailboxMessage);
// Might be called twice, so check that its only processed once
if (processModel.getMailboxMessage() == null) {
processModel.setMailboxMessage(mailboxMessage);
if (!processModel.isMailboxMessageProcessed()) {
processModel.mailboxMessageProcessed();
if (mailboxMessage instanceof PayoutTxPublishedMessage) {
handle((PayoutTxPublishedMessage) mailboxMessage);
}
@ -137,9 +137,11 @@ public class BuyerAsOffererProtocol extends TradeProtocol {
SendRequestPayDepositMessage.class
);
taskRunner.run();
startTimeout();
}
private void handle(RequestPublishDepositTxMessage tradeMessage) {
stopTimeout();
processModel.setTradeMessage(tradeMessage);
TaskRunner<Trade> taskRunner = new TaskRunner<>(buyerAsOffererTrade,

View file

@ -71,11 +71,11 @@ public class BuyerAsTakerProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void setMailboxMessage(MailboxMessage mailboxMessage) {
public void applyMailboxMessage(MailboxMessage mailboxMessage) {
log.debug("setMailboxMessage " + mailboxMessage);
// Might be called twice, so check that its only processed once
if (processModel.getMailboxMessage() == null) {
processModel.setMailboxMessage(mailboxMessage);
if (!processModel.isMailboxMessageProcessed()) {
processModel.mailboxMessageProcessed();
if (mailboxMessage instanceof PayoutTxPublishedMessage) {
handle((PayoutTxPublishedMessage) mailboxMessage);
}
@ -94,6 +94,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol {
SendRequestPayDepositMessage.class
);
taskRunner.run();
startTimeout();
}
@ -102,6 +103,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(RequestPublishDepositTxMessage tradeMessage) {
stopTimeout();
processModel.setTradeMessage(tradeMessage);
TaskRunner<Trade> taskRunner = new TaskRunner<>(buyerAsTakerTrade,
@ -137,7 +139,7 @@ public class BuyerAsTakerProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
// Incoming message handling
// After peer has received Fiat tx
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(PayoutTxPublishedMessage tradeMessage) {

View file

@ -26,7 +26,6 @@ import io.bitsquare.common.taskrunner.Model;
import io.bitsquare.crypto.SignatureService;
import io.bitsquare.fiat.FiatAccount;
import io.bitsquare.offer.Offer;
import io.bitsquare.p2p.MailboxMessage;
import io.bitsquare.p2p.MessageService;
import io.bitsquare.trade.protocol.trade.messages.TradeMessage;
import io.bitsquare.user.User;
@ -67,7 +66,7 @@ public class ProcessModel implements Model, Serializable {
// Mutable
public final TradingPeer tradingPeer;
transient private MailboxMessage mailboxMessage;
transient private boolean mailboxMessageProcessed;
transient private TradeMessage tradeMessage;
private String takeOfferFeeTxId;
private Transaction payoutTx;
@ -158,13 +157,13 @@ public class ProcessModel implements Model, Serializable {
return tradeMessage;
}
public void setMailboxMessage(MailboxMessage mailboxMessage) {
this.mailboxMessage = mailboxMessage;
public void mailboxMessageProcessed() {
this.mailboxMessageProcessed = true;
}
@Nullable
public MailboxMessage getMailboxMessage() {
return mailboxMessage;
public boolean isMailboxMessageProcessed() {
return mailboxMessageProcessed;
}
@Nullable

View file

@ -73,11 +73,11 @@ public class SellerAsOffererProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void setMailboxMessage(MailboxMessage mailboxMessage) {
public void applyMailboxMessage(MailboxMessage mailboxMessage) {
log.debug("setMailboxMessage " + mailboxMessage);
// Might be called twice, so check that its only processed once
if (processModel.getMailboxMessage() == null) {
processModel.setMailboxMessage(mailboxMessage);
if (!processModel.isMailboxMessageProcessed()) {
processModel.mailboxMessageProcessed();
if (mailboxMessage instanceof FiatTransferStartedMessage) {
handle((FiatTransferStartedMessage) mailboxMessage);
}
@ -148,9 +148,11 @@ public class SellerAsOffererProtocol extends TradeProtocol {
SendRequestPublishDepositTxMessage.class
);
taskRunner.run();
startTimeout();
}
private void handle(DepositTxPublishedMessage tradeMessage) {
stopTimeout();
processModel.setTradeMessage(tradeMessage);
TaskRunner<Trade> taskRunner = new TaskRunner<>(sellerAsOffererTrade,
@ -164,6 +166,11 @@ public class SellerAsOffererProtocol extends TradeProtocol {
taskRunner.run();
}
///////////////////////////////////////////////////////////////////////////////////////////
// After peer has started Fiat tx
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(FiatTransferStartedMessage tradeMessage) {
processModel.setTradeMessage(tradeMessage);

View file

@ -73,11 +73,11 @@ public class SellerAsTakerProtocol extends TradeProtocol {
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void setMailboxMessage(MailboxMessage mailboxMessage) {
public void applyMailboxMessage(MailboxMessage mailboxMessage) {
log.debug("setMailboxMessage " + mailboxMessage);
// Might be called twice, so check that its only processed once
if (processModel.getMailboxMessage() == null) {
processModel.setMailboxMessage(mailboxMessage);
if (!processModel.isMailboxMessageProcessed()) {
processModel.mailboxMessageProcessed();
if (mailboxMessage instanceof FiatTransferStartedMessage) {
handle((FiatTransferStartedMessage) mailboxMessage);
}
@ -89,10 +89,7 @@ public class SellerAsTakerProtocol extends TradeProtocol {
public void takeAvailableOffer() {
TaskRunner<Trade> taskRunner = new TaskRunner<>(sellerAsTakerTrade,
() -> {
log.debug("taskRunner at takeAvailableOffer completed");
startTimeout();
},
() -> log.debug("taskRunner at takeAvailableOffer completed"),
this::handleTaskRunnerFault);
taskRunner.addTasks(
@ -101,6 +98,7 @@ public class SellerAsTakerProtocol extends TradeProtocol {
SendRequestDepositTxInputsMessage.class
);
taskRunner.run();
startTimeout();
}
@ -125,9 +123,11 @@ public class SellerAsTakerProtocol extends TradeProtocol {
SendRequestPublishDepositTxMessage.class
);
taskRunner.run();
startTimeout();
}
private void handle(DepositTxPublishedMessage tradeMessage) {
stopTimeout();
processModel.setTradeMessage(tradeMessage);
TaskRunner<Trade> taskRunner = new TaskRunner<>(sellerAsTakerTrade,
@ -141,6 +141,11 @@ public class SellerAsTakerProtocol extends TradeProtocol {
taskRunner.run();
}
///////////////////////////////////////////////////////////////////////////////////////////
// After peer has started Fiat tx
///////////////////////////////////////////////////////////////////////////////////////////
private void handle(FiatTransferStartedMessage tradeMessage) {
processModel.setTradeMessage(tradeMessage);

View file

@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
public abstract class TradeProtocol {
private static final Logger log = LoggerFactory.getLogger(TradeProtocol.class);
private static final long TIMEOUT = 10000;
protected final ProcessModel processModel;
protected MessageHandler messageHandler;
@ -46,10 +47,11 @@ public abstract class TradeProtocol {
public void cleanup() {
log.debug("cleanup " + this);
stopTimeout();
processModel.getMessageService().removeMessageHandler(messageHandler);
}
abstract public void setMailboxMessage(MailboxMessage mailboxMessage);
abstract public void applyMailboxMessage(MailboxMessage mailboxMessage);
protected void startTimeout() {
log.debug("startTimeout");
@ -69,7 +71,7 @@ public abstract class TradeProtocol {
}
};
timeoutTimer.schedule(task, 3000);
timeoutTimer.schedule(task, TIMEOUT);
}
protected void stopTimeout() {

View file

@ -31,6 +31,8 @@ import org.bitcoinj.core.Transaction;
import com.google.common.util.concurrent.FutureCallback;
import java.util.Date;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@ -73,6 +75,8 @@ public class SignAndPublishDepositTx extends TradeTask {
trade.setLifeCycleState(OffererTradeState.LifeCycleState.PENDING);
}
trade.setTakeOfferDate(new Date());
complete();
}