mirror of
https://github.com/bisq-network/bisq.git
synced 2025-02-23 15:00:30 +01:00
Refactor data request at starup to separate class
This commit is contained in:
parent
3222019480
commit
2d518f9a16
25 changed files with 444 additions and 296 deletions
|
@ -67,22 +67,24 @@ public class Log {
|
|||
}
|
||||
|
||||
public static void traceCall() {
|
||||
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
|
||||
String methodName = stackTraceElement.getMethodName();
|
||||
if (methodName.equals("<init>"))
|
||||
methodName = "Constructor ";
|
||||
String className = stackTraceElement.getClassName();
|
||||
LoggerFactory.getLogger(className).trace("Called: {}", methodName);
|
||||
if (LoggerFactory.getLogger(Log.class).isTraceEnabled()) {
|
||||
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
|
||||
String methodName = stackTraceElement.getMethodName();
|
||||
if (methodName.equals("<init>"))
|
||||
methodName = "Constructor ";
|
||||
String className = stackTraceElement.getClassName();
|
||||
LoggerFactory.getLogger(className).trace("Called: {}", methodName);
|
||||
}
|
||||
}
|
||||
|
||||
public static void traceCall(String message) {
|
||||
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
|
||||
String methodName = stackTraceElement.getMethodName();
|
||||
if (methodName.equals("<init>"))
|
||||
methodName = "Constructor ";
|
||||
String className = stackTraceElement.getClassName();
|
||||
LoggerFactory.getLogger(className).trace("Called: {} [{}]", methodName, message);
|
||||
if (LoggerFactory.getLogger(Log.class).isTraceEnabled()) {
|
||||
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
|
||||
String methodName = stackTraceElement.getMethodName();
|
||||
if (methodName.equals("<init>"))
|
||||
methodName = "Constructor ";
|
||||
String className = stackTraceElement.getClassName();
|
||||
LoggerFactory.getLogger(className).trace("Called: {} [{}]", methodName, message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import io.bitsquare.common.crypto.KeyRing;
|
|||
import io.bitsquare.common.handlers.ErrorMessageHandler;
|
||||
import io.bitsquare.common.handlers.ResultHandler;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.P2PNetworkReadyListener;
|
||||
import io.bitsquare.p2p.FirstPeerAuthenticatedListener;
|
||||
import io.bitsquare.p2p.P2PService;
|
||||
import io.bitsquare.p2p.storage.HashMapChangedListener;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||
|
@ -86,7 +86,7 @@ public class ArbitratorManager {
|
|||
));
|
||||
private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee";
|
||||
private final boolean isDevTest;
|
||||
private P2PNetworkReadyListener p2PNetworkReadyListener;
|
||||
private FirstPeerAuthenticatedListener firstPeerAuthenticatedListener;
|
||||
|
||||
@Inject
|
||||
public ArbitratorManager(@Named(ProgramArguments.DEV_TEST) boolean isDevTest, KeyRing keyRing, ArbitratorService arbitratorService, User user) {
|
||||
|
@ -112,14 +112,14 @@ public class ArbitratorManager {
|
|||
if (user.getRegisteredArbitrator() != null) {
|
||||
|
||||
P2PService p2PService = arbitratorService.getP2PService();
|
||||
if (!p2PService.isAuthenticated()) {
|
||||
p2PNetworkReadyListener = new P2PNetworkReadyListener() {
|
||||
if (!p2PService.getFirstPeerAuthenticated()) {
|
||||
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
republishArbitrator();
|
||||
}
|
||||
};
|
||||
p2PService.addP2PServiceListener(p2PNetworkReadyListener);
|
||||
p2PService.addP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
|
||||
} else {
|
||||
republishArbitrator();
|
||||
|
@ -136,8 +136,8 @@ public class ArbitratorManager {
|
|||
}
|
||||
|
||||
private void republishArbitrator() {
|
||||
if (p2PNetworkReadyListener != null)
|
||||
arbitratorService.getP2PService().removeP2PServiceListener(p2PNetworkReadyListener);
|
||||
if (firstPeerAuthenticatedListener != null)
|
||||
arbitratorService.getP2PService().removeP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
|
||||
Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
|
||||
if (registeredArbitrator != null) {
|
||||
|
|
|
@ -27,8 +27,8 @@ import io.bitsquare.btc.exceptions.WalletException;
|
|||
import io.bitsquare.common.crypto.KeyRing;
|
||||
import io.bitsquare.common.crypto.PubKeyRing;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.FirstPeerAuthenticatedListener;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.P2PNetworkReadyListener;
|
||||
import io.bitsquare.p2p.P2PService;
|
||||
import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey;
|
||||
import io.bitsquare.p2p.messaging.SendMailboxMessageListener;
|
||||
|
@ -67,7 +67,7 @@ public class DisputeManager {
|
|||
private final DisputeList<Dispute> disputes;
|
||||
transient private final ObservableList<Dispute> disputesObservableList;
|
||||
private final String disputeInfo;
|
||||
private final P2PNetworkReadyListener p2PNetworkReadyListener;
|
||||
private final FirstPeerAuthenticatedListener firstPeerAuthenticatedListener;
|
||||
private final CopyOnWriteArraySet<DecryptedMsgWithPubKey> decryptedMailboxMessageWithPubKeys = new CopyOnWriteArraySet<>();
|
||||
private final CopyOnWriteArraySet<DecryptedMsgWithPubKey> decryptedMailMessageWithPubKeys = new CopyOnWriteArraySet<>();
|
||||
|
||||
|
@ -106,22 +106,22 @@ public class DisputeManager {
|
|||
|
||||
p2PService.addDecryptedMailListener((decryptedMessageWithPubKey, senderAddress) -> {
|
||||
decryptedMailMessageWithPubKeys.add(decryptedMessageWithPubKey);
|
||||
if (p2PService.isAuthenticated())
|
||||
if (p2PService.getFirstPeerAuthenticated())
|
||||
applyMessages();
|
||||
});
|
||||
p2PService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
|
||||
decryptedMailboxMessageWithPubKeys.add(decryptedMessageWithPubKey);
|
||||
if (p2PService.isAuthenticated())
|
||||
if (p2PService.getFirstPeerAuthenticated())
|
||||
applyMessages();
|
||||
});
|
||||
|
||||
p2PNetworkReadyListener = new P2PNetworkReadyListener() {
|
||||
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
applyMessages();
|
||||
}
|
||||
};
|
||||
p2PService.addP2PServiceListener(p2PNetworkReadyListener);
|
||||
p2PService.addP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
}
|
||||
|
||||
private void applyMessages() {
|
||||
|
@ -143,7 +143,7 @@ public class DisputeManager {
|
|||
});
|
||||
decryptedMailboxMessageWithPubKeys.clear();
|
||||
|
||||
p2PService.removeP2PServiceListener(p2PNetworkReadyListener);
|
||||
p2PService.removeP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -27,8 +27,8 @@ import io.bitsquare.common.crypto.KeyRing;
|
|||
import io.bitsquare.common.handlers.FaultHandler;
|
||||
import io.bitsquare.common.handlers.ResultHandler;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.FirstPeerAuthenticatedListener;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.P2PNetworkReadyListener;
|
||||
import io.bitsquare.p2p.P2PService;
|
||||
import io.bitsquare.p2p.messaging.DecryptedMailListener;
|
||||
import io.bitsquare.p2p.messaging.DecryptedMailboxListener;
|
||||
|
@ -83,7 +83,7 @@ public class TradeManager {
|
|||
private final Storage<TradableList<Trade>> tradableListStorage;
|
||||
private final TradableList<Trade> trades;
|
||||
private final BooleanProperty pendingTradesInitialized = new SimpleBooleanProperty();
|
||||
private P2PNetworkReadyListener p2PNetworkReadyListener;
|
||||
private FirstPeerAuthenticatedListener firstPeerAuthenticatedListener;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -150,14 +150,14 @@ public class TradeManager {
|
|||
}
|
||||
});
|
||||
|
||||
p2PNetworkReadyListener = new P2PNetworkReadyListener() {
|
||||
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
// give a bit delay to be sure other listeners has dont its jobs
|
||||
UserThread.runAfter(() -> initPendingTrades(), 100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
};
|
||||
p2PService.addP2PServiceListener(p2PNetworkReadyListener);
|
||||
p2PService.addP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
}
|
||||
|
||||
|
||||
|
@ -172,7 +172,7 @@ public class TradeManager {
|
|||
}
|
||||
|
||||
private void initPendingTrades() {
|
||||
if (p2PNetworkReadyListener != null) p2PService.removeP2PServiceListener(p2PNetworkReadyListener);
|
||||
if (firstPeerAuthenticatedListener != null) p2PService.removeP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
|
||||
List<Trade> failedTrades = new ArrayList<>();
|
||||
for (Trade trade : trades) {
|
||||
|
|
|
@ -26,8 +26,8 @@ import io.bitsquare.common.handlers.ErrorMessageHandler;
|
|||
import io.bitsquare.common.handlers.ResultHandler;
|
||||
import io.bitsquare.common.util.Utilities;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.FirstPeerAuthenticatedListener;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.P2PNetworkReadyListener;
|
||||
import io.bitsquare.p2p.P2PService;
|
||||
import io.bitsquare.p2p.messaging.SendMailMessageListener;
|
||||
import io.bitsquare.storage.Storage;
|
||||
|
@ -69,7 +69,7 @@ public class OpenOfferManager {
|
|||
private final TradableList<OpenOffer> openOffers;
|
||||
private final Storage<TradableList<OpenOffer>> openOffersStorage;
|
||||
private boolean shutDownRequested;
|
||||
private P2PNetworkReadyListener p2PNetworkReadyListener;
|
||||
private FirstPeerAuthenticatedListener firstPeerAuthenticatedListener;
|
||||
private final Timer timer = new Timer();
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -128,14 +128,14 @@ public class OpenOfferManager {
|
|||
// Before the TTL is reached we re-publish our offers
|
||||
// If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set TTL quite short and use re-publish as
|
||||
// strategy. Offerers need to be online anyway.
|
||||
if (!p2PService.isAuthenticated()) {
|
||||
p2PNetworkReadyListener = new P2PNetworkReadyListener() {
|
||||
if (!p2PService.getFirstPeerAuthenticated()) {
|
||||
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
startRePublishThread();
|
||||
}
|
||||
};
|
||||
p2PService.addP2PServiceListener(p2PNetworkReadyListener);
|
||||
p2PService.addP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
|
||||
} else {
|
||||
startRePublishThread();
|
||||
|
@ -143,8 +143,8 @@ public class OpenOfferManager {
|
|||
}
|
||||
|
||||
private void startRePublishThread() {
|
||||
if (p2PNetworkReadyListener != null)
|
||||
p2PService.removeP2PServiceListener(p2PNetworkReadyListener);
|
||||
if (firstPeerAuthenticatedListener != null)
|
||||
p2PService.removeP2PServiceListener(firstPeerAuthenticatedListener);
|
||||
|
||||
long period = (long) (Offer.TTL * 0.8); // republish sufficiently before offer would expires
|
||||
TimerTask timerTask = new TimerTask() {
|
||||
|
|
|
@ -165,10 +165,10 @@ class MainViewModel implements ViewModel {
|
|||
log.trace("initializeAllServices");
|
||||
|
||||
BooleanProperty walletInitialized = initBitcoinWallet();
|
||||
BooleanProperty bootstrapDone = initP2PNetwork();
|
||||
BooleanProperty p2pNetWorkReady = initP2PNetwork();
|
||||
|
||||
// need to store it to not get garbage collected
|
||||
allServicesDone = EasyBind.combine(walletInitialized, bootstrapDone, (a, b) -> a && b);
|
||||
allServicesDone = EasyBind.combine(walletInitialized, p2pNetWorkReady, (a, b) -> a && b);
|
||||
allServicesDone.subscribe((observable, oldValue, newValue) -> {
|
||||
if (newValue)
|
||||
onAllServicesInitialized();
|
||||
|
@ -181,26 +181,38 @@ class MainViewModel implements ViewModel {
|
|||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private BooleanProperty initP2PNetwork() {
|
||||
final BooleanProperty initialDataReady = new SimpleBooleanProperty();
|
||||
final BooleanProperty p2pNetWorkReady = new SimpleBooleanProperty();
|
||||
splashP2PNetworkInfo.set("Connecting to Tor network...");
|
||||
|
||||
p2PService.start(new P2PServiceListener() {
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
splashP2PNetworkInfo.set("Publishing Tor Hidden Service...");
|
||||
splashP2PNetworkInfo.set("Tor node created.");
|
||||
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
|
||||
p2PNetworkIconId.set("image-connection-tor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHiddenServicePublished() {
|
||||
splashP2PNetworkInfo.set("Authenticating to a seed node...");
|
||||
splashP2PNetworkInfo.set("Hidden Service published.");
|
||||
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRequestingDataCompleted() {
|
||||
initialDataReady.set(true);
|
||||
if (p2PService.getNumAuthenticatedPeers().get() == 0) {
|
||||
splashP2PNetworkInfo.set("Initial data received.");
|
||||
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
|
||||
} else {
|
||||
updateP2pNetworkInfo();
|
||||
}
|
||||
p2pNetWorkReady.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
splashP2PNetworkInfo.set("No seed node available.");
|
||||
p2PNetworkInfo.set(splashP2PNetworkInfo.get());
|
||||
p2pNetWorkReady.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -217,7 +229,7 @@ class MainViewModel implements ViewModel {
|
|||
}
|
||||
});
|
||||
|
||||
return initialDataReady;
|
||||
return p2pNetWorkReady;
|
||||
}
|
||||
|
||||
private BooleanProperty initBitcoinWallet() {
|
||||
|
|
|
@ -185,6 +185,6 @@ class ArbitratorRegistrationViewModel extends ActivatableViewModel {
|
|||
}
|
||||
|
||||
boolean isAuthenticated() {
|
||||
return p2PService.isAuthenticated();
|
||||
return p2PService.getFirstPeerAuthenticated();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,7 +470,7 @@ class CreateOfferViewModel extends ActivatableWithDataModel<CreateOfferDataModel
|
|||
}
|
||||
|
||||
boolean isAuthenticated() {
|
||||
return p2PService.isAuthenticated();
|
||||
return p2PService.getFirstPeerAuthenticated();
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -158,7 +158,7 @@ class OfferBookViewModel extends ActivatableViewModel {
|
|||
}
|
||||
|
||||
boolean isAuthenticated() {
|
||||
return p2PService.isAuthenticated();
|
||||
return p2PService.getFirstPeerAuthenticated();
|
||||
}
|
||||
|
||||
public TradeCurrency getTradeCurrency() {
|
||||
|
|
|
@ -74,6 +74,6 @@ class OpenOffersViewModel extends ActivatableWithDataModel<OpenOffersDataModel>
|
|||
}
|
||||
|
||||
boolean isAuthenticated() {
|
||||
return p2PService.isAuthenticated();
|
||||
return p2PService.getFirstPeerAuthenticated();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
|
|||
}
|
||||
|
||||
public boolean isAuthenticated() {
|
||||
return p2PService.isAuthenticated();
|
||||
return p2PService.getFirstPeerAuthenticated();
|
||||
}
|
||||
|
||||
// columns
|
||||
|
|
|
@ -104,6 +104,11 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
|
|||
public void onRequestingDataCompleted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package io.bitsquare.p2p;
|
||||
|
||||
|
||||
public abstract class P2PNetworkReadyListener implements P2PServiceListener {
|
||||
public abstract class FirstPeerAuthenticatedListener implements P2PServiceListener {
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
}
|
||||
|
@ -10,6 +10,10 @@ public abstract class P2PNetworkReadyListener implements P2PServiceListener {
|
|||
public void onHiddenServicePublished() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSetupFailed(Throwable throwable) {
|
||||
}
|
|
@ -8,7 +8,6 @@ import com.google.inject.name.Named;
|
|||
import io.bitsquare.app.Log;
|
||||
import io.bitsquare.app.ProgramArguments;
|
||||
import io.bitsquare.common.ByteArray;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.common.crypto.CryptoException;
|
||||
import io.bitsquare.common.crypto.KeyRing;
|
||||
import io.bitsquare.common.crypto.PubKeyRing;
|
||||
|
@ -17,15 +16,14 @@ import io.bitsquare.crypto.SealedAndSignedMessage;
|
|||
import io.bitsquare.p2p.messaging.*;
|
||||
import io.bitsquare.p2p.network.*;
|
||||
import io.bitsquare.p2p.peers.PeerGroup;
|
||||
import io.bitsquare.p2p.peers.RequestDataManager;
|
||||
import io.bitsquare.p2p.seed.SeedNodesRepository;
|
||||
import io.bitsquare.p2p.storage.HashMapChangedListener;
|
||||
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
|
||||
import io.bitsquare.p2p.storage.P2PDataStorage;
|
||||
import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
|
||||
import io.bitsquare.p2p.storage.data.ExpirablePayload;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
|
||||
import io.bitsquare.p2p.storage.messages.GetDataRequest;
|
||||
import io.bitsquare.p2p.storage.messages.GetDataResponse;
|
||||
import io.bitsquare.storage.Storage;
|
||||
import javafx.beans.property.*;
|
||||
import org.fxmisc.easybind.EasyBind;
|
||||
|
@ -39,12 +37,11 @@ import java.io.File;
|
|||
import java.security.PublicKey;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
public class P2PService implements SetupListener, MessageListener, ConnectionListener {
|
||||
public class P2PService implements SetupListener, MessageListener, ConnectionListener, HashMapChangedListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
|
||||
|
||||
private final SeedNodesRepository seedNodesRepository;
|
||||
|
@ -59,7 +56,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
// set in init
|
||||
private NetworkNode networkNode;
|
||||
private PeerGroup peerGroup;
|
||||
private ProtectedExpirableDataStorage dataStorage;
|
||||
private P2PDataStorage dataStorage;
|
||||
|
||||
private final CopyOnWriteArraySet<DecryptedMailListener> decryptedMailListeners = new CopyOnWriteArraySet<>();
|
||||
private final CopyOnWriteArraySet<DecryptedMailboxListener> decryptedMailboxListeners = new CopyOnWriteArraySet<>();
|
||||
|
@ -69,7 +66,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
|
||||
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
|
||||
private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
|
||||
private final BooleanProperty authenticated = new SimpleBooleanProperty();
|
||||
private final BooleanProperty firstPeerAuthenticated = new SimpleBooleanProperty();
|
||||
private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
|
||||
|
||||
private Address connectedSeedNode;
|
||||
|
@ -78,6 +75,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
private MonadicBinding<Boolean> readyForAuthentication;
|
||||
private final Storage<Address> dbStorage;
|
||||
private Address myOnionAddress;
|
||||
private RequestDataManager requestDataManager;
|
||||
private Set<Address> seedNodeAddresses;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -118,71 +117,158 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
private void init(int networkId, File storageDir) {
|
||||
Log.traceCall();
|
||||
|
||||
// lets check if we have already stored our onion address
|
||||
Address persistedOnionAddress = dbStorage.initAndGetPersisted("myOnionAddress");
|
||||
if (persistedOnionAddress != null)
|
||||
this.myOnionAddress = persistedOnionAddress;
|
||||
|
||||
// network
|
||||
seedNodeAddresses = seedNodesRepository.geSeedNodeAddresses(useLocalhost, networkId);
|
||||
|
||||
// network node
|
||||
networkNode = useLocalhost ? new LocalhostNetworkNode(port) : new TorNetworkNode(port, torDir);
|
||||
Set<Address> seedNodeAddresses = seedNodesRepository.geSeedNodeAddresses(useLocalhost, networkId);
|
||||
|
||||
// peer group
|
||||
peerGroup = new PeerGroup(networkNode, seedNodeAddresses);
|
||||
if (useLocalhost)
|
||||
PeerGroup.setSimulateAuthTorNode(400);
|
||||
|
||||
// P2P network storage
|
||||
dataStorage = new ProtectedExpirableDataStorage(peerGroup, storageDir);
|
||||
|
||||
networkNode.addConnectionListener(this);
|
||||
networkNode.addMessageListener(this);
|
||||
|
||||
dataStorage.addHashMapChangedListener(new HashMapChangedListener() {
|
||||
@Override
|
||||
public void onAdded(ProtectedData entry) {
|
||||
if (entry instanceof ProtectedMailboxData)
|
||||
processProtectedMailboxData((ProtectedMailboxData) entry);
|
||||
}
|
||||
// peer group
|
||||
peerGroup = new PeerGroup(networkNode);
|
||||
if (useLocalhost)
|
||||
PeerGroup.setSimulateAuthTorNode(200);
|
||||
|
||||
@Override
|
||||
public void onRemoved(ProtectedData entry) {
|
||||
}
|
||||
});
|
||||
// P2P network data storage
|
||||
dataStorage = new P2PDataStorage(peerGroup, storageDir);
|
||||
dataStorage.addHashMapChangedListener(this);
|
||||
|
||||
readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, authenticated,
|
||||
(a, b, c) -> a && b && !c);
|
||||
|
||||
readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated,
|
||||
(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated)
|
||||
-> hiddenServicePublished && requestingDataCompleted && !firstPeerAuthenticated);
|
||||
readyForAuthentication.subscribe((observable, oldValue, newValue) -> {
|
||||
// we need to have both the initial data delivered and the hidden service published before we
|
||||
// bootstrap and authenticate to other nodes.
|
||||
// authenticate to a seed node.
|
||||
if (newValue)
|
||||
authenticateSeedNode();
|
||||
});
|
||||
|
||||
requestingDataCompleted.addListener((observable, oldValue, newValue) -> {
|
||||
if (newValue)
|
||||
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void startAsSeedNode(Address mySeedNodeAddress, @Nullable P2PServiceListener listener) {
|
||||
Log.traceCall();
|
||||
seedNodeAddresses.remove(mySeedNodeAddress);
|
||||
start(listener);
|
||||
}
|
||||
|
||||
public void start(@Nullable P2PServiceListener listener) {
|
||||
Log.traceCall();
|
||||
if (listener != null)
|
||||
addP2PServiceListener(listener);
|
||||
|
||||
peerGroup.setSeedNodeAddresses(seedNodeAddresses);
|
||||
networkNode.start(this);
|
||||
}
|
||||
|
||||
public void shutDown(Runnable shutDownCompleteHandler) {
|
||||
Log.traceCall();
|
||||
if (!shutDownInProgress) {
|
||||
shutDownInProgress = true;
|
||||
|
||||
shutDownResultHandlers.add(shutDownCompleteHandler);
|
||||
|
||||
if (dataStorage != null)
|
||||
dataStorage.shutDown();
|
||||
|
||||
if (peerGroup != null)
|
||||
peerGroup.shutDown();
|
||||
|
||||
if (networkNode != null)
|
||||
networkNode.shutDown(() -> {
|
||||
shutDownResultHandlers.stream().forEach(e -> e.run());
|
||||
shutDownComplete = true;
|
||||
});
|
||||
} else {
|
||||
if (shutDownComplete)
|
||||
shutDownCompleteHandler.run();
|
||||
else
|
||||
shutDownResultHandlers.add(shutDownCompleteHandler);
|
||||
|
||||
log.debug("shutDown already in progress");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SetupListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
Log.traceCall();
|
||||
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
|
||||
|
||||
// 1. Step: As soon we have the tor node ready (hidden service still not available) we request the
|
||||
// data set from a random seed node.
|
||||
requestDataManager = new RequestDataManager(networkNode, dataStorage, new RequestDataManager.Listener() {
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
// 2b. or 3b Step: If no seed node available we keep trying again after a random pause
|
||||
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataReceived(Address seedNode) {
|
||||
// 2a. or 3a Step: We received initial data set
|
||||
connectedSeedNode = seedNode;
|
||||
requestingDataCompleted.set(true);
|
||||
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
|
||||
}
|
||||
});
|
||||
requestDataManager.requestData(seedNodeAddresses);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHiddenServicePublished() {
|
||||
Log.traceCall();
|
||||
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
|
||||
if (myOnionAddress != null) {
|
||||
checkArgument(networkNode.getAddress().equals(myOnionAddress),
|
||||
"If we are a seed node networkNode.getAddress() must be same as myOnionAddress.");
|
||||
} else {
|
||||
myOnionAddress = networkNode.getAddress();
|
||||
dbStorage.queueUpForSave(myOnionAddress);
|
||||
}
|
||||
|
||||
// 3. (or 2.). Step: Hidden service is published
|
||||
hiddenServicePublished.set(true);
|
||||
|
||||
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSetupFailed(Throwable throwable) {
|
||||
Log.traceCall();
|
||||
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
|
||||
}
|
||||
|
||||
// 4. Step: hiddenServicePublished and requestingDataCompleted. We start authenticate to the connected seed node.
|
||||
private void authenticateSeedNode() {
|
||||
Log.traceCall();
|
||||
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
|
||||
peerGroup.authenticateSeedNode(connectedSeedNode);
|
||||
}
|
||||
|
||||
// 5. Step: in RequestDataManager (after authentication to first seed node we request again the data)
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, Connection connection) {
|
||||
if (message instanceof GetDataRequest) {
|
||||
Log.traceCall(message.toString());
|
||||
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
|
||||
} else if (message instanceof GetDataResponse) {
|
||||
Log.traceCall(message.toString());
|
||||
GetDataResponse getDataResponse = (GetDataResponse) message;
|
||||
HashSet<ProtectedData> set = getDataResponse.set;
|
||||
// we keep that connection open as the bootstrapping peer will use that for the authentication
|
||||
// as we are not authenticated yet the data adding will not be broadcasted
|
||||
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
|
||||
onRequestingDataComplete();
|
||||
} else if (message instanceof SealedAndSignedMessage) {
|
||||
if (message instanceof SealedAndSignedMessage) {
|
||||
Log.traceCall(message.toString());
|
||||
// Seed nodes don't have set the encryptionService
|
||||
if (encryptionService != null) {
|
||||
|
@ -227,9 +313,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
"peerAddress must match connection.getPeerAddress()");
|
||||
authenticatedPeerAddresses.add(peerAddress);
|
||||
|
||||
if (!authenticated.get()) {
|
||||
authenticated.set(true);
|
||||
sendGetDataRequestAfterAuthentication(peerAddress, connection);
|
||||
if (!firstPeerAuthenticated.get()) {
|
||||
firstPeerAuthenticated.set(true);
|
||||
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
|
||||
}
|
||||
|
||||
|
@ -251,174 +336,18 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SetupListener implementation
|
||||
// HashMapChangedListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
Log.traceCall();
|
||||
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
|
||||
|
||||
// 1. Step: As soon we have the tor node ready (hidden service still not available) we request the
|
||||
// data set from a random seed node.
|
||||
sendGetDataRequest(peerGroup.getSeedNodeAddresses());
|
||||
}
|
||||
|
||||
private void sendGetDataRequest(Collection<Address> seedNodeAddresses) {
|
||||
Log.traceCall(seedNodeAddresses.toString());
|
||||
if (!seedNodeAddresses.isEmpty()) {
|
||||
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||
Collections.shuffle(remainingSeedNodeAddresses);
|
||||
Address candidate = remainingSeedNodeAddresses.remove(0);
|
||||
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
|
||||
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest());
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
|
||||
checkArgument(connectedSeedNode == null, "We have already a connectedSeedNode. That should not happen.");
|
||||
connectedSeedNode = candidate;
|
||||
|
||||
// In case we get called from a retry we check if we need to authenticate
|
||||
if (!authenticated.get() && hiddenServicePublished.get())
|
||||
authenticateSeedNode();
|
||||
else
|
||||
log.debug("No connected seedNode available.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
|
||||
"That is expected if other seed nodes are offline. " +
|
||||
"Exception:" + throwable.getMessage());
|
||||
if (!remainingSeedNodeAddresses.isEmpty())
|
||||
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
|
||||
|
||||
sendGetDataRequest(remainingSeedNodeAddresses);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.info("There is no seed node available for requesting data. " +
|
||||
"That is expected if no seed node is online.\n" +
|
||||
"We will try again after a bit ");
|
||||
onRequestingDataComplete();
|
||||
|
||||
UserThread.runAfterRandomDelay(() -> sendGetDataRequest(peerGroup.getSeedNodeAddresses()),
|
||||
20, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
public void onAdded(ProtectedData entry) {
|
||||
if (entry instanceof ProtectedMailboxData)
|
||||
processProtectedMailboxData((ProtectedMailboxData) entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHiddenServicePublished() {
|
||||
Log.traceCall();
|
||||
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
|
||||
if (myOnionAddress != null)
|
||||
checkArgument(networkNode.getAddress().equals(myOnionAddress),
|
||||
"networkNode.getAddress() must be same as myOnionAddress.");
|
||||
|
||||
myOnionAddress = networkNode.getAddress();
|
||||
dbStorage.queueUpForSave(myOnionAddress);
|
||||
|
||||
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
|
||||
|
||||
// 3. (or 2.). Step: Hidden service is published
|
||||
hiddenServicePublished.set(true);
|
||||
public void onRemoved(ProtectedData entry) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSetupFailed(Throwable throwable) {
|
||||
Log.traceCall();
|
||||
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
|
||||
}
|
||||
|
||||
private void onRequestingDataComplete() {
|
||||
Log.traceCall();
|
||||
// 2. (or 3.) Step: We got all data loaded (or no seed node available - should not happen in real operation)
|
||||
requestingDataCompleted.set(true);
|
||||
}
|
||||
|
||||
// 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node.
|
||||
private void authenticateSeedNode() {
|
||||
Log.traceCall();
|
||||
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null");
|
||||
if (connectedSeedNode != null)
|
||||
peerGroup.authenticateSeedNode(connectedSeedNode);
|
||||
}
|
||||
|
||||
// 5. Step:
|
||||
private void sendGetDataRequestAfterAuthentication(Address peerAddress, Connection connection) {
|
||||
Log.traceCall(peerAddress.toString());
|
||||
// We have to exchange the data again as we might have missed pushed data in the meantime
|
||||
// After authentication we send our data set to the other peer.
|
||||
// As he will do the same we will get his actual data set.
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, new GetDataRequest());
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.trace("sendGetDataRequestAfterAuthentication: Send GetDataRequest to " + peerAddress + " succeeded.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
//TODO how to deal with that case?
|
||||
log.warn("sendGetDataRequestAfterAuthentication: Send GetDataRequest to " + peerAddress + " failed. " +
|
||||
"Exception:" + throwable.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// used by seed nodes to exclude themselves form list
|
||||
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
|
||||
Log.traceCall();
|
||||
peerGroup.removeMySeedNodeAddressFromList(mySeedNodeAddress);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
Log.traceCall();
|
||||
start(null);
|
||||
}
|
||||
|
||||
public void start(@Nullable P2PServiceListener listener) {
|
||||
Log.traceCall();
|
||||
if (listener != null)
|
||||
addP2PServiceListener(listener);
|
||||
|
||||
networkNode.start(this);
|
||||
}
|
||||
|
||||
public void shutDown(Runnable shutDownCompleteHandler) {
|
||||
Log.traceCall();
|
||||
if (!shutDownInProgress) {
|
||||
shutDownInProgress = true;
|
||||
|
||||
shutDownResultHandlers.add(shutDownCompleteHandler);
|
||||
|
||||
if (dataStorage != null)
|
||||
dataStorage.shutDown();
|
||||
|
||||
if (peerGroup != null)
|
||||
peerGroup.shutDown();
|
||||
|
||||
if (networkNode != null)
|
||||
networkNode.shutDown(() -> {
|
||||
shutDownResultHandlers.stream().forEach(e -> e.run());
|
||||
shutDownComplete = true;
|
||||
});
|
||||
} else {
|
||||
if (shutDownComplete)
|
||||
shutDownCompleteHandler.run();
|
||||
else
|
||||
shutDownResultHandlers.add(shutDownCompleteHandler);
|
||||
|
||||
log.debug("shutDown already in progress");
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MailMessages
|
||||
|
@ -693,8 +622,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
|
|||
// Getters
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public boolean isAuthenticated() {
|
||||
return authenticated.get();
|
||||
public boolean getFirstPeerAuthenticated() {
|
||||
return firstPeerAuthenticated.get();
|
||||
}
|
||||
|
||||
public NetworkNode getNetworkNode() {
|
||||
|
|
|
@ -7,5 +7,7 @@ public interface P2PServiceListener extends SetupListener {
|
|||
|
||||
void onRequestingDataCompleted();
|
||||
|
||||
void onNoSeedNodeAvailable();
|
||||
|
||||
void onFirstPeerAuthenticated();
|
||||
}
|
||||
|
|
|
@ -52,7 +52,8 @@ public class LocalhostNetworkNode extends NetworkNode {
|
|||
@Override
|
||||
public void start(@Nullable SetupListener setupListener) {
|
||||
Log.traceCall();
|
||||
if (setupListener != null) addSetupListener(setupListener);
|
||||
if (setupListener != null)
|
||||
addSetupListener(setupListener);
|
||||
|
||||
createExecutorService();
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
|||
private static final int MAX_REPORTED_PEERS = 1000;
|
||||
|
||||
private final NetworkNode networkNode;
|
||||
private final Set<Address> seedNodeAddresses;
|
||||
private Set<Address> seedNodeAddresses;
|
||||
|
||||
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
|
||||
private final Set<ReportedPeer> reportedPeers = new HashSet<>();
|
||||
|
@ -60,11 +60,10 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
|||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public PeerGroup(NetworkNode networkNode, Set<Address> seeds) {
|
||||
public PeerGroup(NetworkNode networkNode) {
|
||||
Log.traceCall();
|
||||
|
||||
this.networkNode = networkNode;
|
||||
this.seedNodeAddresses = seeds;
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
networkNode.addConnectionListener(this);
|
||||
|
@ -73,6 +72,9 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
|||
startGetPeersTimer();
|
||||
}
|
||||
|
||||
public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) {
|
||||
this.seedNodeAddresses = seedNodeAddresses;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
|
@ -114,11 +116,6 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
|||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
|
||||
Log.traceCall();
|
||||
seedNodeAddresses.remove(mySeedNodeAddress);
|
||||
}
|
||||
|
||||
public void broadcast(DataBroadcastMessage message, @Nullable Address sender) {
|
||||
Log.traceCall("Sender " + sender + ". Message " + message.toString());
|
||||
if (authenticatedPeers.values().size() > 0) {
|
||||
|
@ -836,5 +833,4 @@ public class PeerGroup implements MessageListener, ConnectionListener {
|
|||
result.append("\n------------------------------------------------------------\n");
|
||||
log.info(result.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
package io.bitsquare.p2p.peers;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import io.bitsquare.app.Log;
|
||||
import io.bitsquare.common.UserThread;
|
||||
import io.bitsquare.p2p.Address;
|
||||
import io.bitsquare.p2p.Message;
|
||||
import io.bitsquare.p2p.network.Connection;
|
||||
import io.bitsquare.p2p.network.ConnectionListener;
|
||||
import io.bitsquare.p2p.network.MessageListener;
|
||||
import io.bitsquare.p2p.network.NetworkNode;
|
||||
import io.bitsquare.p2p.storage.P2PDataStorage;
|
||||
import io.bitsquare.p2p.storage.data.ProtectedData;
|
||||
import io.bitsquare.p2p.storage.messages.GetDataRequest;
|
||||
import io.bitsquare.p2p.storage.messages.GetDataResponse;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
public class RequestDataManager implements MessageListener, ConnectionListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Listener
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public interface Listener {
|
||||
void onNoSeedNodeAvailable();
|
||||
|
||||
void onDataReceived(Address seedNode);
|
||||
}
|
||||
|
||||
|
||||
private NetworkNode networkNode;
|
||||
private Address connectedSeedNodeAddress;
|
||||
private Collection<Address> seedNodeAddresses;
|
||||
private P2PDataStorage dataStorage;
|
||||
private Listener listener;
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, Listener listener) {
|
||||
this.networkNode = networkNode;
|
||||
this.dataStorage = dataStorage;
|
||||
this.listener = listener;
|
||||
|
||||
networkNode.addMessageListener(this);
|
||||
networkNode.addConnectionListener(this);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// API
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public void requestData(Collection<Address> seedNodeAddresses) {
|
||||
if (this.seedNodeAddresses == null)
|
||||
this.seedNodeAddresses = seedNodeAddresses;
|
||||
|
||||
Log.traceCall(seedNodeAddresses.toString());
|
||||
if (!seedNodeAddresses.isEmpty()) {
|
||||
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
|
||||
Collections.shuffle(remainingSeedNodeAddresses);
|
||||
Address candidate = remainingSeedNodeAddresses.remove(0);
|
||||
|
||||
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
|
||||
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new GetDataRequest());
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
|
||||
checkArgument(connectedSeedNodeAddress == null, "We have already a connectedSeedNode. That must not happen.");
|
||||
connectedSeedNodeAddress = candidate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
|
||||
"That is expected if the seed node is offline. " +
|
||||
"Exception:" + throwable.getMessage());
|
||||
if (!remainingSeedNodeAddresses.isEmpty())
|
||||
log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses);
|
||||
|
||||
requestData(remainingSeedNodeAddresses);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log.info("There is no seed node available for requesting data. " +
|
||||
"That is expected if no seed node is online.\n" +
|
||||
"We will try again after a pause of 20-30 sec.");
|
||||
listener.onNoSeedNodeAvailable();
|
||||
|
||||
// We re try after 20-30 sec.
|
||||
UserThread.runAfterRandomDelay(() -> requestData(this.seedNodeAddresses),
|
||||
20, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// MessageListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, Connection connection) {
|
||||
if (message instanceof GetDataRequest) {
|
||||
// We are a seed node and receive that msg from a new node
|
||||
Log.traceCall(message.toString());
|
||||
networkNode.sendMessage(connection, new GetDataResponse(new HashSet<>(dataStorage.getMap().values())));
|
||||
} else if (message instanceof GetDataResponse) {
|
||||
// We are the new node which has requested the data
|
||||
Log.traceCall(message.toString());
|
||||
GetDataResponse getDataResponse = (GetDataResponse) message;
|
||||
HashSet<ProtectedData> set = getDataResponse.set;
|
||||
// we keep that connection open as the bootstrapping peer will use that for the authentication
|
||||
// as we are not authenticated yet the data adding will not be broadcasted
|
||||
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
|
||||
listener.onDataReceived(connectedSeedNodeAddress);
|
||||
}
|
||||
}
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
// ConnectionListener implementation
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Override
|
||||
public void onConnection(Connection connection) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
|
||||
if (connectedSeedNodeAddress.equals(peerAddress))
|
||||
requestDataFromAuthenticatedSeedNode(peerAddress, connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Reason reason, Connection connection) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
}
|
||||
|
||||
// 5. Step after authentication to first seed node we request again the data
|
||||
private void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
|
||||
Log.traceCall(peerAddress.toString());
|
||||
// We have to request the data again as we might have missed pushed data in the meantime
|
||||
SettableFuture<Connection> future = networkNode.sendMessage(connection, new GetDataRequest());
|
||||
Futures.addCallback(future, new FutureCallback<Connection>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Connection connection) {
|
||||
log.info("requestDataFromAuthenticatedSeedNode from " + peerAddress + " succeeded.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull Throwable throwable) {
|
||||
log.warn("requestDataFromAuthenticatedSeedNode from " + peerAddress + " failed. " +
|
||||
"Exception:" + throwable.getMessage()
|
||||
+ "\nWe will try again to request data from any of our seed nodes.");
|
||||
|
||||
// We will try again to request data from any of our seed nodes.
|
||||
requestData(seedNodeAddresses);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -133,8 +133,7 @@ public class SeedNode {
|
|||
log.info("Created torDir at " + torDir.getAbsolutePath());
|
||||
|
||||
p2PService = new P2PService(seedNodesRepository, mySeedNodeAddress.port, torDir, useLocalhost, networkId, storageDir);
|
||||
p2PService.removeMySeedNodeAddressFromList(mySeedNodeAddress);
|
||||
p2PService.start(listener);
|
||||
p2PService.startAsSeedNode(mySeedNodeAddress, listener);
|
||||
}
|
||||
|
||||
public P2PService getP2PService() {
|
||||
|
|
|
@ -32,8 +32,8 @@ import java.util.*;
|
|||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
// Run in UserThread
|
||||
public class ProtectedExpirableDataStorage implements MessageListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class);
|
||||
public class P2PDataStorage implements MessageListener {
|
||||
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
|
||||
|
||||
@VisibleForTesting
|
||||
public static int CHECK_TTL_INTERVAL = 10 * 60 * 1000;
|
||||
|
@ -51,7 +51,7 @@ public class ProtectedExpirableDataStorage implements MessageListener {
|
|||
// Constructor
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public ProtectedExpirableDataStorage(PeerGroup peerGroup, File storageDir) {
|
||||
public P2PDataStorage(PeerGroup peerGroup, File storageDir) {
|
||||
Log.traceCall();
|
||||
this.peerGroup = peerGroup;
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
package io.bitsquare.p2p.storage.data;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
|
||||
import io.bitsquare.p2p.storage.P2PDataStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -11,7 +11,7 @@ import java.security.PublicKey;
|
|||
import java.util.Date;
|
||||
|
||||
public class ProtectedData implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
|
||||
|
||||
public final ExpirablePayload expirablePayload;
|
||||
transient public long ttl;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package io.bitsquare.p2p.storage.data;
|
||||
|
||||
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
|
||||
import io.bitsquare.p2p.storage.P2PDataStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -9,7 +9,7 @@ import java.security.PublicKey;
|
|||
import java.util.Date;
|
||||
|
||||
public class ProtectedMailboxData extends ProtectedData {
|
||||
private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(P2PDataStorage.class);
|
||||
|
||||
public final PublicKey receiversPubKey;
|
||||
|
||||
|
|
|
@ -86,6 +86,10 @@ public class TestUtils {
|
|||
public void onRequestingDataCompleted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFirstPeerAuthenticated() {
|
||||
}
|
||||
|
@ -127,6 +131,10 @@ public class TestUtils {
|
|||
public void onRequestingDataCompleted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
|
||||
|
|
|
@ -89,7 +89,10 @@ public class PeerGroupTest {
|
|||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,6 +134,10 @@ public class PeerGroupTest {
|
|||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
|
||||
|
@ -163,8 +170,11 @@ public class PeerGroupTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -393,8 +403,11 @@ public class PeerGroupTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
public void onNoSeedNodeAvailable() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTorNodeReady() {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,7 +39,7 @@ public class ProtectedDataStorageTest {
|
|||
private NetworkNode networkNode1;
|
||||
private PeerGroup peerGroup1;
|
||||
private EncryptionService encryptionService1, encryptionService2;
|
||||
private ProtectedExpirableDataStorage dataStorage1;
|
||||
private P2PDataStorage dataStorage1;
|
||||
private KeyPair storageSignatureKeyPair1, storageSignatureKeyPair2;
|
||||
private KeyRing keyRing1, keyRing2;
|
||||
private MockData mockData;
|
||||
|
@ -58,15 +58,16 @@ public class ProtectedDataStorageTest {
|
|||
dir2.mkdir();
|
||||
|
||||
UserThread.setExecutor(Executors.newSingleThreadExecutor());
|
||||
ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10 * 60 * 1000;
|
||||
P2PDataStorage.CHECK_TTL_INTERVAL = 10 * 60 * 1000;
|
||||
|
||||
keyRing1 = new KeyRing(new KeyStorage(dir1));
|
||||
|
||||
storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair();
|
||||
encryptionService1 = new EncryptionService(keyRing1);
|
||||
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getP2PService().getNetworkNode();
|
||||
peerGroup1 = new PeerGroup(networkNode1, seedNodes);
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy"));
|
||||
peerGroup1 = new PeerGroup(networkNode1);
|
||||
peerGroup1.setSeedNodeAddresses(seedNodes);
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, new File("dummy"));
|
||||
|
||||
// for mailbox
|
||||
keyRing2 = new KeyRing(new KeyStorage(dir2));
|
||||
|
@ -106,9 +107,9 @@ public class ProtectedDataStorageTest {
|
|||
|
||||
@Test
|
||||
public void testExpirableData() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
|
||||
ProtectedExpirableDataStorage.CHECK_TTL_INTERVAL = 10;
|
||||
P2PDataStorage.CHECK_TTL_INTERVAL = 10;
|
||||
// CHECK_TTL_INTERVAL is used in constructor of ProtectedExpirableDataStorage so we recreate it here
|
||||
dataStorage1 = new ProtectedExpirableDataStorage(peerGroup1, new File("dummy"));
|
||||
dataStorage1 = new P2PDataStorage(peerGroup1, new File("dummy"));
|
||||
mockData.ttl = 50;
|
||||
|
||||
ProtectedData data = dataStorage1.getDataWithSignedSeqNr(mockData, storageSignatureKeyPair1);
|
||||
|
|
Loading…
Add table
Reference in a new issue