Handle offer removal on disconnect (WIP)

This commit is contained in:
Manfred Karrer 2016-02-16 13:18:25 +01:00
parent 92ad387d70
commit db363fac48
12 changed files with 199 additions and 55 deletions

View File

@ -25,6 +25,7 @@ import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.JsonExclude;
import io.bitsquare.locale.Country;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.storage.data.RequiresLiveOwner;
import io.bitsquare.p2p.storage.data.StorageMessage;
import io.bitsquare.payment.PaymentMethod;
import io.bitsquare.trade.protocol.availability.OfferAvailabilityModel;
@ -46,18 +47,22 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public final class Offer implements StorageMessage {
public final class Offer implements StorageMessage, RequiresLiveOwner {
// That object is sent over the wire, so we need to take care of version compatibility.
@JsonExclude
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
@JsonExclude
private static final Logger log = LoggerFactory.getLogger(Offer.class);
public static final long TTL = TimeUnit.MINUTES.toMillis(10);
//public static final long TTL = TimeUnit.MINUTES.toMillis(10);
//TODO
public static final long TTL = TimeUnit.SECONDS.toMillis(10);
public final static String TAC_OFFERER = "When placing that offer I accept that anyone who fulfills my conditions can " +
"take that offer.";
public static final String TAC_TAKER = "With taking the offer I commit to the trade conditions as defined.";
public enum Direction {BUY, SELL}
public enum State {
@ -154,6 +159,11 @@ public final class Offer implements StorageMessage {
}
}
@Override
public NodeAddress getOwnerNodeAddress() {
return offererNodeAddress;
}
public void validate() {
checkNotNull(getAmount(), "Amount is null");
checkNotNull(getArbitratorNodeAddresses(), "Arbitrator is null");

View File

@ -21,10 +21,13 @@ import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.data.ProtectedData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@ -35,8 +38,14 @@ import java.util.stream.Collectors;
public class OfferBookService {
private static final Logger log = LoggerFactory.getLogger(OfferBookService.class);
private final P2PService p2PService;
public interface OfferBookChangedListener {
void onAdded(Offer offer);
void onRemoved(Offer offer);
}
private final P2PService p2PService;
private final List<OfferBookChangedListener> offerBookChangedListeners = new LinkedList<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -45,12 +54,37 @@ public class OfferBookService {
@Inject
public OfferBookService(P2PService p2PService) {
this.p2PService = p2PService;
p2PService.addHashSetChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
log.debug("OfferBookService.onAdded " + entry);
offerBookChangedListeners.stream().forEach(listener -> {
if (entry.expirableMessage instanceof Offer)
listener.onAdded((Offer) entry.expirableMessage);
});
}
@Override
public void onRemoved(ProtectedData entry) {
offerBookChangedListeners.stream().forEach(listener -> {
log.debug("OfferBookService.onRemoved " + entry);
if (entry.expirableMessage instanceof Offer)
listener.onRemoved((Offer) entry.expirableMessage);
});
}
});
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
p2PService.addHashSetChangedListener(hashMapChangedListener);
public void addOfferBookChangedListener(OfferBookChangedListener offerBookChangedListener) {
offerBookChangedListeners.add(offerBookChangedListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void addOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, false);
}
@ -74,25 +108,27 @@ public class OfferBookService {
}
}
public void removeOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
public void removeOffer(Offer offer, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) {
if (p2PService.removeData(offer)) {
log.trace("Remove offer from network was successful. Offer = " + offer);
if (resultHandler != null) resultHandler.handleResult();
if (resultHandler != null)
resultHandler.handleResult();
} else {
if (errorMessageHandler != null) errorMessageHandler.handleErrorMessage("Remove offer failed");
if (errorMessageHandler != null)
errorMessageHandler.handleErrorMessage("Remove offer failed");
}
}
public List<Offer> getOffers() {
final List<Offer> offers = p2PService.getDataMap().values().stream()
return p2PService.getDataMap().values().stream()
.filter(e -> e.expirableMessage instanceof Offer)
.map(e -> (Offer) e.expirableMessage)
.collect(Collectors.toList());
return offers;
}
public void removeOfferAtShutDown(Offer offer) {
log.debug("removeOfferAtShutDown " + offer);
removeOffer(offer, null, null);
}
}

View File

@ -71,6 +71,7 @@ public class OpenOfferManager {
private BootstrapListener bootstrapListener;
private final Timer timer = new Timer();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
@ -153,6 +154,18 @@ public class OpenOfferManager {
}
};
timer.scheduleAtFixedRate(timerTask, 500, period);
p2PService.getNumConnectedPeers().addListener((observable, oldValue, newValue) -> {
if ((int) oldValue == 0 && (int) newValue > 0) {
rePublishOffers();
// We repeat a rePublishOffers call after 10 seconds if we have more than 3 peers
UserThread.runAfter(() -> {
if (p2PService.getNumConnectedPeers().get() > 3)
rePublishOffers();
}, 10);
}
});
}
private void rePublishOffers() {
@ -161,7 +174,6 @@ public class OpenOfferManager {
offerBookService.republishOffer(openOffer.getOffer(),
() -> log.debug("Successful added offer to P2P network"),
errorMessage -> log.error("Add offer to P2P network failed. " + errorMessage));
//setupDepositPublishedListener(openOffer);
openOffer.setStorage(openOffersStorage);
}
}
@ -175,14 +187,14 @@ public class OpenOfferManager {
timer.cancel();
if (!shutDownRequested) {
log.debug("shutDown");
log.info("remove all open offers at shutDown");
shutDownRequested = true;
int numOffers = openOffers.size();
// we remove own offers from offerbook when we go offline
openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
//TODO
// openOffers.forEach(openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer()));
if (completeHandler != null)
UserThread.runAfter(completeHandler::run, numOffers * 200 + 300, TimeUnit.MILLISECONDS);
UserThread.runAfter(completeHandler::run, openOffers.size() * 200 + 300, TimeUnit.MILLISECONDS);
}
}

View File

@ -17,8 +17,6 @@
package io.bitsquare.gui.main.offer.offerbook;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.offer.Offer;
import io.bitsquare.trade.offer.OfferBookService;
@ -28,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
@ -53,33 +50,26 @@ public class OfferBook {
@Inject
OfferBook(OfferBookService offerBookService, TradeManager tradeManager) {
this.offerBookService = offerBookService;
offerBookService.addHashSetChangedListener(new HashMapChangedListener() {
offerBookService.addOfferBookChangedListener(new OfferBookService.OfferBookChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
log.debug("onAdded " + entry);
Serializable data = entry.expirableMessage;
if (data instanceof Offer) {
Offer offer = (Offer) data;
OfferBookListItem offerBookListItem = new OfferBookListItem(offer);
if (!offerBookListItems.contains(offerBookListItem))
offerBookListItems.add(offerBookListItem);
}
public void onAdded(Offer offer) {
log.debug("onAdded " + offer);
OfferBookListItem offerBookListItem = new OfferBookListItem(offer);
if (!offerBookListItems.contains(offerBookListItem))
offerBookListItems.add(offerBookListItem);
}
@Override
public void onRemoved(ProtectedData entry) {
log.debug("onRemoved " + entry);
if (entry.expirableMessage instanceof Offer) {
Offer offer = (Offer) entry.expirableMessage;
public void onRemoved(Offer offer) {
log.debug("onRemoved " + offer);
// Update state in case that that offer is used in the take offer screen, so it gets updated correctly
offer.setState(Offer.State.REMOVED);
// Update state in case that that offer is used in the take offer screen, so it gets updated correctly
offer.setState(Offer.State.REMOVED);
// clean up possible references in openOfferManager
tradeManager.onOfferRemovedFromRemoteOfferBook(offer);
// clean up possible references in openOfferManager
tradeManager.onOfferRemovedFromRemoteOfferBook(offer);
offerBookListItems.removeIf(item -> item.getOffer().getId().equals(offer.getId()));
}
offerBookListItems.removeIf(item -> item.getOffer().getId().equals(offer.getId()));
}
});
}

View File

@ -190,7 +190,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
updateP2PStatistics();
totalTraffic.textProperty().bind(EasyBind.combine(Statistic.totalSentBytesProperty(), Statistic.totalReceivedBytesProperty(),
(sent, received) -> "Sent:" + formatter.formatBytes((int) sent) + " / Received: " + formatter.formatBytes((int) received)));
(sent, received) -> "Sent: " + formatter.formatBytes((int) sent) + ", received: " + formatter.formatBytes((int) received)));
}
@Override

