Improve OpenOfferManager disconnection handling

This commit is contained in:
Manfred Karrer 2016-02-20 14:20:37 +01:00
parent 447849900f
commit cec1e99e98
7 changed files with 231 additions and 241 deletions

View file

@ -21,7 +21,6 @@ import com.google.inject.Inject;
import io.bitsquare.app.Log;
import io.bitsquare.btc.TradeWalletService;
import io.bitsquare.btc.WalletService;
import io.bitsquare.common.Clock;
import io.bitsquare.common.Timer;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
@ -31,11 +30,10 @@ import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.DecryptedDirectMessageListener;
import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey;
import io.bitsquare.p2p.messaging.SendDirectMessageListener;
import io.bitsquare.p2p.network.CloseConnectionReason;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.storage.Storage;
import io.bitsquare.trade.TradableList;
import io.bitsquare.trade.closed.ClosedTradableManager;
@ -58,9 +56,12 @@ import java.util.concurrent.TimeUnit;
import static com.google.inject.internal.util.$Preconditions.checkNotNull;
import static io.bitsquare.util.Validator.nonEmptyStringOf;
public class OpenOfferManager {
public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMessageListener {
private static final Logger log = LoggerFactory.getLogger(OpenOfferManager.class);
private static final long RETRY_DELAY_AFTER_ALL_CON_LOST_SEC = 5;
private final KeyRing keyRing;
private final User user;
private final P2PService p2PService;
@ -68,23 +69,16 @@ public class OpenOfferManager {
private final TradeWalletService tradeWalletService;
private final OfferBookService offerBookService;
private final ClosedTradableManager closedTradableManager;
private Clock clock;
private final TradableList<OpenOffer> openOffers;
private final Storage<TradableList<OpenOffer>> openOffersStorage;
private boolean shutDownRequested;
private boolean stopped;
private Timer periodicRepublishOffersTimer, periodicRefreshOffersTimer, republishOffersTimer;
private BootstrapListener bootstrapListener;
//private final Timer republishOffersTimer = new Timer();
private Timer refreshOffersTimer;
private Timer republishOffersTimer;
private boolean allowRefreshOffers;
private boolean lostAllConnections;
private long refreshOffersPeriod;
private Clock.Listener listener;
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
// Constructor, Initialization
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
@ -95,7 +89,6 @@ public class OpenOfferManager {
TradeWalletService tradeWalletService,
OfferBookService offerBookService,
ClosedTradableManager closedTradableManager,
Clock clock,
@Named("storage.dir") File storageDir) {
this.keyRing = keyRing;
this.user = user;
@ -104,176 +97,24 @@ public class OpenOfferManager {
this.tradeWalletService = tradeWalletService;
this.offerBookService = offerBookService;
this.closedTradableManager = closedTradableManager;
this.clock = clock;
openOffersStorage = new Storage<>(storageDir);
this.openOffers = new TradableList<>(openOffersStorage, "OpenOffers");
init();
}
private void init() {
// In case the app did get killed the shutDown from the modules is not called, so we use a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(OpenOfferManager.this::shutDown,
"OpenOfferManager.ShutDownHook"));
// Handler for incoming offer availability requests
p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, peersNodeAddress) -> {
// We get an encrypted message but don't do the signature check as we don't know the peer yet.
// A basic sig check is in done also at decryption time
Message message = decryptedMessageWithPubKey.message;
if (message instanceof OfferAvailabilityRequest)
handleOfferAvailabilityRequest((OfferAvailabilityRequest) message, peersNodeAddress);
});
}
///////////////////////////////////////////////////////////////////////////////////////////
// Lifecycle
///////////////////////////////////////////////////////////////////////////////////////////
public void onAllServicesInitialized() {
log.trace("onAllServicesInitialized");
// We add own offers to offerbook when we go online again
// setupAnStartRePublishThread will re-publish at method call
// Before the TTL is reached we re-publish our offers
// If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set
// TTL quite short and use re-publish as strategy. Offerers need to be online anyway.
if (!p2PService.isBootstrapped()) {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapComplete() {
onBootstrapped();
}
};
p2PService.addP2PServiceListener(bootstrapListener);
} else {
onBootstrapped();
}
}
private void onBootstrapped() {
if (bootstrapListener != null)
p2PService.removeP2PServiceListener(bootstrapListener);
republishOffers();
startRefreshOffersThread();
//TODO should not be needed
//startRepublishOffersThread();
// we check if app was idle for more then 5 sec.
listener = new Clock.Listener() {
bootstrapListener = new BootstrapListener() {
@Override
public void onSecondTick() {
}
@Override
public void onMinuteTick() {
}
@Override
public void onMissedSecondTick(long missed) {
if (missed > Clock.IDLE_TOLERANCE) {
log.error("We have been idle for {} sec", missed / 1000);
// We have been idle for at least 5 sec.
//republishOffers();
// run again after 5 sec as it might be that the app needs a bit for getting all re-animated again
allowRefreshOffers = false;
if (republishOffersTimer == null)
republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, 5);
}
public void onBootstrapComplete() {
OpenOfferManager.this.onBootstrapComplete();
}
};
clock.addListener(listener);
// We also check if we got completely disconnected
NetworkNode networkNode = p2PService.getNetworkNode();
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
if (lostAllConnections) {
lostAllConnections = false;
if (republishOffersTimer != null)
republishOffersTimer.stop();
//republishOffers();
// run again after 5 sec as it might be that the app needs a bit for getting all re-animated again
log.error("We got re-connected again after loss of all connection. We re-publish our offers now.");
allowRefreshOffers = false;
republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, 5);
}
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
lostAllConnections = networkNode.getAllConnections().isEmpty();
if (lostAllConnections) {
allowRefreshOffers = false;
log.error("We got disconnected from all peers");
}
}
@Override
public void onError(Throwable throwable) {
}
});
}
/*
private void startRepublishOffersThread() {
long period = Offer.TTL * 10;
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
UserThread.execute(OpenOfferManager.this::republishOffers);
}
};
republishOffersTimer.scheduleAtFixedRate(timerTask, period, period);
}*/
private void republishOffers() {
Log.traceCall("Number of offer for republish: " + openOffers.size());
allowRefreshOffers = false;
if (republishOffersTimer != null) {
republishOffersTimer.stop();
republishOffersTimer = null;
}
for (OpenOffer openOffer : openOffers) {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
log.debug("Successful added offer to P2P network");
allowRefreshOffers = true;
},
errorMessage -> {
//TODO handle with retry
log.error("Add offer to P2P network failed. " + errorMessage);
});
openOffer.setStorage(openOffersStorage);
}
}
private void startRefreshOffersThread() {
// refresh sufficiently before offer would expire
refreshOffersPeriod = (long) (Offer.TTL * 0.5);
refreshOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::refreshOffers, refreshOffersPeriod, TimeUnit.MILLISECONDS);
}
private void refreshOffers() {
if (allowRefreshOffers) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
for (OpenOffer openOffer : openOffers) {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
openOffer.setStorage(openOffersStorage);
}
}
p2PService.addP2PServiceListener(bootstrapListener);
p2PService.addDecryptedDirectMessageListener(this);
}
@SuppressWarnings("WeakerAccess")
@ -282,23 +123,144 @@ public class OpenOfferManager {
}
public void shutDown(@Nullable Runnable completeHandler) {
if (republishOffersTimer != null)
republishOffersTimer.stop();
stopped = true;
p2PService.getPeerManager().removeListener(this);
p2PService.removeDecryptedDirectMessageListener(this);
if (bootstrapListener != null)
p2PService.removeP2PServiceListener(bootstrapListener);
if (refreshOffersTimer != null)
refreshOffersTimer.stop();
stopPeriodicRefreshOffersTimer();
stopPeriodicRepublishOffersTimer();
if (listener != null)
clock.removeListener(listener);
log.info("remove all open offers at shutDown");
// we remove own offers from offerbook when we go offline
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
if (!shutDownRequested) {
log.info("remove all open offers at shutDown");
shutDownRequested = true;
// we remove own offers from offerbook when we go offline
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
if (completeHandler != null)
UserThread.runAfter(completeHandler::run, openOffers.size() * 200 + 300, TimeUnit.MILLISECONDS);
}
if (completeHandler != null)
UserThread.runAfter(completeHandler::run, openOffers.size() * 200 + 300, TimeUnit.MILLISECONDS);
///////////////////////////////////////////////////////////////////////////////////////////
// DecryptedDirectMessageListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onDirectMessage(DecryptedMsgWithPubKey decryptedMsgWithPubKey, NodeAddress peerNodeAddress) {
// Handler for incoming offer availability requests
// We get an encrypted message but don't do the signature check as we don't know the peer yet.
// A basic sig check is in done also at decryption time
Message message = decryptedMsgWithPubKey.message;
if (message instanceof OfferAvailabilityRequest)
handleOfferAvailabilityRequest((OfferAvailabilityRequest) message, peerNodeAddress);
}
///////////////////////////////////////////////////////////////////////////////////////////
// BootstrapListener delegate
///////////////////////////////////////////////////////////////////////////////////////////
public void onBootstrapComplete() {
p2PService.removeP2PServiceListener(bootstrapListener);
stopped = false;
// Republish means we send the complete offer object
republishOffers();
startRepublishOffersThread();
// Refresh is started once we get a success from republish
p2PService.getPeerManager().addListener(this);
}
///////////////////////////////////////////////////////////////////////////////////////////
// PeerManager.Listener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onAllConnectionsLost() {
stopped = true;
stopPeriodicRefreshOffersTimer();
stopPeriodicRepublishOffersTimer();
}
@Override
public void onNewConnectionAfterAllConnectionsLost() {
restart();
}
@Override
public void onAwakeFromStandby() {
if (!p2PService.getNetworkNode().getAllConnections().isEmpty())
restart();
}
///////////////////////////////////////////////////////////////////////////////////////////
// RepublishOffers, refreshOffers
///////////////////////////////////////////////////////////////////////////////////////////
private void startRepublishOffersThread() {
stopped = false;
if (periodicRepublishOffersTimer == null)
periodicRepublishOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::republishOffers,
Offer.TTL * 10,
TimeUnit.MILLISECONDS);
}
private void republishOffers() {
Log.traceCall("Number of offer for republish: " + openOffers.size());
if (!stopped) {
stopPeriodicRefreshOffersTimer();
for (OpenOffer openOffer : openOffers) {
offerBookService.republishOffers(openOffer.getOffer(),
() -> {
log.debug("Successful added offer to P2P network");
// Refresh means we send only the dat needed to refresh the TTL (hash, signature and sequence nr.)
startRefreshOffersThread();
},
errorMessage -> {
//TODO handle with retry
log.error("Add offer to P2P network failed. " + errorMessage);
});
openOffer.setStorage(openOffersStorage);
}
} else {
log.warn("We have stopped already. We ignore that republishOffers call.");
}
}
private void startRefreshOffersThread() {
stopped = false;
// refresh sufficiently before offer would expire
if (periodicRefreshOffersTimer == null)
periodicRefreshOffersTimer = UserThread.runPeriodically(OpenOfferManager.this::refreshOffers,
(long) (Offer.TTL * 0.5),
TimeUnit.MILLISECONDS);
}
private void refreshOffers() {
if (!stopped) {
Log.traceCall("Number of offer for refresh: " + openOffers.size());
for (OpenOffer openOffer : openOffers) {
offerBookService.refreshOffer(openOffer.getOffer(),
() -> log.debug("Successful refreshed TTL for offer"),
errorMessage -> log.error("Refresh TTL for offer failed. " + errorMessage));
}
} else {
log.warn("We have stopped already. We ignore that refreshOffers call.");
}
}
private void restart() {
startRepublishOffersThread();
startRefreshOffersThread();
if (republishOffersTimer == null) {
stopped = false;
republishOffersTimer = UserThread.runAfter(OpenOfferManager.this::republishOffers, RETRY_DELAY_AFTER_ALL_CON_LOST_SEC);
}
}
@ -307,11 +269,9 @@ public class OpenOfferManager {
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onPlaceOffer(Offer offer,
TransactionResultHandler resultHandler) {
public void placeOffer(Offer offer,
TransactionResultHandler resultHandler) {
PlaceOfferModel model = new PlaceOfferModel(offer, walletService, tradeWalletService, offerBookService, user);
PlaceOfferProtocol placeOfferProtocol = new PlaceOfferProtocol(
model,
transaction -> {
@ -321,15 +281,14 @@ public class OpenOfferManager {
resultHandler.handleResult(transaction);
}
);
placeOfferProtocol.placeOffer();
}
public void onRemoveOpenOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
// Remove from offerbook
public void removeOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
Optional<OpenOffer> openOfferOptional = findOpenOffer(offer.getId());
if (openOfferOptional.isPresent()) {
onRemoveOpenOffer(openOfferOptional.get(), resultHandler, errorMessageHandler);
removeOpenOffer(openOfferOptional.get(), resultHandler, errorMessageHandler);
} else {
log.warn("Offer was not found in our list of open offers. We still try to remove it from the offerbook.");
errorMessageHandler.handleErrorMessage("Offer was not found in our list of open offers. " +
@ -340,7 +299,8 @@ public class OpenOfferManager {
}
}
public void onRemoveOpenOffer(OpenOffer openOffer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
// Remove from my offers
public void removeOpenOffer(OpenOffer openOffer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
Offer offer = openOffer.getOffer();
offerBookService.removeOffer(offer,
() -> {
@ -353,6 +313,17 @@ public class OpenOfferManager {
errorMessageHandler);
}
// Close openOffer after deposit published
public void closeOpenOffer(Offer offer) {
findOpenOffer(offer.getId()).ifPresent(openOffer -> {
openOffers.remove(openOffer);
openOffer.setState(OpenOffer.State.CLOSED);
offerBookService.removeOffer(openOffer.getOffer(),
() -> log.trace("Successful removed offer"),
log::error);
});
}
public void reserveOpenOffer(OpenOffer openOffer) {
openOffer.setState(OpenOffer.State.RESERVED);
}
@ -374,16 +345,8 @@ public class OpenOfferManager {
return openOffers.stream().filter(openOffer -> openOffer.getId().equals(offerId)).findAny();
}
// Close openOffer after deposit published
public void closeOpenOffer(Offer offer) {
findOpenOffer(offer.getId()).ifPresent(openOffer -> {
openOffers.remove(openOffer);
openOffer.setState(OpenOffer.State.CLOSED);
offerBookService.removeOffer(openOffer.getOffer(),
() -> log.trace("Successful removed offer"),
log::error);
});
public Optional<OpenOffer> getOpenOfferById(String offerId) {
return openOffers.stream().filter(e -> e.getId().equals(offerId)).findFirst();
}
@ -393,38 +356,64 @@ public class OpenOfferManager {
private void handleOfferAvailabilityRequest(OfferAvailabilityRequest message, NodeAddress sender) {
log.trace("handleNewMessage: message = " + message.getClass().getSimpleName() + " from " + sender);
try {
nonEmptyStringOf(message.offerId);
checkNotNull(message.getPubKeyRing());
} catch (Throwable t) {
log.warn("Invalid message " + message.toString());
return;
}
if (!stopped) {
try {
nonEmptyStringOf(message.offerId);
checkNotNull(message.getPubKeyRing());
} catch (Throwable t) {
log.warn("Invalid message " + message.toString());
return;
}
Optional<OpenOffer> openOfferOptional = findOpenOffer(message.offerId);
boolean isAvailable = openOfferOptional.isPresent() && openOfferOptional.get().getState() == OpenOffer.State.AVAILABLE;
try {
p2PService.sendEncryptedDirectMessage(sender,
message.getPubKeyRing(),
new OfferAvailabilityResponse(message.offerId, isAvailable),
new SendDirectMessageListener() {
@Override
public void onArrived() {
log.trace("OfferAvailabilityResponse successfully arrived at peer");
}
Optional<OpenOffer> openOfferOptional = findOpenOffer(message.offerId);
boolean isAvailable = openOfferOptional.isPresent() && openOfferOptional.get().getState() == OpenOffer.State.AVAILABLE;
try {
p2PService.sendEncryptedDirectMessage(sender,
message.getPubKeyRing(),
new OfferAvailabilityResponse(message.offerId, isAvailable),
new SendDirectMessageListener() {
@Override
public void onArrived() {
log.trace("OfferAvailabilityResponse successfully arrived at peer");
}
@Override
public void onFault() {
log.info("Sending OfferAvailabilityResponse failed.");
}
});
} catch (Throwable t) {
t.printStackTrace();
log.info("Exception at handleRequestIsOfferAvailableMessage " + t.getMessage());
@Override
public void onFault() {
log.info("Sending OfferAvailabilityResponse failed.");
}
});
} catch (Throwable t) {
t.printStackTrace();
log.info("Exception at handleRequestIsOfferAvailableMessage " + t.getMessage());
}
} else {
log.warn("We have stopped already. We ignore that handleOfferAvailabilityRequest call.");
}
}
public Optional<OpenOffer> getOpenOfferById(String offerId) {
return openOffers.stream().filter(e -> e.getId().equals(offerId)).findFirst();
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void stopPeriodicRefreshOffersTimer() {
if (periodicRefreshOffersTimer != null) {
periodicRefreshOffersTimer.stop();
periodicRefreshOffersTimer = null;
}
}
private void stopPeriodicRepublishOffersTimer() {
if (periodicRepublishOffersTimer != null) {
periodicRepublishOffersTimer.stop();
periodicRepublishOffersTimer = null;
}
}
private void stopRepublishOffersTimer() {
if (republishOffersTimer != null) {
republishOffersTimer.stop();
republishOffersTimer = null;
}
}
}

View file

@ -73,7 +73,7 @@ public class OfferAvailabilityProtocol {
private void cleanup() {
stopTimeout();
model.p2PService.removeDecryptedMailListener(decryptedDirectMessageListener);
model.p2PService.removeDecryptedDirectMessageListener(decryptedDirectMessageListener);
}

View file

@ -88,7 +88,7 @@ public abstract class TradeProtocol {
log.debug("cleanup " + this);
stopTimeout();
processModel.getP2PService().removeDecryptedMailListener(decryptedDirectMessageListener);
processModel.getP2PService().removeDecryptedDirectMessageListener(decryptedDirectMessageListener);
}

View file

@ -272,7 +272,7 @@ class CreateOfferDataModel extends ActivatableDataModel {
}
private void doPlaceOffer(Offer offer, TransactionResultHandler resultHandler) {
openOfferManager.onPlaceOffer(offer, resultHandler);
openOfferManager.placeOffer(offer, resultHandler);
}
public void onPaymentAccountSelected(PaymentAccount paymentAccount) {

View file

@ -179,7 +179,7 @@ class OfferBookViewModel extends ActivatableViewModel {
}
void onRemoveOpenOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
openOfferManager.onRemoveOpenOffer(offer, resultHandler, errorMessageHandler);
openOfferManager.removeOffer(offer, resultHandler, errorMessageHandler);
}

View file

@ -55,7 +55,7 @@ class OpenOffersDataModel extends ActivatableDataModel {
}
void onCancelOpenOffer(OpenOffer openOffer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
openOfferManager.onRemoveOpenOffer(openOffer, resultHandler, errorMessageHandler);
openOfferManager.removeOpenOffer(openOffer, resultHandler, errorMessageHandler);
}

View file

@ -698,7 +698,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
decryptedDirectMessageListeners.add(listener);
}
public void removeDecryptedMailListener(DecryptedDirectMessageListener listener) {
public void removeDecryptedDirectMessageListener(DecryptedDirectMessageListener listener) {
decryptedDirectMessageListeners.remove(listener);
}
@ -711,7 +711,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
public void removeP2PServiceListener(P2PServiceListener listener) {
p2pServiceListeners.remove(listener);
if (p2pServiceListeners.contains(listener))
p2pServiceListeners.remove(listener);
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {