P2P network improvements

This commit is contained in:
Manfred Karrer 2016-01-02 22:04:30 +01:00
parent 97629cb0cd
commit 7c3732c0e5
36 changed files with 831 additions and 416 deletions

View File

@ -48,7 +48,7 @@ public class Log {
rollingPolicy.start(); rollingPolicy.start();
triggeringPolicy = new SizeBasedTriggeringPolicy(); triggeringPolicy = new SizeBasedTriggeringPolicy();
triggeringPolicy.setMaxFileSize(useDetailedLogging ? "50MB" : "1MB"); triggeringPolicy.setMaxFileSize(useDetailedLogging ? "10MB" : "1MB");
triggeringPolicy.start(); triggeringPolicy.start();
PatternLayoutEncoder encoder = new PatternLayoutEncoder(); PatternLayoutEncoder encoder = new PatternLayoutEncoder();

View File

@ -92,7 +92,7 @@ public class Utilities {
executor.allowCoreThreadTimeOut(true); executor.allowCoreThreadTimeOut(true);
executor.setMaximumPoolSize(maximumPoolSize); executor.setMaximumPoolSize(maximumPoolSize);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRejectedExecutionHandler((r, e) -> log.warn("RejectedExecutionHandler called")); executor.setRejectedExecutionHandler((r, e) -> log.debug("RejectedExecutionHandler called"));
return executor; return executor;
} }

View File

@ -50,8 +50,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
/** /**
* Borrowed from BitcoinJ WalletFiles * Borrowed from BitcoinJ WalletFiles
* A class that handles atomic and optionally delayed writing of a file to disk. * A class that handles atomic and optionally delayed writing of a file to disk.
@ -67,7 +65,6 @@ public class FileManager<T> {
private final ScheduledThreadPoolExecutor executor; private final ScheduledThreadPoolExecutor executor;
private final AtomicBoolean savePending; private final AtomicBoolean savePending;
private final long delay; private final long delay;
private final TimeUnit delayTimeUnit;
private final Callable<Void> saveFileTask; private final Callable<Void> saveFileTask;
private T serializable; private T serializable;
@ -76,7 +73,7 @@ public class FileManager<T> {
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public FileManager(File dir, File storageFile, long delay, TimeUnit delayTimeUnit) { public FileManager(File dir, File storageFile, long delay) {
this.dir = dir; this.dir = dir;
this.storageFile = storageFile; this.storageFile = storageFile;
@ -85,7 +82,6 @@ public class FileManager<T> {
// File must only be accessed from the auto-save executor from now on, to avoid simultaneous access. // File must only be accessed from the auto-save executor from now on, to avoid simultaneous access.
savePending = new AtomicBoolean(); savePending = new AtomicBoolean();
this.delay = delay; this.delay = delay;
this.delayTimeUnit = checkNotNull(delayTimeUnit);
saveFileTask = () -> { saveFileTask = () -> {
Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000)); Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000));
@ -126,11 +122,15 @@ public class FileManager<T> {
* Queues up a save in the background. Useful for not very important wallet changes. * Queues up a save in the background. Useful for not very important wallet changes.
*/ */
public void saveLater(T serializable) { public void saveLater(T serializable) {
saveLater(serializable, delay);
}
public void saveLater(T serializable, long delayInMilli) {
this.serializable = serializable; this.serializable = serializable;
if (savePending.getAndSet(true)) if (savePending.getAndSet(true))
return; // Already pending. return; // Already pending.
executor.schedule(saveFileTask, delay, delayTimeUnit); executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS);
} }
public synchronized T read(File file) { public synchronized T read(File file) {

View File

@ -27,7 +27,6 @@ import javax.inject.Named;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@ -82,7 +81,7 @@ public class Storage<T extends Serializable> {
public T initAndGetPersisted(String fileName) { public T initAndGetPersisted(String fileName) {
this.fileName = fileName; this.fileName = fileName;
storageFile = new File(dir, fileName); storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS); fileManager = new FileManager<>(dir, storageFile, 600);
return getPersisted(); return getPersisted();
} }
@ -97,7 +96,7 @@ public class Storage<T extends Serializable> {
this.serializable = serializable; this.serializable = serializable;
this.fileName = fileName; this.fileName = fileName;
storageFile = new File(dir, fileName); storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600, TimeUnit.MILLISECONDS); fileManager = new FileManager<>(dir, storageFile, 600);
return getPersisted(); return getPersisted();
} }
@ -106,6 +105,10 @@ public class Storage<T extends Serializable> {
queueUpForSave(serializable); queueUpForSave(serializable);
} }
public void queueUpForSave(long delayInMilli) {
queueUpForSave(serializable, delayInMilli);
}
// Save delayed and on a background thread // Save delayed and on a background thread
public void queueUpForSave(T serializable) { public void queueUpForSave(T serializable) {
if (serializable != null) { if (serializable != null) {
@ -118,6 +121,18 @@ public class Storage<T extends Serializable> {
} }
} }
public void queueUpForSave(T serializable, long delayInMilli) {
if (serializable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
fileManager.saveLater(serializable, delayInMilli);
} else {
log.trace("queueUpForSave called but no serializable set");
}
}
public void remove(String fileName) { public void remove(String fileName) {
fileManager.removeFile(fileName); fileManager.removeFile(fileName);
} }

View File

@ -121,7 +121,7 @@ public class ArbitratorManager {
if (user.getRegisteredArbitrator() != null) { if (user.getRegisteredArbitrator() != null) {
P2PService p2PService = arbitratorService.getP2PService(); P2PService p2PService = arbitratorService.getP2PService();
if (!p2PService.getFirstPeerAuthenticated()) { if (!p2PService.isAuthenticated()) {
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() { firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {

View File

@ -106,12 +106,12 @@ public class DisputeManager {
p2PService.addDecryptedMailListener((decryptedMessageWithPubKey, senderAddress) -> { p2PService.addDecryptedMailListener((decryptedMessageWithPubKey, senderAddress) -> {
decryptedMailMessageWithPubKeys.add(decryptedMessageWithPubKey); decryptedMailMessageWithPubKeys.add(decryptedMessageWithPubKey);
if (p2PService.getFirstPeerAuthenticated()) if (p2PService.isAuthenticated())
applyMessages(); applyMessages();
}); });
p2PService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> { p2PService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
decryptedMailboxMessageWithPubKeys.add(decryptedMessageWithPubKey); decryptedMailboxMessageWithPubKeys.add(decryptedMessageWithPubKey);
if (p2PService.getFirstPeerAuthenticated()) if (p2PService.isAuthenticated())
applyMessages(); applyMessages();
}); });

View File

@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Service;
import io.bitsquare.app.Log;
import io.bitsquare.btc.listeners.AddressConfidenceListener; import io.bitsquare.btc.listeners.AddressConfidenceListener;
import io.bitsquare.btc.listeners.BalanceListener; import io.bitsquare.btc.listeners.BalanceListener;
import io.bitsquare.btc.listeners.TxConfidenceListener; import io.bitsquare.btc.listeners.TxConfidenceListener;
@ -537,14 +536,12 @@ public class WalletService {
@Override @Override
protected void progress(double percentage, int blocksLeft, Date date) { protected void progress(double percentage, int blocksLeft, Date date) {
Log.traceCall("percentage=" + percentage);
super.progress(percentage, blocksLeft, date); super.progress(percentage, blocksLeft, date);
UserThread.execute(() -> this.percentage.set(percentage / 100d)); UserThread.execute(() -> this.percentage.set(percentage / 100d));
} }
@Override @Override
protected void doneDownload() { protected void doneDownload() {
Log.traceCall();
super.doneDownload(); super.doneDownload();
UserThread.execute(() -> this.percentage.set(1d)); UserThread.execute(() -> this.percentage.set(1d));
} }

View File

@ -125,9 +125,9 @@ public class OpenOfferManager {
// setupAnStartRePublishThread will re-publish at method call // setupAnStartRePublishThread will re-publish at method call
// Before the TTL is reached we re-publish our offers // Before the TTL is reached we re-publish our offers
// If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set TTL quite short and use re-publish as // If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set
// strategy. Offerers need to be online anyway. // TTL quite short and use re-publish as strategy. Offerers need to be online anyway.
if (!p2PService.getFirstPeerAuthenticated()) { if (!p2PService.isAuthenticated()) {
firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() { firstPeerAuthenticatedListener = new FirstPeerAuthenticatedListener() {
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {

View File

@ -88,7 +88,6 @@ public class Preferences implements Serializable {
private boolean useAnimations = true; private boolean useAnimations = true;
private boolean useEffects = true; private boolean useEffects = true;
private boolean displaySecurityDepositInfo = true; private boolean displaySecurityDepositInfo = true;
private boolean useUPnP = true;
private ArrayList<TradeCurrency> tradeCurrencies; private ArrayList<TradeCurrency> tradeCurrencies;
private BlockChainExplorer blockChainExplorerMainNet; private BlockChainExplorer blockChainExplorerMainNet;
private BlockChainExplorer blockChainExplorerTestNet; private BlockChainExplorer blockChainExplorerTestNet;
@ -123,7 +122,6 @@ public class Preferences implements Serializable {
setBtcDenomination(persisted.btcDenomination); setBtcDenomination(persisted.btcDenomination);
setUseAnimations(persisted.useAnimations); setUseAnimations(persisted.useAnimations);
setUseEffects(persisted.useEffects); setUseEffects(persisted.useEffects);
setUseUPnP(persisted.useUPnP);
setTradeCurrencies(persisted.tradeCurrencies); setTradeCurrencies(persisted.tradeCurrencies);
tradeCurrencies = new ArrayList<>(tradeCurrenciesAsObservable); tradeCurrencies = new ArrayList<>(tradeCurrenciesAsObservable);
displaySecurityDepositInfo = persisted.getDisplaySecurityDepositInfo(); displaySecurityDepositInfo = persisted.getDisplaySecurityDepositInfo();
@ -172,15 +170,15 @@ public class Preferences implements Serializable {
// Use that to guarantee update of the serializable field and to make a storage update in case of a change // Use that to guarantee update of the serializable field and to make a storage update in case of a change
btcDenominationProperty.addListener((ov) -> { btcDenominationProperty.addListener((ov) -> {
btcDenomination = btcDenominationProperty.get(); btcDenomination = btcDenominationProperty.get();
storage.queueUpForSave(); storage.queueUpForSave(2000);
}); });
useAnimationsProperty.addListener((ov) -> { useAnimationsProperty.addListener((ov) -> {
useAnimations = useAnimationsProperty.get(); useAnimations = useAnimationsProperty.get();
storage.queueUpForSave(); storage.queueUpForSave(2000);
}); });
useEffectsProperty.addListener((ov) -> { useEffectsProperty.addListener((ov) -> {
useEffects = useEffectsProperty.get(); useEffects = useEffectsProperty.get();
storage.queueUpForSave(); storage.queueUpForSave(2000);
}); });
tradeCurrenciesAsObservable.addListener((Observable ov) -> { tradeCurrenciesAsObservable.addListener((Observable ov) -> {
tradeCurrencies.clear(); tradeCurrencies.clear();
@ -191,7 +189,7 @@ public class Preferences implements Serializable {
public void dontShowAgain(String id) { public void dontShowAgain(String id) {
showAgainMap.put(id, false); showAgainMap.put(id, false);
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
@ -213,12 +211,7 @@ public class Preferences implements Serializable {
public void setDisplaySecurityDepositInfo(boolean displaySecurityDepositInfo) { public void setDisplaySecurityDepositInfo(boolean displaySecurityDepositInfo) {
this.displaySecurityDepositInfo = displaySecurityDepositInfo; this.displaySecurityDepositInfo = displaySecurityDepositInfo;
storage.queueUpForSave(); storage.queueUpForSave(2000);
}
public void setUseUPnP(boolean useUPnP) {
this.useUPnP = useUPnP;
storage.queueUpForSave();
} }
public void setBitcoinNetwork(BitcoinNetwork bitcoinNetwork) { public void setBitcoinNetwork(BitcoinNetwork bitcoinNetwork) {
@ -235,12 +228,12 @@ public class Preferences implements Serializable {
private void setBlockChainExplorerTestNet(BlockChainExplorer blockChainExplorerTestNet) { private void setBlockChainExplorerTestNet(BlockChainExplorer blockChainExplorerTestNet) {
this.blockChainExplorerTestNet = blockChainExplorerTestNet; this.blockChainExplorerTestNet = blockChainExplorerTestNet;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
private void setBlockChainExplorerMainNet(BlockChainExplorer blockChainExplorerMainNet) { private void setBlockChainExplorerMainNet(BlockChainExplorer blockChainExplorerMainNet) {
this.blockChainExplorerMainNet = blockChainExplorerMainNet; this.blockChainExplorerMainNet = blockChainExplorerMainNet;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
public void setBlockChainExplorer(BlockChainExplorer blockChainExplorer) { public void setBlockChainExplorer(BlockChainExplorer blockChainExplorer) {
@ -252,12 +245,12 @@ public class Preferences implements Serializable {
public void setShowPlaceOfferConfirmation(boolean showPlaceOfferConfirmation) { public void setShowPlaceOfferConfirmation(boolean showPlaceOfferConfirmation) {
this.showPlaceOfferConfirmation = showPlaceOfferConfirmation; this.showPlaceOfferConfirmation = showPlaceOfferConfirmation;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
public void setShowTakeOfferConfirmation(boolean showTakeOfferConfirmation) { public void setShowTakeOfferConfirmation(boolean showTakeOfferConfirmation) {
this.showTakeOfferConfirmation = showTakeOfferConfirmation; this.showTakeOfferConfirmation = showTakeOfferConfirmation;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
public void setTacAccepted(boolean tacAccepted) { public void setTacAccepted(boolean tacAccepted) {
@ -310,10 +303,6 @@ public class Preferences implements Serializable {
return useEffectsProperty; return useEffectsProperty;
} }
public boolean getUseUPnP() {
return useUPnP;
}
public BitcoinNetwork getBitcoinNetwork() { public BitcoinNetwork getBitcoinNetwork() {
return bitcoinNetwork; return bitcoinNetwork;
} }
@ -379,7 +368,7 @@ public class Preferences implements Serializable {
// if we add new and those are not in our stored map we display by default the new popup // if we add new and those are not in our stored map we display by default the new popup
if (!getShowAgainMap().containsKey(key)) { if (!getShowAgainMap().containsKey(key)) {
showAgainMap.put(key, true); showAgainMap.put(key, true);
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
return showAgainMap.get(key); return showAgainMap.get(key);

View File

@ -318,7 +318,7 @@ public class User implements Serializable {
public void setDevelopersAlert(Alert developersAlert) { public void setDevelopersAlert(Alert developersAlert) {
this.developersAlert = developersAlert; this.developersAlert = developersAlert;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
public Alert getDevelopersAlert() { public Alert getDevelopersAlert() {
@ -327,7 +327,7 @@ public class User implements Serializable {
public void setDisplayedAlert(Alert displayedAlert) { public void setDisplayedAlert(Alert displayedAlert) {
this.displayedAlert = displayedAlert; this.displayedAlert = displayedAlert;
storage.queueUpForSave(); storage.queueUpForSave(2000);
} }
public Alert getDisplayedAlert() { public Alert getDisplayedAlert() {

View File

@ -32,6 +32,7 @@ import io.bitsquare.gui.common.view.View;
import io.bitsquare.gui.common.view.ViewLoader; import io.bitsquare.gui.common.view.ViewLoader;
import io.bitsquare.gui.common.view.guice.InjectorViewFactory; import io.bitsquare.gui.common.view.guice.InjectorViewFactory;
import io.bitsquare.gui.main.MainView; import io.bitsquare.gui.main.MainView;
import io.bitsquare.gui.main.MainViewModel;
import io.bitsquare.gui.main.debug.DebugView; import io.bitsquare.gui.main.debug.DebugView;
import io.bitsquare.gui.popups.EmptyWalletPopup; import io.bitsquare.gui.popups.EmptyWalletPopup;
import io.bitsquare.gui.popups.Popup; import io.bitsquare.gui.popups.Popup;
@ -304,15 +305,13 @@ public class BitsquareApp extends Application {
log.debug("gracefulShutDown"); log.debug("gracefulShutDown");
try { try {
if (injector != null) { if (injector != null) {
ArbitratorManager arbitratorManager = injector.getInstance(ArbitratorManager.class); injector.getInstance(ArbitratorManager.class).shutDown();
arbitratorManager.shutDown(); injector.getInstance(MainViewModel.class).shutDown();
OpenOfferManager openOfferManager = injector.getInstance(OpenOfferManager.class); injector.getInstance(OpenOfferManager.class).shutDown(() -> {
openOfferManager.shutDown(() -> { injector.getInstance(P2PService.class).shutDown(() -> {
P2PService p2PService = injector.getInstance(P2PService.class); injector.getInstance(WalletService.class).shutDownDone.addListener((ov, o, n) -> {
p2PService.shutDown(() -> {
WalletService walletService = injector.getInstance(WalletService.class);
walletService.shutDownDone.addListener((observable, oldValue, newValue) -> {
bitsquareAppModule.close(injector); bitsquareAppModule.close(injector);
log.info("Graceful shutdown completed");
resultHandler.handleResult(); resultHandler.handleResult();
}); });
injector.getInstance(WalletService.class).shutDown(); injector.getInstance(WalletService.class).shutDown();

View File

@ -46,7 +46,7 @@ public class Navigation implements Serializable {
// New listeners can be added during iteration so we use CopyOnWriteArrayList to // New listeners can be added during iteration so we use CopyOnWriteArrayList to
// prevent invalid array modification // prevent invalid array modification
transient private final CopyOnWriteArraySet<Listener> listeners = new CopyOnWriteArraySet<>(); transient private final CopyOnWriteArraySet<Listener> listeners = new CopyOnWriteArraySet<>();
transient private final Storage<Navigation> remoteStorage; transient private final Storage<Navigation> storage;
transient private ViewPath currentPath; transient private ViewPath currentPath;
// Used for returning to the last important view. After setup is done we want to // Used for returning to the last important view. After setup is done we want to
// return to the last opened view (e.g. sell/buy) // return to the last opened view (e.g. sell/buy)
@ -57,14 +57,13 @@ public class Navigation implements Serializable {
@Inject @Inject
public Navigation(Storage<Navigation> remoteStorage) { public Navigation(Storage<Navigation> storage) {
this.remoteStorage = remoteStorage; this.storage = storage;
Navigation persisted = remoteStorage.initAndGetPersisted(this); Navigation persisted = storage.initAndGetPersisted(this);
if (persisted != null) { if (persisted != null) {
previousPath = persisted.getPreviousPath(); previousPath = persisted.getPreviousPath();
} } else
else
previousPath = DEFAULT_VIEW_PATH; previousPath = DEFAULT_VIEW_PATH;
// need to be null initially and not DEFAULT_VIEW_PATH to navigate through all items // need to be null initially and not DEFAULT_VIEW_PATH to navigate through all items
@ -102,7 +101,7 @@ public class Navigation implements Serializable {
currentPath = newPath; currentPath = newPath;
previousPath = currentPath; previousPath = currentPath;
remoteStorage.queueUpForSave(); storage.queueUpForSave(2000);
listeners.stream().forEach((e) -> e.onNavigationRequested(currentPath)); listeners.stream().forEach((e) -> e.onNavigationRequested(currentPath));
} }

View File

@ -87,6 +87,7 @@ public class MainView extends InitializableView<StackPane, MainViewModel> {
private Label btcSplashInfo; private Label btcSplashInfo;
private List<String> persistedFilesCorrupted; private List<String> persistedFilesCorrupted;
private static BorderPane baseApplicationContainer; private static BorderPane baseApplicationContainer;
private Popup p2PNetworkWarnMsgPopup;
@Inject @Inject
public MainView(MainViewModel model, CachingViewLoader viewLoader, Navigation navigation, Transitions transitions, public MainView(MainViewModel model, CachingViewLoader viewLoader, Navigation navigation, Transitions transitions,
@ -271,8 +272,10 @@ public class MainView extends InitializableView<StackPane, MainViewModel> {
splashP2PNetworkIndicator.progressProperty().bind(model.splashP2PNetworkProgress); splashP2PNetworkIndicator.progressProperty().bind(model.splashP2PNetworkProgress);
splashP2PNetworkErrorMsgListener = (ov, oldValue, newValue) -> { splashP2PNetworkErrorMsgListener = (ov, oldValue, newValue) -> {
splashP2PNetworkLabel.setId("splash-error-state-msg"); if (newValue != null) {
splashP2PNetworkIndicator.setVisible(false); splashP2PNetworkLabel.setId("splash-error-state-msg");
splashP2PNetworkIndicator.setVisible(false);
}
}; };
model.p2PNetworkWarnMsg.addListener(splashP2PNetworkErrorMsgListener); model.p2PNetworkWarnMsg.addListener(splashP2PNetworkErrorMsgListener);
@ -392,14 +395,12 @@ public class MainView extends InitializableView<StackPane, MainViewModel> {
setRightAnchor(p2PNetworkIcon, 10d); setRightAnchor(p2PNetworkIcon, 10d);
setBottomAnchor(p2PNetworkIcon, 7d); setBottomAnchor(p2PNetworkIcon, 7d);
p2PNetworkIcon.idProperty().bind(model.p2PNetworkIconId); p2PNetworkIcon.idProperty().bind(model.p2PNetworkIconId);
p2PNetworkLabel.idProperty().bind(model.p2PNetworkLabelId);
model.p2PNetworkWarnMsg.addListener((ov, oldValue, newValue) -> { model.p2PNetworkWarnMsg.addListener((ov, oldValue, newValue) -> {
if (newValue != null) { if (newValue != null) {
p2PNetworkLabel.setId("splash-error-state-msg"); p2PNetworkWarnMsgPopup = new Popup().warning(newValue).show();
new Popup().warning(newValue + "\nPlease check your internet connection or try to restart the application.") } else if (p2PNetworkWarnMsgPopup != null) {
.show(); p2PNetworkWarnMsgPopup.hide();
} else {
p2PNetworkLabel.setId("footer-pane");
} }
}); });

View File

@ -47,6 +47,7 @@ import io.bitsquare.trade.offer.OpenOfferManager;
import io.bitsquare.user.Preferences; import io.bitsquare.user.Preferences;
import io.bitsquare.user.User; import io.bitsquare.user.User;
import javafx.beans.property.*; import javafx.beans.property.*;
import javafx.beans.value.ChangeListener;
import javafx.collections.ListChangeListener; import javafx.collections.ListChangeListener;
import org.bitcoinj.core.*; import org.bitcoinj.core.*;
import org.bitcoinj.store.BlockStoreException; import org.bitcoinj.store.BlockStoreException;
@ -65,7 +66,7 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
class MainViewModel implements ViewModel { public class MainViewModel implements ViewModel {
private static final Logger log = LoggerFactory.getLogger(MainViewModel.class); private static final Logger log = LoggerFactory.getLogger(MainViewModel.class);
private final WalletService walletService; private final WalletService walletService;
@ -105,12 +106,13 @@ class MainViewModel implements ViewModel {
final BooleanProperty showOpenDisputesNotification = new SimpleBooleanProperty(); final BooleanProperty showOpenDisputesNotification = new SimpleBooleanProperty();
private final BooleanProperty isSplashScreenRemoved = new SimpleBooleanProperty(); private final BooleanProperty isSplashScreenRemoved = new SimpleBooleanProperty();
private final String btcNetworkAsString; private final String btcNetworkAsString;
final StringProperty p2PNetworkLabelId = new SimpleStringProperty("footer-pane");
private MonadicBinding<Boolean> allServicesDone; private MonadicBinding<Boolean> allServicesDone;
private User user; private User user;
private int numBTCPeers = 0; private int numBTCPeers = 0;
private Timer checkForBtcSyncStateTimer; private Timer checkForBtcSyncStateTimer;
private ChangeListener<Number> numAuthenticatedPeersListener, btcNumPeersListener;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -170,13 +172,24 @@ class MainViewModel implements ViewModel {
}); });
} }
public void shutDown() {
if (numAuthenticatedPeersListener != null)
p2PService.getNumAuthenticatedPeers().removeListener(numAuthenticatedPeersListener);
if (btcNumPeersListener != null)
walletService.numPeersProperty().removeListener(btcNumPeersListener);
if (checkForBtcSyncStateTimer != null)
checkForBtcSyncStateTimer.stop();
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Initialisation // Initialisation
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private BooleanProperty initP2PNetwork() { private BooleanProperty initP2PNetwork() {
final BooleanProperty p2pNetWorkReady = new SimpleBooleanProperty(); final BooleanProperty p2pNetworkInitialized = new SimpleBooleanProperty();
p2PNetworkInfo.set("Connecting to Tor network..."); p2PNetworkInfo.set("Connecting to Tor network...");
p2PService.start(new P2PServiceListener() { p2PService.start(new P2PServiceListener() {
@Override @Override
@ -197,14 +210,26 @@ class MainViewModel implements ViewModel {
} else { } else {
updateP2pNetworkInfoWithPeersChanged(p2PService.getNumAuthenticatedPeers().get()); updateP2pNetworkInfoWithPeersChanged(p2PService.getNumAuthenticatedPeers().get());
} }
p2pNetWorkReady.set(true); p2pNetworkInitialized.set(true);
} }
@Override @Override
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
p2PNetworkWarnMsg.set("There are no seed nodes available."); if (p2PService.getNumAuthenticatedPeers().get() == 0) {
p2PNetworkInfo.set("No seed node available"); p2PNetworkInfo.set("No seed nodes available");
p2pNetWorkReady.set(true); }
p2pNetworkInitialized.set(true);
}
@Override
public void onNoPeersAvailable() {
if (p2PService.getNumAuthenticatedPeers().get() == 0) {
p2PNetworkWarnMsg.set("There are no seed nodes or persisted peers available for requesting data.\n" +
"Please check your internet connection or try to restart the application.");
p2PNetworkInfo.set("No seed nodes and peers available");
p2PNetworkLabelId.set("splash-error-state-msg");
}
p2pNetworkInitialized.set(true);
} }
@Override @Override
@ -216,18 +241,22 @@ class MainViewModel implements ViewModel {
@Override @Override
public void onSetupFailed(Throwable throwable) { public void onSetupFailed(Throwable throwable) {
p2PNetworkWarnMsg.set("Connecting to the P2P network failed (reported error: " + throwable.getMessage() + ")."); p2PNetworkWarnMsg.set("Connecting to the P2P network failed (reported error: "
+ throwable.getMessage() + ").\n" +
"Please check your internet connection or try to restart the application.");
splashP2PNetworkProgress.set(0); splashP2PNetworkProgress.set(0);
if (p2PService.getNumAuthenticatedPeers().get() == 0)
p2PNetworkLabelId.set("splash-error-state-msg");
} }
}); });
return p2pNetWorkReady; return p2pNetworkInitialized;
} }
private BooleanProperty initBitcoinWallet() { private BooleanProperty initBitcoinWallet() {
EasyBind.subscribe(walletService.downloadPercentageProperty(), newValue -> setBitcoinNetworkSyncProgress((double) newValue)); EasyBind.subscribe(walletService.downloadPercentageProperty(), newValue -> setBitcoinNetworkSyncProgress((double) newValue));
walletService.numPeersProperty().addListener((observable, oldValue, newValue) -> { btcNumPeersListener = (observable, oldValue, newValue) -> {
if ((int) oldValue > 0 && (int) newValue == 0) if ((int) oldValue > 0 && (int) newValue == 0)
walletServiceErrorMsg.set("You lost the connection to all bitcoin peers."); walletServiceErrorMsg.set("You lost the connection to all bitcoin peers.");
else if ((int) oldValue == 0 && (int) newValue > 0) else if ((int) oldValue == 0 && (int) newValue > 0)
@ -235,7 +264,8 @@ class MainViewModel implements ViewModel {
numBTCPeers = (int) newValue; numBTCPeers = (int) newValue;
setBitcoinNetworkSyncProgress(walletService.downloadPercentageProperty().get()); setBitcoinNetworkSyncProgress(walletService.downloadPercentageProperty().get());
}); };
walletService.numPeersProperty().addListener(btcNumPeersListener);
final BooleanProperty walletInitialized = new SimpleBooleanProperty(); final BooleanProperty walletInitialized = new SimpleBooleanProperty();
walletService.initialize(null, walletService.initialize(null,
@ -341,14 +371,19 @@ class MainViewModel implements ViewModel {
.show(); .show();
// update nr of peers in footer // update nr of peers in footer
p2PService.getNumAuthenticatedPeers().addListener((observable, oldValue, newValue) -> { numAuthenticatedPeersListener = (observable, oldValue, newValue) -> {
if ((int) oldValue > 0 && (int) newValue == 0) if ((int) oldValue > 0 && (int) newValue == 0) {
p2PNetworkWarnMsg.set("You lost the connection to all P2P network peers."); p2PNetworkWarnMsg.set("You lost the connection to all P2P network peers.\n" +
else if ((int) oldValue == 0 && (int) newValue > 0) "Please check your internet connection or try to restart the application.");
p2PNetworkLabelId.set("splash-error-state-msg");
} else if ((int) oldValue == 0 && (int) newValue > 0) {
p2PNetworkWarnMsg.set(null); p2PNetworkWarnMsg.set(null);
p2PNetworkLabelId.set("footer-pane");
}
updateP2pNetworkInfoWithPeersChanged((int) newValue); updateP2pNetworkInfoWithPeersChanged((int) newValue);
}); };
p2PService.getNumAuthenticatedPeers().addListener(numAuthenticatedPeersListener);
// now show app // now show app
showAppScreen.set(true); showAppScreen.set(true);
@ -565,7 +600,6 @@ class MainViewModel implements ViewModel {
} }
private void setBitcoinNetworkSyncProgress(double value) { private void setBitcoinNetworkSyncProgress(double value) {
Log.traceCall("btcSyncProgress=" + value);
btcSyncProgress.set(value); btcSyncProgress.set(value);
String numPeers = "Nr. of peers: " + numBTCPeers; String numPeers = "Nr. of peers: " + numBTCPeers;
if (value == 1) { if (value == 1) {

View File

@ -185,6 +185,6 @@ class ArbitratorRegistrationViewModel extends ActivatableViewModel {
} }
boolean isAuthenticated() { boolean isAuthenticated() {
return p2PService.getFirstPeerAuthenticated(); return p2PService.isAuthenticated();
} }
} }

View File

@ -486,7 +486,7 @@ class CreateOfferViewModel extends ActivatableWithDataModel<CreateOfferDataModel
} }
boolean isAuthenticated() { boolean isAuthenticated() {
return p2PService.getFirstPeerAuthenticated(); return p2PService.isAuthenticated();
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -158,7 +158,7 @@ class OfferBookViewModel extends ActivatableViewModel {
} }
boolean isAuthenticated() { boolean isAuthenticated() {
return p2PService.getFirstPeerAuthenticated(); return p2PService.isAuthenticated();
} }
public TradeCurrency getTradeCurrency() { public TradeCurrency getTradeCurrency() {

View File

@ -74,6 +74,6 @@ class OpenOffersViewModel extends ActivatableWithDataModel<OpenOffersDataModel>
} }
boolean isAuthenticated() { boolean isAuthenticated() {
return p2PService.getFirstPeerAuthenticated(); return p2PService.isAuthenticated();
} }
} }

View File

@ -220,7 +220,7 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
} }
public boolean isAuthenticated() { public boolean isAuthenticated() {
return p2PService.getFirstPeerAuthenticated(); return p2PService.isAuthenticated();
} }
// columns // columns

View File

@ -120,7 +120,10 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
@Override @Override
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
}
@Override
public void onNoPeersAvailable() {
} }
@Override @Override

View File

@ -14,6 +14,10 @@ public abstract class FirstPeerAuthenticatedListener implements P2PServiceListen
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
} }
@Override
public void onNoPeersAvailable() {
}
@Override @Override
public void onSetupFailed(Throwable throwable) { public void onSetupFailed(Throwable throwable) {
} }

View File

@ -49,6 +49,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final int port; private final int port;
private final File torDir; private final File torDir;
private final boolean useLocalhost; private final boolean useLocalhost;
protected final File storageDir;
private final Optional<EncryptionService> optionalEncryptionService; private final Optional<EncryptionService> optionalEncryptionService;
private final Optional<KeyRing> optionalKeyRing; private final Optional<KeyRing> optionalKeyRing;
@ -63,16 +64,16 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new HashMap<>(); private final Map<DecryptedMsgWithPubKey, ProtectedMailboxData> mailboxMap = new HashMap<>();
private final Set<Address> authenticatedPeerAddresses = new HashSet<>(); private final Set<Address> authenticatedPeerAddresses = new HashSet<>();
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty(); protected final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
protected final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty(); private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
private final BooleanProperty firstPeerAuthenticated = new SimpleBooleanProperty(); protected final BooleanProperty notAuthenticated = new SimpleBooleanProperty(true);
private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0); private final IntegerProperty numAuthenticatedPeers = new SimpleIntegerProperty(0);
protected Address connectedSeedNode; private Address seedNodeOfInitialDataRequest;
private volatile boolean shutDownInProgress; private volatile boolean shutDownInProgress;
private boolean shutDownComplete; private boolean shutDownComplete;
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> readyForAuthentication; private MonadicBinding<Boolean> readyForAuthenticationBinding;
private final Storage<Address> dbStorage; private final Storage<Address> dbStorage;
private Address myOnionAddress; private Address myOnionAddress;
protected RequestDataManager requestDataManager; protected RequestDataManager requestDataManager;
@ -97,6 +98,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
this.port = port; this.port = port;
this.torDir = torDir; this.torDir = torDir;
this.useLocalhost = useLocalhost; this.useLocalhost = useLocalhost;
this.storageDir = storageDir;
optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService); optionalEncryptionService = encryptionService == null ? Optional.empty() : Optional.of(encryptionService);
optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing); optionalKeyRing = keyRing == null ? Optional.empty() : Optional.of(keyRing);
@ -122,7 +124,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.addMessageListener(this); networkNode.addMessageListener(this);
// peer group // peer group
peerManager = createPeerManager(); peerManager = getNewPeerManager();
peerManager.setSeedNodeAddresses(seedNodeAddresses); peerManager.setSeedNodeAddresses(seedNodeAddresses);
peerManager.addAuthenticationListener(this); peerManager.addAuthenticationListener(this);
@ -130,44 +132,52 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
dataStorage = new P2PDataStorage(peerManager, networkNode, storageDir); dataStorage = new P2PDataStorage(peerManager, networkNode, storageDir);
dataStorage.addHashMapChangedListener(this); dataStorage.addHashMapChangedListener(this);
// Request initial data manager // Request data manager
requestDataManager = createRequestDataManager(); requestDataManager = getNewRequestDataManager();
peerManager.addAuthenticationListener(requestDataManager); requestDataManager.setRequestDataManagerListener(new RequestDataManager.Listener() {
// Test multiple states to check when we are ready for authenticateSeedNode
readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated,
(hiddenServicePublished, requestingDataCompleted, firstPeerAuthenticated)
-> hiddenServicePublished && requestingDataCompleted && !firstPeerAuthenticated);
readyForAuthentication.subscribe((observable, oldValue, newValue) -> {
// we need to have both the initial data delivered and the hidden service published before we
// authenticate to a seed node.
if (newValue)
authenticateSeedNode();
});
}
protected PeerManager createPeerManager() {
return new PeerManager(networkNode);
}
protected RequestDataManager createRequestDataManager() {
return new RequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager());
}
protected RequestDataManager.Listener getRequestDataManager() {
return new RequestDataManager.Listener() {
@Override @Override
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable()); p2pServiceListeners.stream().forEach(e -> e.onNoSeedNodeAvailable());
} }
@Override @Override
public void onDataReceived(Address seedNode) { public void onNoPeersAvailable() {
connectedSeedNode = seedNode; p2pServiceListeners.stream().forEach(e -> e.onNoPeersAvailable());
requestingDataCompleted.set(true); }
@Override
public void onDataReceived(Address address) {
if (!requestingDataCompleted.get()) {
seedNodeOfInitialDataRequest = address;
requestingDataCompleted.set(true);
}
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted()); p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
} }
}; });
peerManager.addAuthenticationListener(requestDataManager);
// Test multiple states to check when we are ready for authenticateSeedNode
// We need to have both the initial data delivered and the hidden service published before we
// authenticate to a seed node.
readyForAuthenticationBinding = getNewReadyForAuthenticationBinding();
readyForAuthenticationBinding.subscribe((observable, oldValue, newValue) -> {
if (newValue)
authenticateToSeedNode();
});
}
protected MonadicBinding<Boolean> getNewReadyForAuthenticationBinding() {
return EasyBind.combine(hiddenServicePublished, requestingDataCompleted, notAuthenticated,
(hiddenServicePublished, requestingDataCompleted, notAuthenticated)
-> hiddenServicePublished && requestingDataCompleted && notAuthenticated);
}
protected PeerManager getNewPeerManager() {
return new PeerManager(networkNode, storageDir);
}
protected RequestDataManager getNewRequestDataManager() {
return new RequestDataManager(networkNode, dataStorage, peerManager);
} }
@ -175,7 +185,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void start(@Nullable P2PServiceListener listener) { public void start(@Nullable P2PServiceListener listener) {
Log.traceCall(); Log.traceCall();
if (listener != null) if (listener != null)
@ -238,7 +247,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
@Override @Override
public void onTorNodeReady() { public void onTorNodeReady() {
Log.traceCall(); Log.traceCall();
requestDataManager.requestData(seedNodeAddresses); requestDataManager.requestDataFromSeedNodes(seedNodeAddresses);
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()); p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
} }
@ -265,10 +274,10 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)); p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
} }
private void authenticateSeedNode() { protected void authenticateToSeedNode() {
Log.traceCall(); Log.traceCall();
checkNotNull(connectedSeedNode != null, "connectedSeedNode must not be null"); checkNotNull(seedNodeOfInitialDataRequest != null, "seedNodeOfInitialDataRequest must not be null");
peerManager.authenticateToSeedNode(connectedSeedNode); peerManager.authenticateToSeedNode(seedNodeOfInitialDataRequest);
} }
@ -301,8 +310,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
Log.traceCall(); Log.traceCall();
authenticatedPeerAddresses.add(peerAddress); authenticatedPeerAddresses.add(peerAddress);
if (!firstPeerAuthenticated.get()) { if (notAuthenticated.get()) {
firstPeerAuthenticated.set(true); notAuthenticated.set(false);
p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated()); p2pServiceListeners.stream().forEach(e -> e.onFirstPeerAuthenticated());
} }
@ -658,8 +667,8 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// Getters // Getters
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public boolean getFirstPeerAuthenticated() { public boolean isAuthenticated() {
return firstPeerAuthenticated.get(); return !notAuthenticated.get();
} }
public NetworkNode getNetworkNode() { public NetworkNode getNetworkNode() {
@ -678,6 +687,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
return authenticatedPeerAddresses; return authenticatedPeerAddresses;
} }
@NotNull
public ReadOnlyIntegerProperty getNumAuthenticatedPeers() { public ReadOnlyIntegerProperty getNumAuthenticatedPeers() {
return numAuthenticatedPeers; return numAuthenticatedPeers;
} }

View File

@ -9,5 +9,7 @@ public interface P2PServiceListener extends SetupListener {
void onNoSeedNodeAvailable(); void onNoSeedNodeAvailable();
void onNoPeersAvailable();
void onFirstPeerAuthenticated(); void onFirstPeerAuthenticated();
} }

View File

@ -1,10 +1,13 @@
package io.bitsquare.p2p; package io.bitsquare.p2p;
import io.bitsquare.app.Log;
import io.bitsquare.p2p.peers.PeerManager; import io.bitsquare.p2p.peers.PeerManager;
import io.bitsquare.p2p.peers.RequestDataManager; import io.bitsquare.p2p.peers.RequestDataManager;
import io.bitsquare.p2p.peers.SeedNodePeerManager; import io.bitsquare.p2p.peers.SeedNodePeerManager;
import io.bitsquare.p2p.peers.SeedNodeRequestDataManager; import io.bitsquare.p2p.peers.SeedNodeRequestDataManager;
import io.bitsquare.p2p.seed.SeedNodesRepository; import io.bitsquare.p2p.seed.SeedNodesRepository;
import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -26,12 +29,31 @@ public class SeedNodeP2PService extends P2PService {
} }
@Override @Override
protected PeerManager createPeerManager() { protected PeerManager getNewPeerManager() {
return new SeedNodePeerManager(networkNode); return new SeedNodePeerManager(networkNode, storageDir);
} }
@Override @Override
protected RequestDataManager createRequestDataManager() { protected RequestDataManager getNewRequestDataManager() {
return new SeedNodeRequestDataManager(networkNode, dataStorage, peerManager, getRequestDataManager()); return new SeedNodeRequestDataManager(networkNode, dataStorage, peerManager);
} }
@Override
protected MonadicBinding<Boolean> getNewReadyForAuthenticationBinding() {
return EasyBind.combine(hiddenServicePublished, notAuthenticated,
(hiddenServicePublished, notAuthenticated) -> hiddenServicePublished && notAuthenticated);
}
@Override
public void onTorNodeReady() {
Log.traceCall();
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
}
@Override
protected void authenticateToSeedNode() {
Log.traceCall();
((SeedNodePeerManager) peerManager).authenticateToSeedNode();
}
} }

View File

@ -83,7 +83,6 @@ public class LocalhostNetworkNode extends NetworkNode {
@Override @Override
@Nullable @Nullable
public Address getAddress() { public Address getAddress() {
Log.traceCall();
return address; return address;
} }

View File

@ -42,7 +42,7 @@ public class AuthenticationHandshake implements MessageListener {
private long nonce = 0; private long nonce = 0;
private boolean stopped; private boolean stopped;
private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty(); private Optional<SettableFuture<Connection>> resultFutureOptional = Optional.empty();
private Timer timeoutTimer; private Timer timeoutTimer, shutDownTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -239,7 +239,10 @@ public class AuthenticationHandshake implements MessageListener {
"connection with his reported address to verify if his address is correct.", peerAddress); "connection with his reported address to verify if his address is correct.", peerAddress);
connection.shutDown(() -> { connection.shutDown(() -> {
UserThread.runAfter(() -> { if (shutDownTimer != null)
shutDownTimer.cancel();
shutDownTimer = UserThread.runAfter(() -> {
if (!stopped) { if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to // we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state // inconsistent state
@ -268,6 +271,9 @@ public class AuthenticationHandshake implements MessageListener {
} }
}); });
if (timeoutTimer != null)
timeoutTimer.cancel();
timeoutTimer = UserThread.runAfter(() -> failed(new AuthenticationException("Authentication of peer " timeoutTimer = UserThread.runAfter(() -> failed(new AuthenticationException("Authentication of peer "
+ peerAddress + peerAddress
+ " failed because of a timeout. " + + " failed because of a timeout. " +
@ -343,6 +349,9 @@ public class AuthenticationHandshake implements MessageListener {
if (timeoutTimer != null) if (timeoutTimer != null)
timeoutTimer.cancel(); timeoutTimer.cancel();
if (shutDownTimer != null)
shutDownTimer.cancel();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
} }
} }

View File

@ -116,7 +116,7 @@ public class PeerExchangeManager implements MessageListener {
if (!connectedPeersList.isEmpty()) { if (!connectedPeersList.isEmpty()) {
Log.traceCall(); Log.traceCall();
connectedPeersList.stream() connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> { .forEach(e -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection, SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(networkNode.getAddress(), new HashSet<>(authenticatedAndReportedPeersSupplier.get()))); new GetPeersRequest(networkNode.getAddress(), new HashSet<>(authenticatedAndReportedPeersSupplier.get())));
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@ -131,7 +131,7 @@ public class PeerExchangeManager implements MessageListener {
removePeerConsumer.accept(e.address); removePeerConsumer.accept(e.address);
} }
}); });
}, 3, 5, TimeUnit.SECONDS)); });
} }
} }
} }

View File

@ -2,22 +2,27 @@ package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log; import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread; import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address; import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message; import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.network.*; import io.bitsquare.p2p.network.*;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRejection; import io.bitsquare.p2p.peers.messages.auth.AuthenticationRejection;
import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest; import io.bitsquare.p2p.peers.messages.auth.AuthenticationRequest;
import io.bitsquare.p2p.storage.messages.DataBroadcastMessage; import io.bitsquare.p2p.storage.messages.DataBroadcastMessage;
import io.bitsquare.storage.Storage;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -31,7 +36,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
// Static // Static
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private static int MAX_CONNECTIONS_LOW_PRIORITY; protected static int MAX_CONNECTIONS_LOW_PRIORITY;
private static int MAX_CONNECTIONS_NORMAL_PRIORITY; private static int MAX_CONNECTIONS_NORMAL_PRIORITY;
private static int MAX_CONNECTIONS_HIGH_PRIORITY; private static int MAX_CONNECTIONS_HIGH_PRIORITY;
@ -55,27 +60,28 @@ public class PeerManager implements MessageListener, ConnectionListener {
private final NetworkNode networkNode; private final NetworkNode networkNode;
private final MaintenanceManager maintenanceManager; private final MaintenanceManager maintenanceManager;
private final PeerExchangeManager peerExchangeManager; private final PeerExchangeManager peerExchangeManager;
protected final ScheduledThreadPoolExecutor checkSeedNodeConnectionExecutor;
private final Storage<HashSet<ReportedPeer>> dbStorage;
private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet<AuthenticationListener> authenticationListeners = new CopyOnWriteArraySet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>(); protected final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<ReportedPeer> reportedPeers = new HashSet<>(); private final HashSet<ReportedPeer> reportedPeers = new HashSet<>();
private final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>(); private final HashSet<ReportedPeer> persistedPeers = new HashSet<>();
private final List<Address> remainingSeedNodes = new ArrayList<>(); protected final Map<Address, AuthenticationHandshake> authenticationHandshakes = new HashMap<>();
private Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty(); protected final List<Address> remainingSeedNodes = new ArrayList<>();
private Timer connectToSeedNodeTimer; protected Optional<Set<Address>> seedNodeAddressesOptional = Optional.empty();
protected Timer authenticateToRemainingSeedNodeTimer, authenticateToRemainingReportedPeerTimer;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public PeerManager(NetworkNode networkNode) { public PeerManager(NetworkNode networkNode, File storageDir) {
Log.traceCall(); Log.traceCall();
this.networkNode = networkNode; this.networkNode = networkNode;
dbStorage = new Storage<>(storageDir);
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
maintenanceManager = new MaintenanceManager(networkNode, maintenanceManager = new MaintenanceManager(networkNode,
() -> getAuthenticatedPeers(), () -> getAuthenticatedPeers(),
@ -86,9 +92,21 @@ public class PeerManager implements MessageListener, ConnectionListener {
address -> removePeer(address), address -> removePeer(address),
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection)); (newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection));
startConnectToSeedNodeTimer(); checkSeedNodeConnectionExecutor = Utilities.getScheduledThreadPoolExecutor("checkSeedNodeConnection", 1, 10, 5);
init();
} }
private void init() {
networkNode.addMessageListener(this);
networkNode.addConnectionListener(this);
HashSet<ReportedPeer> persistedPeers = dbStorage.initAndGetPersisted("persistedPeers");
if (persistedPeers != null) {
log.info("We have persisted reported peers. " +
"\npersistedPeers=" + persistedPeers);
this.persistedPeers.addAll(persistedPeers);
}
}
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// ConnectionListener implementation // ConnectionListener implementation
@ -107,13 +125,15 @@ public class PeerManager implements MessageListener, ConnectionListener {
// if we are not in the authentication process // if we are not in the authentication process
// Connection shut down is an expected step in the authentication process. // Connection shut down is an expected step in the authentication process.
// If the disconnect happens on an authenticated peer we remove the peer. // If the disconnect happens on an authenticated peer we remove the peer.
if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress)) { if (authenticatedPeers.containsKey(peerAddress) || !authenticationHandshakes.containsKey(peerAddress))
removePeer(peerAddress); removePeer(peerAddress);
if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We got a disconnect. " + log.info("We got a disconnect. " +
"We will try again after a random pause to remaining reported peers."); "We will try again after a random pause to remaining reported peers.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), if (authenticateToRemainingReportedPeerTimer == null)
10, 20, TimeUnit.SECONDS); authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} }
}); });
} }
@ -144,31 +164,29 @@ public class PeerManager implements MessageListener, ConnectionListener {
log.info("Broadcast message to {} peers. Message: {}", authenticatedPeers.values().size(), message); log.info("Broadcast message to {} peers. Message: {}", authenticatedPeers.values().size(), message);
authenticatedPeers.values().stream() authenticatedPeers.values().stream()
.filter(e -> !e.address.equals(sender)) .filter(e -> !e.address.equals(sender))
.forEach(peer -> UserThread.runAfterRandomDelay(() -> { .forEach(peer -> {
// as we use a delay we need to check again if our peer is still in the authenticated list if (authenticatedPeers.containsValue(peer)) {
if (authenticatedPeers.containsValue(peer)) { final Address address = peer.address;
final Address address = peer.address; log.trace("Broadcast message from " + getMyAddress() + " to " + address + ".");
log.trace("Broadcast message from " + getMyAddress() + " to " + address + "."); SettableFuture<Connection> future = networkNode.sendMessage(address, message);
SettableFuture<Connection> future = networkNode.sendMessage(address, message); Futures.addCallback(future, new FutureCallback<Connection>() {
Futures.addCallback(future, new FutureCallback<Connection>() { @Override
@Override public void onSuccess(Connection connection) {
public void onSuccess(Connection connection) { log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded.");
log.trace("Broadcast from " + getMyAddress() + " to " + address + " succeeded.");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(address));
}
});
} else {
log.debug("Peer is not in our authenticated list anymore. " +
"That can happen as we use a delay in the loop for the broadcast. " +
"Peer.address={}", peer.address);
} }
},
10, 100, TimeUnit.MILLISECONDS)); @Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Broadcast failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(address));
}
});
} else {
log.debug("Peer is not in our authenticated list anymore. " +
"That can happen as we use a stream loop for the broadcast. " +
"Peer.address={}", peer.address);
}
});
} else { } else {
log.info("Message not broadcasted because we have no authenticated peers yet. " + log.info("Message not broadcasted because we have no authenticated peers yet. " +
"message = {}", message); "message = {}", message);
@ -183,8 +201,13 @@ public class PeerManager implements MessageListener, ConnectionListener {
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
networkNode.removeConnectionListener(this); networkNode.removeConnectionListener(this);
if (connectToSeedNodeTimer != null) if (authenticateToRemainingReportedPeerTimer != null)
connectToSeedNodeTimer.cancel(); authenticateToRemainingReportedPeerTimer.cancel();
if (authenticateToRemainingSeedNodeTimer != null)
authenticateToRemainingSeedNodeTimer.cancel();
MoreExecutors.shutdownAndAwaitTermination(checkSeedNodeConnectionExecutor, 500, TimeUnit.MILLISECONDS);
} }
public void addAuthenticationListener(AuthenticationListener listener) { public void addAuthenticationListener(AuthenticationListener listener) {
@ -209,8 +232,6 @@ public class PeerManager implements MessageListener, ConnectionListener {
if (!authenticationHandshakes.containsKey(peerAddress)) { if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We got an incoming AuthenticationRequest for the peerAddress {}. " + log.info("We got an incoming AuthenticationRequest for the peerAddress {}. " +
"We create an AuthenticationHandshake.", peerAddress); "We create an AuthenticationHandshake.", peerAddress);
log.trace("message={}", message);
log.trace("connection={}", connection);
// We protect that connection from getting closed by maintenance cleanup... // We protect that connection from getting closed by maintenance cleanup...
connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST); connection.setConnectionPriority(ConnectionPriority.AUTH_REQUEST);
authenticationHandshake = new AuthenticationHandshake(networkNode, authenticationHandshake = new AuthenticationHandshake(networkNode,
@ -225,7 +246,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress); log.info("We got the peer ({}) who requested authentication authenticated.", peerAddress);
addAuthenticatedPeer(connection, peerAddress); handleAuthenticationSuccess(connection, peerAddress);
} }
@Override @Override
@ -248,7 +269,8 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
} else { } else {
log.info("We got an incoming AuthenticationRequest but we are already authenticated to peer {}.\n" + log.info("We got an incoming AuthenticationRequest but we are already authenticated to peer {}.\n" +
"That should not happen. We reject the request.", peerAddress); "That should not happen. " +
"We reject the request.", peerAddress);
rejectAuthenticationRequest(peerAddress); rejectAuthenticationRequest(peerAddress);
if (authenticationHandshakes.containsKey(peerAddress)) { if (authenticationHandshakes.containsKey(peerAddress)) {
@ -270,149 +292,181 @@ public class PeerManager implements MessageListener, ConnectionListener {
public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) { public void setSeedNodeAddresses(Set<Address> seedNodeAddresses) {
seedNodeAddressesOptional = Optional.of(seedNodeAddresses); seedNodeAddressesOptional = Optional.of(seedNodeAddresses);
checkArgument(!seedNodeAddressesOptional.get().isEmpty(),
"seedNodeAddresses must not be empty");
} }
public void authenticateToSeedNode(Address peerAddress) { public void authenticateToSeedNode(Address peerAddress) {
Log.traceCall(); Log.traceCall();
checkArgument(seedNodeAddressesOptional.isPresent(), checkArgument(seedNodeAddressesOptional.isPresent(),
"seedNodeAddresses must be set before calling authenticateToSeedNode"); "seedNodeAddresses must be set before calling authenticateToSeedNode");
remainingSeedNodes.remove(peerAddress); remainingSeedNodes.remove(peerAddress);
remainingSeedNodes.addAll(seedNodeAddressesOptional.get()); remainingSeedNodes.addAll(seedNodeAddressesOptional.get());
authenticateToFirstSeedNode(peerAddress); authenticateToFirstSeedNode(peerAddress);
startCheckSeedNodeConnectionTask();
} }
private void authenticateToFirstSeedNode(Address peerAddress) { protected void authenticateToFirstSeedNode(Address peerAddress) {
Log.traceCall(); Log.traceCall();
if (!enoughConnectionsForAuthReached()) { if (!enoughConnections()) {
if (!authenticationHandshakes.containsKey(peerAddress)) { if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We try to authenticate to seed node {}.", peerAddress); log.info("We try to authenticate to seed node {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.info("We got our first seed node authenticated. " + log.info("We got our first seed node authenticated. " +
"We try if there are reported peers available to authenticate."); "We try to authenticate to reported peers.");
handleAuthenticationSuccess(connection, peerAddress);
addAuthenticatedPeer(connection, peerAddress); onFirstSeedNodeAuthenticated();
authenticateToRemainingReportedPeer();
} }
@Override @Override
public void onFailure(@NotNull Throwable throwable) { public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." + log.info("Authentication to " + peerAddress + " failed at authenticateToFirstSeedNode." +
"\nThat is expected if seed nodes are offline." + "\nThat is expected if seed node is offline." +
"\nException:" + throwable.toString()); "\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable); handleAuthenticationFailure(peerAddress, throwable);
Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode(); Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) { if (seedNodeOptional.isPresent()) {
log.info("We try another random seed node for first authentication attempt."); log.info("We try another random seed node for authenticateToFirstSeedNode.");
authenticateToFirstSeedNode(seedNodeOptional.get()); authenticateToFirstSeedNode(seedNodeOptional.get());
} else { } else {
log.info("There are no seed nodes available for authentication. " + log.info("There are no seed nodes available for authentication. " +
"We try if there are reported peers available to authenticate."); "We try to authenticate to reported peers.");
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} }
} }
}); });
} else { } else {
log.warn("We got the first seed node already in the authenticationHandshakes. " + log.info("We have already an open authenticationHandshakes for the first seed node. " +
"That might happen when we received an AuthenticationRequest before we start authenticating. " + "That can happen when we received an AuthenticationRequest before we start authenticating. " +
"We will try after a random pause to authenticate to the reported peers."); "We will try to authenticate to another seed node.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), authenticateToRemainingSeedNode();
20, 30, TimeUnit.SECONDS);
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections (at authenticateToFirstSeedNode). " +
"That is very unlikely to happen but can be a theoretical case.");
} }
} }
private void authenticateToRemainingSeedNode() { protected void onFirstSeedNodeAuthenticated() {
authenticateToRemainingReportedPeer();
}
protected void authenticateToRemainingSeedNode() {
Log.traceCall(); Log.traceCall();
if (!enoughConnectionsForAuthReached()) { if (authenticateToRemainingSeedNodeTimer != null) {
authenticateToRemainingSeedNodeTimer.cancel();
authenticateToRemainingSeedNodeTimer = null;
}
if (!enoughConnections()) {
Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode(); Optional<Address> seedNodeOptional = getAndRemoveNotAuthenticatingSeedNode();
if (seedNodeOptional.isPresent()) { if (seedNodeOptional.isPresent()) {
Address peerAddress = seedNodeOptional.get(); Address peerAddress = seedNodeOptional.get();
log.info("We try to authenticate to seed node {}.", peerAddress); if (!authenticationHandshakes.containsKey(peerAddress)) {
authenticate(peerAddress, new FutureCallback<Connection>() { log.info("We try to authenticate to a randomly selected seed node {}.", peerAddress);
@Override authenticate(peerAddress, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) { @Override
log.info("We got a seed node authenticated. " + public void onSuccess(Connection connection) {
"We try if there are more seed nodes available to authenticate."); log.info("We got a seed node authenticated. " +
"We try to authenticate to reported peers.");
addAuthenticatedPeer(connection, peerAddress); handleAuthenticationSuccess(connection, peerAddress);
authenticateToRemainingSeedNode(); onRemainingSeedNodeAuthenticated();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingSeedNode." +
"\nThat is expected if the seed node is offline." +
"\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable);
log.info("We try authenticateToRemainingSeedNode again.");
authenticateToRemainingSeedNode();
}
} }
);
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Authentication to " + peerAddress + " failed at authenticateToRemainingSeedNode." +
"\nThat is expected if the seed node is offline." +
"\nException:" + throwable.toString());
handleAuthenticationFailure(peerAddress, throwable);
log.info("We try another random seed node for authentication.");
authenticateToRemainingSeedNode();
}
}
);
} else if (reportedPeersAvailable() && !(this instanceof SeedNodePeerManager)) {
authenticateToRemainingReportedPeer();
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We try again after a random pause with the seed nodes which failed or if " +
"none available with the reported peers.");
if (seedNodeAddressesOptional.isPresent()) {
resetRemainingSeedNodes();
if (remainingSeedNodes.isEmpty() && !(this instanceof SeedNodePeerManager)) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
} else if (!(this instanceof SeedNodePeerManager)) {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} else { } else {
UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(), log.info("We have already an open authenticationHandshakes for the selected seed node. " +
30, 60, TimeUnit.SECONDS); "That can happen in race conditions when we received an AuthenticationRequest before " +
"we start authenticating. " +
"We will try to authenticate to another seed node.");
authenticateToRemainingSeedNode();
} }
} else {
handleNoSeedNodesAvailableCase();
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections (at authenticateToRemainingSeedNode).");
} }
} }
private void resetRemainingSeedNodes() { protected void onRemainingSeedNodeAuthenticated() {
authenticateToRemainingReportedPeer();
}
protected void handleNoSeedNodesAvailableCase() {
Log.traceCall();
if (reportedPeersAvailable()) {
authenticateToRemainingReportedPeer();
} else {
log.info("We don't have seed nodes or reported peers available. " +
"We try again after a random pause with the seed nodes which failed or if " +
"none available with the reported peers.");
checkArgument(seedNodeAddressesOptional.isPresent(), "seedNodeAddresses must be present");
resetRemainingSeedNodes();
if (remainingSeedNodesAvailable()) {
if (authenticateToRemainingSeedNodeTimer == null)
authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
10, 20, TimeUnit.SECONDS);
} else {
if (authenticateToRemainingReportedPeerTimer == null)
authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
}
}
protected void resetRemainingSeedNodes() {
Log.traceCall();
if (seedNodeAddressesOptional.isPresent()) { if (seedNodeAddressesOptional.isPresent()) {
remainingSeedNodes.clear(); remainingSeedNodes.clear();
seedNodeAddressesOptional.get().stream() seedNodeAddressesOptional.get().stream()
.filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e)) .filter(e -> !authenticatedPeers.containsKey(e) && !authenticationHandshakes.containsKey(e))
.forEach(e -> remainingSeedNodes.add(e)); .forEach(e -> remainingSeedNodes.add(e));
} else {
log.error("seedNodeAddressesOptional must be present");
} }
} }
// We want to stay connected to at least one seed node from time to time to avoid to get isolated with a group of peers protected void startCheckSeedNodeConnectionTask() {
private void startConnectToSeedNodeTimer() { checkSeedNodeConnectionExecutor.schedule(() -> UserThread.execute(()
Log.traceCall(); -> checkSeedNodeConnections()), 2, TimeUnit.MINUTES);
if (connectToSeedNodeTimer != null)
connectToSeedNodeTimer.cancel();
connectToSeedNodeTimer = UserThread.runAfterRandomDelay(() -> {
connectToSeedNode();
startConnectToSeedNodeTimer();
}, 10, 12, TimeUnit.MINUTES);
} }
private void connectToSeedNode() { // We want to stay connected to at least one seed node to avoid to get isolated with a group of peers
// remove enough connections first // Also needed for cases when all seed nodes get restarted, so peers will connect to the seed nodes again from time
if (getMyAddress() != null) { // to time and so keep the network connected.
checkIfConnectedPeersExceeds(MAX_CONNECTIONS_NORMAL_PRIORITY - 3); protected void checkSeedNodeConnections() {
UserThread.runAfter(() -> { Log.traceCall();
resetRemainingSeedNodes(); resetRemainingSeedNodes();
authenticateToRemainingSeedNode(); if (!remainingSeedNodes.isEmpty()) {
}, 500, TimeUnit.MILLISECONDS); log.info("We have remaining not connected seed node(s) available. " +
"We will call authenticateToRemainingSeedNode.");
// remove enough connections to be sure the authentication will succeed. I t might be that in the meantime
// we get other connection attempts, so remove 2 more than needed to have a bit of headroom.
checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY - remainingSeedNodes.size() - 2);
if (authenticateToRemainingSeedNodeTimer == null)
authenticateToRemainingSeedNodeTimer = UserThread.runAfter(() -> authenticateToRemainingSeedNode(),
500, TimeUnit.MILLISECONDS);
} else {
log.debug("There are no remainingSeedNodes available.");
} }
} }
@ -421,14 +475,19 @@ public class PeerManager implements MessageListener, ConnectionListener {
// Authentication to reported peers // Authentication to reported peers
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
private void authenticateToRemainingReportedPeer() { protected void authenticateToRemainingReportedPeer() {
Log.traceCall(); Log.traceCall();
if (!enoughConnectionsForAuthReached()) {
if (authenticateToRemainingReportedPeerTimer != null) {
authenticateToRemainingReportedPeerTimer.cancel();
authenticateToRemainingReportedPeerTimer = null;
}
if (!enoughConnections()) {
if (reportedPeersAvailable()) { if (reportedPeersAvailable()) {
Optional<ReportedPeer> andRemoveNotAuthenticatingReportedPeer = getAndRemoveNotAuthenticatingReportedPeer(); Optional<ReportedPeer> reportedPeer = getAndRemoveNotAuthenticatingReportedPeer();
if (andRemoveNotAuthenticatingReportedPeer.isPresent()) { if (reportedPeer.isPresent()) {
Address peerAddress = andRemoveNotAuthenticatingReportedPeer.get().address; Address peerAddress = reportedPeer.get().address;
removeFromReportedPeers(peerAddress);
if (!authenticationHandshakes.containsKey(peerAddress)) { if (!authenticationHandshakes.containsKey(peerAddress)) {
log.info("We try to authenticate to peer {}.", peerAddress); log.info("We try to authenticate to peer {}.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@ -437,7 +496,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
log.info("We got a peer authenticated. " + log.info("We got a peer authenticated. " +
"We try if there are more reported peers available to authenticate."); "We try if there are more reported peers available to authenticate.");
addAuthenticatedPeer(connection, peerAddress); handleAuthenticationSuccess(connection, peerAddress);
authenticateToRemainingReportedPeer(); authenticateToRemainingReportedPeer();
} }
@ -454,31 +513,46 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
}); });
} else { } else {
log.warn("We got the selected peer in the authenticationHandshakes. That should not happen. " + log.warn("We got the selected peer in the authenticationHandshakes. " +
"We will try again after a short random pause."); "That should not happen. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), if (authenticateToRemainingReportedPeerTimer == null)
1, 2, TimeUnit.SECONDS); authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} }
} else { } else {
log.info("We don't have a reported peers available (maybe one is authenticating already). " + log.info("We don't have a reported peers available. " +
"We will try again after a random pause."); "That should not happen. We will try again after a random pause.");
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), if (authenticateToRemainingReportedPeerTimer == null)
10, 20, TimeUnit.SECONDS); authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
} }
} else if (!remainingSeedNodes.isEmpty()) { } else if (remainingSeedNodesAvailable()) {
authenticateToRemainingSeedNode(); authenticateToRemainingSeedNode();
} else if (remainingSeedNodes.isEmpty()) { } else if (!persistedPeers.isEmpty()) {
UserThread.runAfterRandomDelay(() -> {
resetRemainingSeedNodes();
authenticateToRemainingSeedNode();
},
10, 20, TimeUnit.SECONDS);
} else {
log.info("We don't have seed nodes or reported peers available. " + log.info("We don't have seed nodes or reported peers available. " +
"We will try again after a random pause."); "We will add 5 peers from our persistedReportedPeers to our reportedPeers list and " +
UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(), "try authenticateToRemainingReportedPeer again.");
30, 40, TimeUnit.SECONDS);
List<ReportedPeer> list = new ArrayList<>(persistedPeers);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
if (!list.isEmpty()) {
int toRemove = Math.min(list.size(), 5);
for (int i = 0; i < toRemove; i++) {
ReportedPeer reportedPeer = list.get(i);
persistedPeers.remove(reportedPeer);
reportedPeers.add(reportedPeer);
}
authenticateToRemainingReportedPeer();
}
} else {
log.info("We don't have seed nodes, reported peers nor persistedReportedPeers available. " +
"We will reset the seed nodes and try authenticateToRemainingSeedNode again after a random pause.");
resetRemainingSeedNodes();
if (authenticateToRemainingSeedNodeTimer == null)
authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() ->
authenticateToRemainingSeedNode(),
10, 20, TimeUnit.SECONDS);
} }
} else { } else {
log.info("We have already enough connections."); log.info("We have already enough connections.");
@ -525,12 +599,13 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
} else { } else {
log.info("We try to authenticate to peer {} for sending a private message.", peerAddress); log.info("We try to authenticate to peer {} for sending a private message.", peerAddress);
authenticate(peerAddress, new FutureCallback<Connection>() { authenticate(peerAddress, new FutureCallback<Connection>() {
@Override @Override
public void onSuccess(Connection connection) { public void onSuccess(Connection connection) {
log.info("We got a new peer for sending a private message authenticated."); log.info("We got a new peer for sending a private message authenticated.");
addAuthenticatedPeer(connection, peerAddress); handleAuthenticationSuccess(connection, peerAddress);
if (completeHandler != null) if (completeHandler != null)
completeHandler.run(); completeHandler.run();
} }
@ -555,30 +630,23 @@ public class PeerManager implements MessageListener, ConnectionListener {
private void authenticate(Address peerAddress, FutureCallback<Connection> futureCallback) { private void authenticate(Address peerAddress, FutureCallback<Connection> futureCallback) {
Log.traceCall(peerAddress.getFullAddress()); Log.traceCall(peerAddress.getFullAddress());
if (!authenticationHandshakes.containsKey(peerAddress)) { checkArgument(!authenticationHandshakes.containsKey(peerAddress),
log.info("We create an AuthenticationHandshake to authenticate to peer {}.", peerAddress); "An authentication handshake is already created for that peerAddress (" + peerAddress + ")");
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, log.info("We create an AuthenticationHandshake to authenticate to peer {}.", peerAddress);
getMyAddress(), AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode,
peerAddress, getMyAddress(),
() -> getAuthenticatedAndReportedPeers(), peerAddress,
(newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection) () -> getAuthenticatedAndReportedPeers(),
); (newReportedPeers, connection) -> addToReportedPeers(newReportedPeers, connection)
authenticationHandshakes.put(peerAddress, authenticationHandshake); );
SettableFuture<Connection> authenticationFuture = authenticationHandshake.requestAuthentication(); authenticationHandshakes.put(peerAddress, authenticationHandshake);
Futures.addCallback(authenticationFuture, futureCallback); SettableFuture<Connection> authenticationFuture = authenticationHandshake.requestAuthentication();
} else { Futures.addCallback(authenticationFuture, futureCallback);
log.warn("An authentication handshake is already created for that peerAddress ({}). That should not happen", peerAddress);
}
} }
private void addAuthenticatedPeer(Connection connection, Address peerAddress) { private void handleAuthenticationSuccess(Connection connection, Address peerAddress) {
Log.traceCall(peerAddress.getFullAddress()); Log.traceCall(peerAddress.getFullAddress());
connection.setPeerAddress(peerAddress);
connection.setAuthenticated();
removeFromAuthenticationHandshakes(peerAddress);
log.info("\n\n############################################################\n" + log.info("\n\n############################################################\n" +
"We are authenticated to:" + "We are authenticated to:" +
"\nconnection=" + connection.getUid() "\nconnection=" + connection.getUid()
@ -586,14 +654,17 @@ public class PeerManager implements MessageListener, ConnectionListener {
+ "\npeerAddress= " + peerAddress + "\npeerAddress= " + peerAddress
+ "\n############################################################\n"); + "\n############################################################\n");
removeFromAuthenticationHandshakes(peerAddress);
connection.setPeerAddress(peerAddress);
connection.setAuthenticated();
authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress)); authenticatedPeers.put(peerAddress, new Peer(connection, peerAddress));
removeFromReportedPeers(peerAddress); removeFromReportedPeers(peerAddress);
if (!checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY))
printAuthenticatedPeers();
authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection)); authenticationListeners.stream().forEach(e -> e.onPeerAuthenticated(peerAddress, connection));
printAuthenticatedPeers();
// We give a bit headroom to avoid dangling disconnect/connect
checkIfConnectedPeersExceeds(MAX_CONNECTIONS_LOW_PRIORITY + 2);
} }
void handleAuthenticationFailure(@Nullable Address peerAddress, Throwable throwable) { void handleAuthenticationFailure(@Nullable Address peerAddress, Throwable throwable) {
@ -609,6 +680,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
removeFromAuthenticationHandshakes(peerAddress); removeFromAuthenticationHandshakes(peerAddress);
removeFromReportedPeers(peerAddress); removeFromReportedPeers(peerAddress);
removeFromAuthenticatedPeers(peerAddress); removeFromAuthenticatedPeers(peerAddress);
removeFromPersistedPeers(peerAddress);
} }
} }
@ -616,27 +688,44 @@ public class PeerManager implements MessageListener, ConnectionListener {
reportedPeers.remove(new ReportedPeer(peerAddress)); reportedPeers.remove(new ReportedPeer(peerAddress));
} }
private void removeFromAuthenticationHandshakes(@Nullable Address peerAddress) { private void removeFromAuthenticationHandshakes(Address peerAddress) {
if (authenticationHandshakes.containsKey(peerAddress)) if (authenticationHandshakes.containsKey(peerAddress))
authenticationHandshakes.remove(peerAddress); authenticationHandshakes.remove(peerAddress);
} }
private void removeFromAuthenticatedPeers(@Nullable Address peerAddress) { private void removeFromAuthenticatedPeers(Address peerAddress) {
if (authenticatedPeers.containsKey(peerAddress)) if (authenticatedPeers.containsKey(peerAddress))
authenticatedPeers.remove(peerAddress); authenticatedPeers.remove(peerAddress);
printAuthenticatedPeers(); printAuthenticatedPeers();
} }
private boolean enoughConnectionsForAuthReached() { private void removeFromPersistedPeers(Address peerAddress) {
// We reduce the limit to avoid dangling connect/disconnect ReportedPeer reportedPeer = new ReportedPeer(peerAddress);
return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY - 2; if (persistedPeers.contains(reportedPeer)) {
persistedPeers.remove(reportedPeer);
dbStorage.queueUpForSave(persistedPeers, 5000);
}
} }
private boolean reportedPeersAvailable() { private boolean enoughConnections() {
return !reportedPeers.isEmpty(); return authenticatedPeers.size() >= MAX_CONNECTIONS_LOW_PRIORITY;
} }
private boolean checkIfConnectedPeersExceeds(int limit) { protected boolean reportedPeersAvailable() {
List<ReportedPeer> list = new ArrayList<>(reportedPeers);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
return !list.isEmpty();
}
private boolean remainingSeedNodesAvailable() {
List<Address> list = new ArrayList<>(remainingSeedNodes);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(e));
authenticatedPeers.keySet().stream().forEach(e -> list.remove(e));
return !list.isEmpty();
}
protected boolean checkIfConnectedPeersExceeds(int limit) {
Log.traceCall(); Log.traceCall();
int size = authenticatedPeers.size(); int size = authenticatedPeers.size();
if (size > limit) { if (size > limit) {
@ -651,11 +740,11 @@ public class PeerManager implements MessageListener, ConnectionListener {
log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections()); log.debug("networkNode.getAllConnections()={}", networkNode.getAllConnections());
}*/ }*/
// If we are a seed node we don't remove other seed nodes to keep the core network well connected // We don't remove seed nodes to keep the core network well connected
List<Connection> authenticatedConnections = allConnections.stream() List<Connection> authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE) .filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE)
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) .filter(e -> !isSeedNode(e))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (authenticatedConnections.size() == 0) { if (authenticatedConnections.size() == 0) {
@ -665,7 +754,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream() authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE) .filter(e -> e.getConnectionPriority() == ConnectionPriority.PASSIVE || e.getConnectionPriority() == ConnectionPriority.ACTIVE)
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) .filter(e -> !isSeedNode(e))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (authenticatedConnections.size() == 0) { if (authenticatedConnections.size() == 0) {
@ -675,7 +764,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
authenticatedConnections = allConnections.stream() authenticatedConnections = allConnections.stream()
.filter(e -> e.isAuthenticated()) .filter(e -> e.isAuthenticated())
.filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST) .filter(e -> e.getConnectionPriority() != ConnectionPriority.AUTH_REQUEST)
.filter(e -> !(this instanceof SeedNodePeerManager) || !isAuthConnectionSeedNode(e)) .filter(e -> !isSeedNode(e))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
} }
@ -688,7 +777,7 @@ public class PeerManager implements MessageListener, ConnectionListener {
Connection connection = authenticatedConnections.remove(0); Connection connection = authenticatedConnections.remove(0);
log.info("We are going to shut down the oldest connection with last activity date=" log.info("We are going to shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection); + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(limit), 10, 50, TimeUnit.MILLISECONDS)); connection.shutDown(() -> checkIfConnectedPeersExceeds(limit));
return true; return true;
} else { } else {
log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)"); log.debug("authenticatedConnections.size() == 0. That might happen in rare cases. (checkIfConnectedPeersExceeds)");
@ -700,8 +789,10 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
} }
private boolean isAuthConnectionSeedNode(Connection e) { private boolean isSeedNode(Connection connection) {
return e.getPeerAddressOptional().isPresent() && seedNodeAddressesOptional.isPresent() && seedNodeAddressesOptional.get().contains(e.getPeerAddressOptional().get()); return connection.getPeerAddressOptional().isPresent()
&& seedNodeAddressesOptional.isPresent()
&& seedNodeAddressesOptional.get().contains(connection.getPeerAddressOptional().get());
} }
@ -724,6 +815,10 @@ public class PeerManager implements MessageListener, ConnectionListener {
return authenticatedPeers; return authenticatedPeers;
} }
public HashSet<ReportedPeer> getPersistedPeers() {
return persistedPeers;
}
public boolean isInAuthenticationProcess(Address address) { public boolean isInAuthenticationProcess(Address address) {
return authenticationHandshakes.containsKey(address); return authenticationHandshakes.containsKey(address);
} }
@ -763,6 +858,25 @@ public class PeerManager implements MessageListener, ConnectionListener {
reportedPeers.addAll(adjustedReportedPeers); reportedPeers.addAll(adjustedReportedPeers);
purgeReportedPeersIfExceeds(); purgeReportedPeersIfExceeds();
// We add all adjustedReportedPeers to persistedReportedPeers but only save the 500 peers with the most
// recent lastActivityDate.
// ReportedPeers is changing when peers authenticate (remove) so we don't use that but
// the persistedReportedPeers set.
persistedPeers.addAll(adjustedReportedPeers);
// We add also our authenticated and authenticating peers
authenticatedPeers.keySet().forEach(e -> persistedPeers.add(new ReportedPeer(e, new Date())));
authenticationHandshakes.keySet().forEach(e -> persistedPeers.add(new ReportedPeer(e, new Date())));
int toRemove = persistedPeers.size() - 500;
if (toRemove > 0) {
List<ReportedPeer> list = new ArrayList<>(persistedPeers);
list.sort((o1, o2) -> o1.lastActivityDate.compareTo(o2.lastActivityDate));
for (int i = 0; i < toRemove; i++) {
persistedPeers.remove(list.get(i));
}
}
dbStorage.queueUpForSave(persistedPeers);
} }
printReportedPeers(); printReportedPeers();
@ -800,30 +914,31 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
private Optional<ReportedPeer> getAndRemoveNotAuthenticatingReportedPeer() { private Optional<ReportedPeer> getAndRemoveNotAuthenticatingReportedPeer() {
Optional<ReportedPeer> reportedPeer = Optional.empty();
List<ReportedPeer> list = new ArrayList<>(reportedPeers); List<ReportedPeer> list = new ArrayList<>(reportedPeers);
authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); authenticationHandshakes.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e))); authenticatedPeers.keySet().stream().forEach(e -> list.remove(new ReportedPeer(e)));
if (!list.isEmpty()) if (!list.isEmpty()) {
reportedPeer = Optional.of(getAndRemoveRandomReportedPeer(list)); ReportedPeer reportedPeer = getAndRemoveRandomReportedPeer(list);
removeFromReportedPeers(reportedPeer.address);
return reportedPeer; return Optional.of(reportedPeer);
} else {
return Optional.empty();
}
} }
private Address getAndRemoveRandomAddress(List<Address> list) { protected Address getAndRemoveRandomAddress(List<Address> list) {
checkArgument(!list.isEmpty(), "List must not be empty"); checkArgument(!list.isEmpty(), "List must not be empty");
return list.remove(new Random().nextInt(list.size())); return list.remove(new Random().nextInt(list.size()));
} }
private Optional<Address> getAndRemoveNotAuthenticatingSeedNode() { private Optional<Address> getAndRemoveNotAuthenticatingSeedNode() {
Optional<Address> seedNode = Optional.empty();
authenticationHandshakes.keySet().stream().forEach(e -> remainingSeedNodes.remove(e)); authenticationHandshakes.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
authenticatedPeers.keySet().stream().forEach(e -> remainingSeedNodes.remove(e)); authenticatedPeers.keySet().stream().forEach(e -> remainingSeedNodes.remove(e));
if (!remainingSeedNodes.isEmpty()) if (remainingSeedNodesAvailable())
seedNode = Optional.of(getAndRemoveRandomAddress(remainingSeedNodes)); return Optional.of(getAndRemoveRandomAddress(remainingSeedNodes));
else
return seedNode; return Optional.empty();
} }
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
@ -836,18 +951,22 @@ public class PeerManager implements MessageListener, ConnectionListener {
} }
private void printAuthenticatedPeers() { private void printAuthenticatedPeers() {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + if (!authenticatedPeers.isEmpty()) {
"Authenticated peers for node " + getMyAddress() + ":"); StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address)); "Authenticated peers for node " + getMyAddress() + ":");
result.append("\n------------------------------------------------------------\n"); authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address));
log.info(result.toString()); result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
} }
private void printReportedPeers() { private void printReportedPeers() {
StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" + if (!reportedPeers.isEmpty()) {
"Reported peers for node " + getMyAddress() + ":"); StringBuilder result = new StringBuilder("\n\n------------------------------------------------------------\n" +
reportedPeers.stream().forEach(e -> result.append("\n").append(e)); "Reported peers for node " + getMyAddress() + ":");
result.append("\n------------------------------------------------------------\n"); reportedPeers.stream().forEach(e -> result.append("\n").append(e));
log.info(result.toString()); result.append("\n------------------------------------------------------------\n");
log.info(result.toString());
}
} }
} }

View File

@ -22,6 +22,7 @@ public class ReportedPeer implements Serializable {
this(address, null); this(address, null);
} }
// We don't use the lastActivityDate for identity
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
@ -33,6 +34,7 @@ public class ReportedPeer implements Serializable {
} }
// We don't use the lastActivityDate for identity
@Override @Override
public int hashCode() { public int hashCode() {
return address != null ? address.hashCode() : 0; return address != null ? address.hashCode() : 0;

View File

@ -22,8 +22,10 @@ import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class RequestDataManager implements MessageListener, AuthenticationListener { public class RequestDataManager implements MessageListener, AuthenticationListener {
private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class); private static final Logger log = LoggerFactory.getLogger(RequestDataManager.class);
@ -36,6 +38,8 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
public interface Listener { public interface Listener {
void onNoSeedNodeAvailable(); void onNoSeedNodeAvailable();
void onNoPeersAvailable();
void onDataReceived(Address seedNode); void onDataReceived(Address seedNode);
} }
@ -43,21 +47,24 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
private final NetworkNode networkNode; private final NetworkNode networkNode;
protected final P2PDataStorage dataStorage; protected final P2PDataStorage dataStorage;
private final PeerManager peerManager; private final PeerManager peerManager;
private final Listener listener; private final HashSet<ReportedPeer> persistedPeers = new HashSet<>();
private final HashSet<ReportedPeer> remainingPersistedPeers = new HashSet<>();
private Listener listener;
private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty(); private Optional<Address> optionalConnectedSeedNodeAddress = Optional.empty();
private Optional<Collection<Address>> optionalSeedNodeAddresses = Optional.empty(); private Collection<Address> seedNodeAddresses;
protected Timer requestDataFromAuthenticatedSeedNodeTimer;
private Timer requestDataTimer, requestDataWithPersistedPeersTimer;
private boolean doNotifyNoSeedNodeAvailableListener = true;
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Constructor // Constructor
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Listener listener) { public RequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager) {
this.networkNode = networkNode; this.networkNode = networkNode;
this.dataStorage = dataStorage; this.dataStorage = dataStorage;
this.peerManager = peerManager; this.peerManager = peerManager;
this.listener = listener;
networkNode.addMessageListener(this); networkNode.addMessageListener(this);
} }
@ -66,6 +73,10 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
Log.traceCall(); Log.traceCall();
networkNode.removeMessageListener(this); networkNode.removeMessageListener(this);
stopRequestDataTimer();
stopRequestDataWithPersistedPeersTimer();
stopRequestDataFromAuthenticatedSeedNodeTimer();
} }
@ -73,54 +84,143 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void requestData(Collection<Address> seedNodeAddresses) { public void setRequestDataManagerListener(Listener listener) {
if (!optionalSeedNodeAddresses.isPresent()) this.listener = listener;
optionalSeedNodeAddresses = Optional.of(seedNodeAddresses); }
Log.traceCall(seedNodeAddresses.toString()); public void requestDataFromSeedNodes(Collection<Address> seedNodeAddresses) {
if (!seedNodeAddresses.isEmpty()) { checkNotNull(seedNodeAddresses, "requestDataFromSeedNodes: seedNodeAddresses must not be null.");
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses); checkArgument(!seedNodeAddresses.isEmpty(), "requestDataFromSeedNodes: seedNodeAddresses must not be empty.");
Collections.shuffle(remainingSeedNodeAddresses);
Address candidate = remainingSeedNodeAddresses.get(0);
if (!peerManager.isInAuthenticationProcess(candidate)) {
// We only remove it if it is not in the process of authentication
remainingSeedNodeAddresses.remove(0);
log.info("We try to send a GetAllDataMessage request to a random seed node. " + candidate);
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new DataRequest()); this.seedNodeAddresses = seedNodeAddresses;
Futures.addCallback(future, new FutureCallback<Connection>() { requestData(seedNodeAddresses);
@Override }
public void onSuccess(@Nullable Connection connection) {
log.info("Send GetAllDataMessage to " + candidate + " succeeded."); private void requestData(Collection<Address> addresses) {
checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen."); Log.traceCall(addresses.toString());
optionalConnectedSeedNodeAddress = Optional.of(candidate); checkArgument(!addresses.isEmpty(), "requestData: addresses must not be empty.");
stopRequestDataTimer();
List<Address> remainingAddresses = new ArrayList<>(addresses);
Address candidate = remainingAddresses.get(new Random().nextInt(remainingAddresses.size()));
if (!peerManager.isInAuthenticationProcess(candidate)) {
// We only remove it if it is not in the process of authentication
remainingAddresses.remove(candidate);
log.info("We try to send a GetAllDataMessage request to node. " + candidate);
SettableFuture<Connection> future = networkNode.sendMessage(candidate, new DataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("Send GetAllDataMessage to " + candidate + " succeeded.");
checkArgument(!optionalConnectedSeedNodeAddress.isPresent(), "We have already a connectedSeedNode. That must not happen.");
optionalConnectedSeedNodeAddress = Optional.of(candidate);
stopRequestDataTimer();
stopRequestDataWithPersistedPeersTimer();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
"That is expected if the node is offline. " +
"Exception:" + throwable.getMessage());
if (!remainingAddresses.isEmpty()) {
log.info("There are more seed nodes available for requesting data. " +
"We will try requestData again.");
ReportedPeer reportedPeer = new ReportedPeer(candidate);
if (remainingPersistedPeers.contains(reportedPeer))
remainingPersistedPeers.remove(reportedPeer);
requestData(remainingAddresses);
} else {
log.info("There is no seed node available for requesting data. " +
"That is expected if no seed node is online.\n" +
"We will try again to request data from a seed node after a random pause.");
requestDataWithPersistedPeers(candidate);
requestDataWithSeedNodeAddresses();
} }
}
@Override });
public void onFailure(@NotNull Throwable throwable) { } else if (!remainingAddresses.isEmpty()) {
log.info("Send GetAllDataMessage to " + candidate + " failed. " + log.info("The node ({}) is in the process of authentication.\n" +
"That is expected if the seed node is offline. " + "We will try requestData again with the remaining addresses.", candidate);
"Exception:" + throwable.getMessage()); remainingAddresses.remove(candidate);
if (!remainingSeedNodeAddresses.isEmpty()) if (!remainingAddresses.isEmpty()) {
log.trace("We try to connect another random seed node from our remaining list. " + remainingSeedNodeAddresses); requestData(remainingAddresses);
requestData(remainingSeedNodeAddresses);
}
});
} else { } else {
log.info("The seed node ({}) is in the process of authentication.\n" + log.info("The node ({}) is in the process of authentication.\n" +
"We will try again after a pause of 3-5 sec.", candidate); "There are no more remaining addresses available.\n" +
listener.onNoSeedNodeAvailable(); "We try requestData with the persistedPeers and after a pause with " +
UserThread.runAfterRandomDelay(() -> requestData(remainingSeedNodeAddresses), "the seed nodes again.", candidate);
3, 5, TimeUnit.SECONDS); requestDataWithPersistedPeers(candidate);
requestDataWithSeedNodeAddresses();
} }
} else { } else {
log.info("There is no seed node available for requesting data. " + log.info("The node ({}) is in the process of authentication.\n" +
"That is expected if no seed node is online.\n" + "There are no more remaining addresses available.\n" +
"We will try again after a pause of 10-20 sec."); "We try requestData with the persistedPeers and after a pause with " +
"the seed nodes again.", candidate);
requestDataWithPersistedPeers(candidate);
requestDataWithSeedNodeAddresses();
}
}
private void requestDataWithSeedNodeAddresses() {
Log.traceCall();
// We only want to notify the first time
if (doNotifyNoSeedNodeAvailableListener) {
doNotifyNoSeedNodeAvailableListener = false;
listener.onNoSeedNodeAvailable(); listener.onNoSeedNodeAvailable();
UserThread.runAfterRandomDelay(() -> requestData(optionalSeedNodeAddresses.get()), }
if (requestDataTimer == null)
requestDataTimer = UserThread.runAfterRandomDelay(() -> requestData(seedNodeAddresses),
10, 20, TimeUnit.SECONDS); 10, 20, TimeUnit.SECONDS);
}
private void requestDataWithPersistedPeers(@Nullable Address failedPeer) {
Log.traceCall("failedPeer=" + failedPeer);
stopRequestDataWithPersistedPeersTimer();
if (persistedPeers.isEmpty()) {
persistedPeers.addAll(peerManager.getPersistedPeers());
log.info("persistedPeers = " + persistedPeers);
remainingPersistedPeers.addAll(persistedPeers);
}
if (failedPeer != null) {
ReportedPeer reportedPeer = new ReportedPeer(failedPeer);
if (remainingPersistedPeers.contains(reportedPeer))
remainingPersistedPeers.remove(reportedPeer);
}
boolean persistedPeersAvailable = false;
if (!remainingPersistedPeers.isEmpty()) {
Set<Address> persistedPeerAddresses = remainingPersistedPeers.stream().map(e -> e.address).collect(Collectors.toSet());
if (!persistedPeerAddresses.isEmpty()) {
log.info("We try to use persisted peers for requestData.");
persistedPeersAvailable = true;
requestData(persistedPeerAddresses);
}
}
if (!persistedPeersAvailable) {
log.warn("No seed nodes and no persisted peers are available for requesting data.\n" +
"We will try again after a random pause.");
doNotifyNoSeedNodeAvailableListener = false;
listener.onNoPeersAvailable();
// reset remainingPersistedPeers
remainingPersistedPeers.clear();
remainingPersistedPeers.addAll(persistedPeers);
if (!remainingPersistedPeers.isEmpty() && requestDataWithPersistedPeersTimer == null)
requestDataWithPersistedPeersTimer = UserThread.runAfterRandomDelay(() ->
requestDataWithPersistedPeers(null),
30, 40, TimeUnit.SECONDS);
} }
} }
@ -159,8 +259,9 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// We delay a bit to be sure that the authentication state is applied to all listeners // We delay a bit to be sure that the authentication state is applied to all listeners
if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) { if (connectedSeedNodeAddress.equals(peerAddress) && connection.getConnectionPriority() == ConnectionPriority.ACTIVE) {
// We are the node (can be a seed node as well) which requested the authentication // We are the node (can be a seed node as well) which requested the authentication
UserThread.runAfter(() if (requestDataFromAuthenticatedSeedNodeTimer == null)
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS); requestDataFromAuthenticatedSeedNodeTimer = UserThread.runAfter(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 100, TimeUnit.MILLISECONDS);
} }
}); });
} }
@ -168,6 +269,9 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
// 5. Step after authentication to first seed node we request again the data // 5. Step after authentication to first seed node we request again the data
protected void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) { protected void requestDataFromAuthenticatedSeedNode(Address peerAddress, Connection connection) {
Log.traceCall(peerAddress.toString()); Log.traceCall(peerAddress.toString());
stopRequestDataFromAuthenticatedSeedNodeTimer();
// We have to request the data again as we might have missed pushed data in the meantime // We have to request the data again as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(connection, new DataRequest()); SettableFuture<Connection> future = networkNode.sendMessage(connection, new DataRequest());
Futures.addCallback(future, new FutureCallback<Connection>() { Futures.addCallback(future, new FutureCallback<Connection>() {
@ -183,9 +287,33 @@ public class RequestDataManager implements MessageListener, AuthenticationListen
+ "\nWe will try again to request data from any of our seed nodes."); + "\nWe will try again to request data from any of our seed nodes.");
// We will try again to request data from any of our seed nodes. // We will try again to request data from any of our seed nodes.
if (optionalSeedNodeAddresses.isPresent()) if (seedNodeAddresses != null && !seedNodeAddresses.isEmpty())
requestData(optionalSeedNodeAddresses.get()); requestData(seedNodeAddresses);
else
log.error("seedNodeAddresses is null or empty. That must not happen. seedNodeAddresses="
+ seedNodeAddresses);
} }
}); });
} }
private void stopRequestDataTimer() {
if (requestDataTimer != null) {
requestDataTimer.cancel();
requestDataTimer = null;
}
}
private void stopRequestDataWithPersistedPeersTimer() {
if (requestDataWithPersistedPeersTimer != null) {
requestDataWithPersistedPeersTimer.cancel();
requestDataWithPersistedPeersTimer = null;
}
}
private void stopRequestDataFromAuthenticatedSeedNodeTimer() {
if (requestDataFromAuthenticatedSeedNodeTimer != null) {
requestDataFromAuthenticatedSeedNodeTimer.cancel();
requestDataFromAuthenticatedSeedNodeTimer = null;
}
}
} }

View File

@ -1,13 +1,74 @@
package io.bitsquare.p2p.peers; package io.bitsquare.p2p.peers;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.NetworkNode; import io.bitsquare.p2p.network.NetworkNode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class SeedNodePeerManager extends PeerManager { public class SeedNodePeerManager extends PeerManager {
private static final Logger log = LoggerFactory.getLogger(SeedNodePeerManager.class); private static final Logger log = LoggerFactory.getLogger(SeedNodePeerManager.class);
public SeedNodePeerManager(NetworkNode networkNode) { public SeedNodePeerManager(NetworkNode networkNode, File storageDir) {
super(networkNode); super(networkNode, storageDir);
} }
public void authenticateToSeedNode() {
Log.traceCall();
checkArgument(seedNodeAddressesOptional.isPresent(),
"seedNodeAddresses must be set before calling authenticateToSeedNode");
checkArgument(!seedNodeAddressesOptional.get().isEmpty(),
"seedNodeAddresses must not be empty");
remainingSeedNodes.addAll(seedNodeAddressesOptional.get());
Address peerAddress = getAndRemoveRandomAddress(remainingSeedNodes);
authenticateToFirstSeedNode(peerAddress);
startCheckSeedNodeConnectionTask();
}
@Override
protected void onFirstSeedNodeAuthenticated() {
// If we are seed node we want to first connect to all other seed nodes before connecting to the reported peers.
authenticateToRemainingSeedNode();
}
@Override
protected void onRemainingSeedNodeAuthenticated() {
// If we are seed node we want to first connect to all other seed nodes before connecting to the reported peers.
authenticateToRemainingSeedNode();
}
@Override
protected void handleNoSeedNodesAvailableCase() {
Log.traceCall();
log.info("We don't have more seed nodes available. " +
"We authenticate to reported peers and try again after a random pause with the seed nodes which failed or if " +
"none available with the reported peers.");
boolean reportedPeersAvailableCalled = false;
if (reportedPeersAvailable()) {
authenticateToRemainingReportedPeer();
reportedPeersAvailableCalled = true;
}
resetRemainingSeedNodes();
if (!remainingSeedNodes.isEmpty()) {
if (authenticateToRemainingSeedNodeTimer == null)
authenticateToRemainingSeedNodeTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingSeedNode(),
10, 20, TimeUnit.SECONDS);
} else if (!reportedPeersAvailableCalled) {
if (authenticateToRemainingReportedPeerTimer == null)
authenticateToRemainingReportedPeerTimer = UserThread.runAfterRandomDelay(() -> authenticateToRemainingReportedPeer(),
10, 20, TimeUnit.SECONDS);
}
}
} }

View File

@ -13,15 +13,16 @@ import java.util.concurrent.TimeUnit;
public class SeedNodeRequestDataManager extends RequestDataManager { public class SeedNodeRequestDataManager extends RequestDataManager {
private static final Logger log = LoggerFactory.getLogger(SeedNodeRequestDataManager.class); private static final Logger log = LoggerFactory.getLogger(SeedNodeRequestDataManager.class);
public SeedNodeRequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager, Listener listener) { public SeedNodeRequestDataManager(NetworkNode networkNode, P2PDataStorage dataStorage, PeerManager peerManager) {
super(networkNode, dataStorage, peerManager, listener); super(networkNode, dataStorage, peerManager);
} }
@Override @Override
public void onPeerAuthenticated(Address peerAddress, Connection connection) { public void onPeerAuthenticated(Address peerAddress, Connection connection) {
//TODO not clear which use case is handles here... //TODO not clear which use case is handles here...
if (dataStorage.getMap().isEmpty()) { if (dataStorage.getMap().isEmpty()) {
UserThread.runAfterRandomDelay(() if (requestDataFromAuthenticatedSeedNodeTimer == null)
requestDataFromAuthenticatedSeedNodeTimer = UserThread.runAfterRandomDelay(()
-> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS); -> requestDataFromAuthenticatedSeedNode(peerAddress, connection), 2, 5, TimeUnit.SECONDS);
} }
super.onPeerAuthenticated(peerAddress, connection); super.onPeerAuthenticated(peerAddress, connection);

View File

@ -173,7 +173,7 @@ public class P2PDataStorage implements MessageListener {
rePublish = true; rePublish = true;
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
storage.queueUpForSave(sequenceNumberMap); storage.queueUpForSave(sequenceNumberMap, 5000);
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set after addProtectedExpirableData:"); sb.append("Data set after addProtectedExpirableData:");
@ -210,7 +210,7 @@ public class P2PDataStorage implements MessageListener {
broadcast(new RemoveDataMessage(protectedData), sender); broadcast(new RemoveDataMessage(protectedData), sender);
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber); sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
storage.queueUpForSave(sequenceNumberMap); storage.queueUpForSave(sequenceNumberMap, 5000);
} else { } else {
log.debug("remove failed"); log.debug("remove failed");
} }
@ -235,7 +235,7 @@ public class P2PDataStorage implements MessageListener {
broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender); broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender);
sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber); sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber);
storage.queueUpForSave(sequenceNumberMap); storage.queueUpForSave(sequenceNumberMap, 5000);
} else { } else {
log.debug("removeMailboxData failed"); log.debug("removeMailboxData failed");
} }

View File

@ -90,6 +90,10 @@ public class TestUtils {
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
} }
@Override
public void onNoPeersAvailable() {
}
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {
} }
@ -136,8 +140,11 @@ public class TestUtils {
} }
@Override @Override
public void onTorNodeReady() { public void onNoPeersAvailable() {
}
@Override
public void onTorNodeReady() {
} }
@Override @Override
@ -147,7 +154,6 @@ public class TestUtils {
@Override @Override
public void onHiddenServicePublished() { public void onHiddenServicePublished() {
} }
@Override @Override

View File

@ -95,6 +95,10 @@ public class PeerManagerTest {
public void onNoSeedNodeAvailable() { public void onNoSeedNodeAvailable() {
} }
@Override
public void onNoPeersAvailable() {
}
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {
} }
@ -140,7 +144,10 @@ public class PeerManagerTest {
@Override @Override
public void onTorNodeReady() { public void onTorNodeReady() {
}
@Override
public void onNoPeersAvailable() {
} }
@Override @Override
@ -177,6 +184,10 @@ public class PeerManagerTest {
public void onTorNodeReady() { public void onTorNodeReady() {
} }
@Override
public void onNoPeersAvailable() {
}
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {
latch.countDown(); latch.countDown();
@ -410,6 +421,10 @@ public class PeerManagerTest {
public void onTorNodeReady() { public void onTorNodeReady() {
} }
@Override
public void onNoPeersAvailable() {
}
@Override @Override
public void onFirstPeerAuthenticated() { public void onFirstPeerAuthenticated() {
} }

View File

@ -65,7 +65,7 @@ public class ProtectedDataStorageTest {
storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair(); storageSignatureKeyPair1 = keyRing1.getSignatureKeyPair();
encryptionService1 = new EncryptionService(keyRing1); encryptionService1 = new EncryptionService(keyRing1);
networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService().getNetworkNode(); networkNode1 = TestUtils.getAndStartSeedNode(8001, useClearNet, seedNodes).getSeedNodeP2PService().getNetworkNode();
peerManager1 = new PeerManager(networkNode1); peerManager1 = new PeerManager(networkNode1, new File("dummy"));
dataStorage1 = new P2PDataStorage(peerManager1, networkNode1, new File("dummy")); dataStorage1 = new P2PDataStorage(peerManager1, networkNode1, new File("dummy"));
// for mailbox // for mailbox