View File

@ -59,7 +59,9 @@ public class Connection implements MessageListener {
private static final int MAX_MSG_SIZE = 100 * 1024; // 100 kb of compressed data
private static final int MSG_THROTTLE_PER_SEC = 10; // With MAX_MSG_SIZE of 100kb results in bandwidth of 10 mbit/sec
private static final int MSG_THROTTLE_PER_10SEC = 50; // With MAX_MSG_SIZE of 100kb results in bandwidth of 5 mbit/sec for 10 sec
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
//private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60);
//TODO
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(6);
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;

View File

@ -38,14 +38,14 @@ public class PeerManager implements ConnectionListener, MessageListener {
public static void setMaxConnections(int maxConnections) {
MAX_CONNECTIONS = maxConnections;
MIN_CONNECTIONS = maxConnections - 4;
MIN_CONNECTIONS = Math.max(1, maxConnections - 4);
MAX_CONNECTIONS_EXTENDED_1 = MAX_CONNECTIONS + 5;
MAX_CONNECTIONS_EXTENDED_2 = MAX_CONNECTIONS + 10;
MAX_CONNECTIONS_EXTENDED_3 = MAX_CONNECTIONS + 20;
}
static {
setMaxConnections(12);
setMaxConnections(3);
}
private static final int MAX_REPORTED_PEERS = 1000;

View File

@ -18,14 +18,15 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class KeepAliveManager implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveManager.class);
private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
//private static final int INTERVAL_SEC = new Random().nextInt(10) + 10;
//TODO
private static final int INTERVAL_SEC = 3;
private final NetworkNode networkNode;
private final PeerManager peerManager;

View File

@ -11,9 +11,7 @@ import io.bitsquare.common.crypto.Sig;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.MessageListener;
import io.bitsquare.p2p.network.NetworkNode;
import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.Broadcaster;
import io.bitsquare.p2p.storage.data.*;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
@ -31,19 +29,22 @@ import java.io.Serializable;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Run in UserThread
public class P2PDataStorage implements MessageListener {
public class P2PDataStorage implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
@VisibleForTesting
public static int CHECK_TTL_INTERVAL = new Random().nextInt(1000) + (int) TimeUnit.MINUTES.toMillis(10); // 10-11 min.
public static int CHECK_TTL_INTERVAL_SEC = new Random().nextInt(60) + (int) TimeUnit.MINUTES.toSeconds(10); // 10-11 min.
//TODO
// public static int CHECK_TTL_INTERVAL_SEC = 10;
private final Broadcaster broadcaster;
private final Map<ByteArray, ProtectedData> map = new HashMap<>();
private final Map<ByteArray, ProtectedData> map = new ConcurrentHashMap<>();
private final CopyOnWriteArraySet<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArraySet<>();
private HashMap<ByteArray, Integer> sequenceNumberMap = new HashMap<>();
private final Storage<HashMap> storage;
@ -57,10 +58,12 @@ public class P2PDataStorage implements MessageListener {
this.broadcaster = broadcaster;
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
storage = new Storage<>(storageDir);
removeExpiredEntriesExecutor = Utilities.getScheduledThreadPoolExecutor("removeExpiredEntries", 1, 10, 5);
log.debug("CHECK_TTL_INTERVAL_SEC " + CHECK_TTL_INTERVAL_SEC);
init();
}
@ -69,7 +72,7 @@ public class P2PDataStorage implements MessageListener {
if (persisted != null)
sequenceNumberMap = persisted;
removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(this::removeExpiredEntries), CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL, TimeUnit.SECONDS);
removeExpiredEntriesExecutor.scheduleAtFixedRate(() -> UserThread.execute(this::removeExpiredEntries), CHECK_TTL_INTERVAL_SEC, CHECK_TTL_INTERVAL_SEC, TimeUnit.SECONDS);
}
private void removeExpiredEntries() {
@ -116,6 +119,72 @@ public class P2PDataStorage implements MessageListener {
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onConnection(Connection connection) {
}
@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
Log.traceCall();
map.values().stream()
.filter(protectedData -> protectedData.expirableMessage instanceof RequiresLiveOwner)
.forEach(protectedData -> removeRequiresLiveOwnerDataOnDisconnect(protectedData, ((RequiresLiveOwner) protectedData.expirableMessage).getOwnerNodeAddress()));
}
public boolean removeRequiresLiveOwnerDataOnDisconnect(ProtectedData protectedData, NodeAddress owner) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedData.expirableMessage);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
//broadcast(new RemoveDataMessage(protectedData), owner);
// sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
sequenceNumberMap.remove(hashOfPayload);
storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
return containsKey;
}
// If the data owner gets disconnected we remove his data. Used for offers to get clean up when the peer is in
// sleep/hibernate mode or closes the app without proper shutdown (crash).
// We don't want to wait the until the TTL period is over so we add that method to improve usability
public boolean removeLocalDataOnDisconnectedDataOwner(ExpirableMessage expirableMessage) {
Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(expirableMessage);
boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) {
map.remove(hashOfPayload);
log.trace("Data removed from our map.");
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n" +
"Data set after removeProtectedExpirableData: (truncated)");
map.values().stream().forEach(e -> sb.append("\n").append(StringUtils.abbreviate(e.toString(), 100)));
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
// sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
// storage.queueUpForSave(sequenceNumberMap, 5000);
} else {
log.debug("Remove data ignored as we don't have an entry for that data.");
}
return containsKey;
}
@Override
public void onError(Throwable throwable) {
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
@ -226,6 +295,7 @@ public class P2PDataStorage implements MessageListener {
return result;
}
public Map<ByteArray, ProtectedData> getMap() {
return map;
}
@ -283,7 +353,6 @@ public class P2PDataStorage implements MessageListener {
sb.append("\n------------------------------------------------------------\n");
log.trace(sb.toString());
log.info("Data set after addProtectedExpirableData: size=" + map.values().size());
}
private boolean isSequenceNrValid(ProtectedData data, ByteArray hashOfData) {

View File

@ -0,0 +1,18 @@
package io.bitsquare.p2p.storage.data;
import io.bitsquare.p2p.NodeAddress;
import java.io.Serializable;
/**
* Used for messages which require that the data owner is online.
* <p>
* This is used for the offers to avoid dead offers in case the offerer is in sleep/hibernate mode or the app has
* terminated without sending the remove message (e.g. in case of a crash).
*/
public interface RequiresLiveOwner extends Serializable {
/**
* @return NodeAddress of the data owner
*/
NodeAddress getOwnerNodeAddress();
}

View File

@ -60,7 +60,7 @@ public class ProtectedDataStorageTest {
dir2.mkdir();
UserThread.setExecutor(Executors.newSingleThreadExecutor());
P2PDataStorage.CHECK_TTL_INTERVAL = (int) TimeUnit.MINUTES.toMillis(10);
P2PDataStorage.CHECK_TTL_INTERVAL_SEC = (int) TimeUnit.MINUTES.toMillis(10);
keyRing1 = new KeyRing(new KeyStorage(dir1));
@ -111,7 +111,7 @@ public class ProtectedDataStorageTest {
@Test
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
P2PDataStorage.CHECK_TTL_INTERVAL = 10;
P2PDataStorage.CHECK_TTL_INTERVAL_SEC = 10;
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
//TODO

View File

@ -3,6 +3,7 @@ package io.bitsquare.p2p.seed;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.bitsquare.app.BitsquareEnvironment;
import io.bitsquare.common.UserThread;
import io.bitsquare.trade.offer.OfferBookService;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,6 +16,7 @@ public class SeedNodeMain {
private static final Logger log = LoggerFactory.getLogger(SeedNodeMain.class);
public static final boolean USE_DETAILED_LOGGING = true;
private OfferBookService offerBookService;
private SeedNode seedNode;
@ -48,6 +50,10 @@ public class SeedNodeMain {
seedNode = new SeedNode(BitsquareEnvironment.defaultUserDataDir());
seedNode.processArgs(args);
seedNode.createAndStartP2PService(USE_DETAILED_LOGGING);
// We need the offerbook service to handle the case when the offerer is in sleep/hibernate mode and
// we want to remove his offers and not wait until TTL is over.
offerBookService = new OfferBookService(seedNode.getSeedNodeP2PService());
} catch (Throwable t) {
log.error("Executing task failed. " + t.getMessage());
t.printStackTrace();