improve logging

This commit is contained in:
Manfred Karrer 2015-11-07 23:13:19 +01:00
parent 6e1e3d4be3
commit 13399057f7
29 changed files with 679 additions and 362 deletions

View File

@ -24,7 +24,9 @@ import ch.qos.logback.core.rolling.RollingFileAppender;
import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
import org.slf4j.LoggerFactory;
public class Logging {
public class Log {
public static boolean PRINT_TRACE_METHOD = true;
public static void setup(String fileName) {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
@ -46,7 +48,7 @@ public class Logging {
PatternLayoutEncoder encoder = new PatternLayoutEncoder();
encoder.setContext(loggerContext);
encoder.setPattern("%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n");
encoder.setPattern("%highlight(%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n)");
encoder.start();
appender.setEncoder(encoder);
@ -57,4 +59,22 @@ public class Logging {
ch.qos.logback.classic.Logger logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
logbackLogger.addAppender(appender);
}
public static void traceCall() {
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
String methodName = stackTraceElement.getMethodName();
if (methodName.equals("<init>"))
methodName = "Constructor ";
String className = stackTraceElement.getClassName();
LoggerFactory.getLogger(className).trace("Called: {}", methodName);
}
public static void traceCall(String message) {
StackTraceElement stackTraceElement = new Throwable().getStackTrace()[1];
String methodName = stackTraceElement.getMethodName();
if (methodName.equals("<init>"))
methodName = "Constructor ";
String className = stackTraceElement.getClassName();
LoggerFactory.getLogger(className).trace("Called: {} [{}]", methodName, message);
}
}

View File

@ -35,6 +35,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -372,6 +373,10 @@ public class Utilities {
}
}
public static void setThreadName(String name) {
Thread.currentThread().setName(name + "-" + new Random().nextInt(10000));
}
private static class AnnotationExclusionStrategy implements ExclusionStrategy {
@Override
public boolean shouldSkipField(FieldAttributes f) {

View File

@ -127,7 +127,7 @@ public class ArbitratorManager {
}
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override

View File

@ -81,6 +81,7 @@ public class ArbitratorService {
}
public Map<Address, Arbitrator> getArbitrators() {
// TODO java.lang.IllegalStateException: Duplicate key
final Map<Address, Arbitrator> arbitratorsMap = p2PService.getDataMap().values().stream()
.filter(e -> e.expirablePayload instanceof Arbitrator)
.map(e -> (Arbitrator) e.expirablePayload)

View File

@ -129,7 +129,7 @@ public class DisputeManager {
}
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override

View File

@ -28,6 +28,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ExceptionHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.user.Preferences;
import javafx.beans.property.*;
import org.bitcoinj.core.*;
@ -122,7 +123,7 @@ public class WalletService {
Timer timeoutTimer = FxTimer.runLater(
Duration.ofMillis(STARTUP_TIMEOUT),
() -> {
Thread.currentThread().setName("WalletService:StartupTimeout-" + new Random().nextInt(1000));
Utilities.setThreadName("WalletService:StartupTimeout");
exceptionHandler.handleException(new TimeoutException("Wallet did not initialize in " + STARTUP_TIMEOUT / 1000 + " seconds."));
}
);

View File

@ -162,7 +162,7 @@ public class TradeManager {
}
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override

View File

@ -18,6 +18,7 @@
package io.bitsquare.trade.offer;
import io.bitsquare.app.Version;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.storage.Storage;
import io.bitsquare.trade.Tradable;
import io.bitsquare.trade.TradableList;
@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
public class OpenOffer implements Tradable, Serializable {
// That object is saved to disc. We need to take care of changes to not break deserialization.
@ -103,8 +103,8 @@ public class OpenOffer implements Tradable, Serializable {
timeoutTimer = FxTimer.runLater(
Duration.ofMillis(TIMEOUT),
() -> {
Thread.currentThread().setName("OpenOffer:Timeout-" + new Random().nextInt(1000));
log.debug("Timeout reached");
Utilities.setThreadName("OpenOffer:Timeout");
log.info("Timeout reached");
if (state == State.RESERVED)
setState(State.AVAILABLE);
});

View File

@ -24,6 +24,7 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.P2PService;
@ -47,7 +48,6 @@ import javax.inject.Named;
import java.io.File;
import java.time.Duration;
import java.util.Optional;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
@ -142,7 +142,7 @@ public class OpenOfferManager {
}
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override
@ -165,7 +165,7 @@ public class OpenOfferManager {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("RepublishOffers-" + new Random().nextInt(1000));
Utilities.setThreadName("RepublishOffers");
UserThread.execute(() -> rePublishOffers());
try {
} catch (Throwable t) {

View File

@ -20,6 +20,7 @@ package io.bitsquare.trade.protocol.availability;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.taskrunner.TaskRunner;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.messaging.DecryptedMailListener;
import io.bitsquare.trade.offer.Offer;
@ -34,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Random;
import static io.bitsquare.util.Validator.nonEmptyStringOf;
@ -145,7 +145,7 @@ public class OfferAvailabilityProtocol {
stopTimeout();
timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> {
Thread.currentThread().setName("OfferAvailabilityProtocol:Timeout-" + new Random().nextInt(1000));
Utilities.setThreadName("OfferAvailabilityProtocol:Timeout");
log.warn("Timeout reached");
errorMessageHandler.handleErrorMessage("Timeout reached: Peer has not responded.");
});

View File

@ -19,6 +19,7 @@ package io.bitsquare.trade.protocol.trade;
import io.bitsquare.arbitration.Arbitrator;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.messaging.DecryptedMailListener;
@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.security.PublicKey;
import java.time.Duration;
import java.util.Optional;
import java.util.Random;
import static io.bitsquare.util.Validator.nonEmptyStringOf;
@ -127,7 +127,7 @@ public abstract class TradeProtocol {
stopTimeout();
timeoutTimer = FxTimer.runLater(Duration.ofMillis(TIMEOUT), () -> {
Thread.currentThread().setName("TradeProtocol:Timeout-" + new Random().nextInt(1000));
Utilities.setThreadName("TradeProtocol:Timeout");
log.error("Timeout reached");
trade.setErrorMessage("A timeout occurred.");
cleanupTradable();

View File

@ -100,8 +100,9 @@ public class BitsquareApp extends Application {
public void start(Stage primaryStage) throws IOException {
BitsquareApp.primaryStage = primaryStage;
Logging.setup(Paths.get(env.getProperty(BitsquareEnvironment.APP_DATA_DIR_KEY), "bitsquare").toString());
Log.setup(Paths.get(env.getProperty(BitsquareEnvironment.APP_DATA_DIR_KEY), "bitsquare").toString());
Log.PRINT_TRACE_METHOD = DEV_MODE;
UserThread.setExecutor(Platform::runLater);
shutDownHandler = this::stop;

View File

@ -214,7 +214,7 @@ class MainViewModel implements ViewModel {
}
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
p2pNetworkInfoFooter.set("Data received from peer.");
p2pNetworkReady.set(true);
}

View File

@ -2,11 +2,11 @@
<configuration>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n</pattern>
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{15} - %msg %xEx%n)</pattern>
</encoder>
</appender>
<root level="INFO">
<root level="TRACE">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
@ -20,7 +20,8 @@
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="WARN"/>
<logger name="io.bitsquare.btc.AddressBasedCoinSelector" level="OFF"/>
<logger name="io.bitsquare.btc.AddressBasedCoinSelector" level="WARN"/>
<logger name="io.bitsquare.storage.Storage" level="WARN"/>
<logger name="io.bitsquare.gui.util.Profiler" level="ERROR"/>
<logger name="io.bitsquare.locale.BSResources" level="ERROR"/>

View File

@ -5,8 +5,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.bitsquare.app.Log;
import io.bitsquare.app.ProgramArguments;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.PubKeyRing;
@ -75,7 +75,7 @@ public class P2PService implements SetupListener {
private boolean shutDownComplete;
private final CopyOnWriteArraySet<Runnable> shutDownResultHandlers = new CopyOnWriteArraySet<>();
private final BooleanProperty hiddenServicePublished = new SimpleBooleanProperty();
private final BooleanProperty allDataLoaded = new SimpleBooleanProperty();
private final BooleanProperty requestingDataCompleted = new SimpleBooleanProperty();
private final BooleanProperty authenticated = new SimpleBooleanProperty();
private MonadicBinding<Boolean> readyForAuthentication;
@ -92,6 +92,7 @@ public class P2PService implements SetupListener {
@Nullable EncryptionService encryptionService,
KeyRing keyRing,
@Named("storage.dir") File storageDir) {
Log.traceCall();
this.seedNodesRepository = seedNodesRepository;
this.port = port;
this.torDir = torDir;
@ -106,6 +107,7 @@ public class P2PService implements SetupListener {
}
private void init() {
Log.traceCall();
// network
Set<Address> seedNodeAddresses;
if (useLocalhost) {
@ -127,32 +129,37 @@ public class P2PService implements SetupListener {
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
checkArgument(peerAddress.equals(connection.getPeerAddress()),
"peerAddress must match connection.getPeerAddress()");
authenticatedPeerAddresses.add(peerAddress);
authenticated.set(true);
dataStorage.setAuthenticated();
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAuthenticated()));
p2pServiceListeners.stream().forEach(e -> e.onAuthenticated());
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
if (connection.isAuthenticated())
authenticatedPeerAddresses.remove(connection.getPeerAddress());
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
log.error("onError self/ConnectionException " + networkNode.getAddress() + "/" + throwable);
}
});
networkNode.addMessageListener((message, connection) -> {
Log.traceCall();
if (message instanceof GetDataRequest) {
log.trace("Received GetDataSetMessage: " + message);
networkNode.sendMessage(connection, new GetDataResponse(getDataSet()));
@ -166,15 +173,15 @@ public class P2PService implements SetupListener {
} else {
log.trace("Received DataSetMessage: Empty data set");
}
allDataLoaded();
setRequestingDataCompleted();
} else if (message instanceof SealedAndSignedMessage) {
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = (SealedAndSignedMessage) message;
DecryptedMsgWithPubKey decryptedMsgWithPubKey = encryptionService.decryptAndVerify(
sealedAndSignedMessage.sealedAndSigned);
UserThread.execute(() -> decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress())));
decryptedMailListeners.stream().forEach(
e -> e.onMailMessage(decryptedMsgWithPubKey, connection.getPeerAddress()));
} catch (CryptoException e) {
log.info("Decryption of SealedAndSignedMessage failed. " +
"That is expected if the message is not intended for us.");
@ -186,6 +193,7 @@ public class P2PService implements SetupListener {
peerGroup.addPeerListener(new PeerListener() {
@Override
public void onFirstAuthenticatePeer(Peer peer) {
Log.traceCall();
log.trace("onFirstAuthenticatePeer " + peer);
sendGetAllDataMessageAfterAuthentication(peer);
@ -193,41 +201,46 @@ public class P2PService implements SetupListener {
@Override
public void onPeerAdded(Peer peer) {
Log.traceCall();
}
@Override
public void onPeerRemoved(Address address) {
Log.traceCall();
}
@Override
public void onConnectionAuthenticated(Connection connection) {
Log.traceCall();
}
});
dataStorage.addHashMapChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
Log.traceCall();
if (entry instanceof ProtectedMailboxData)
tryDecryptMailboxData((ProtectedMailboxData) entry);
}
@Override
public void onRemoved(ProtectedData entry) {
Log.traceCall();
}
});
readyForAuthentication = EasyBind.combine(hiddenServicePublished, allDataLoaded, authenticated,
readyForAuthentication = EasyBind.combine(hiddenServicePublished, requestingDataCompleted, authenticated,
(a, b, c) -> a && b && !c);
readyForAuthentication.subscribe((observable, oldValue, newValue) -> {
// we need to have both the initial data delivered and the hidden service published before we
// bootstrap and authenticate to other nodes.
if (newValue)
authenticateSeedNode();
tryAuthenticateSeedNode();
});
allDataLoaded.addListener((observable, oldValue, newValue) -> {
requestingDataCompleted.addListener((observable, oldValue, newValue) -> {
if (newValue)
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onAllDataReceived()));
p2pServiceListeners.stream().forEach(e -> e.onRequestingDataCompleted());
});
}
@ -238,18 +251,20 @@ public class P2PService implements SetupListener {
@Override
public void onTorNodeReady() {
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady()));
Log.traceCall();
p2pServiceListeners.stream().forEach(e -> e.onTorNodeReady());
// 1. Step: As soon we have the tor node ready (hidden service still not available) we request the
// data set from a random seed node.
sendGetAllDataMessage(peerGroup.getSeedNodeAddresses());
sendGetDataRequest(peerGroup.getSeedNodeAddresses());
}
@Override
public void onHiddenServicePublished() {
Log.traceCall();
checkArgument(networkNode.getAddress() != null, "Address must be set when we have the hidden service ready");
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished()));
p2pServiceListeners.stream().forEach(e -> e.onHiddenServicePublished());
// 3. (or 2.). Step: Hidden service is published
hiddenServicePublished.set(true);
@ -257,12 +272,13 @@ public class P2PService implements SetupListener {
@Override
public void onSetupFailed(Throwable throwable) {
UserThread.execute(() -> p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable)));
Log.traceCall();
p2pServiceListeners.stream().forEach(e -> e.onSetupFailed(throwable));
}
private void sendGetAllDataMessage(Collection<Address> seedNodeAddresses) {
private void sendGetDataRequest(Collection<Address> seedNodeAddresses) {
Log.traceCall();
if (!seedNodeAddresses.isEmpty()) {
log.trace("sendGetAllDataMessage");
List<Address> remainingSeedNodeAddresses = new ArrayList<>(seedNodeAddresses);
Collections.shuffle(remainingSeedNodeAddresses);
Address candidate = remainingSeedNodeAddresses.remove(0);
@ -281,34 +297,39 @@ public class P2PService implements SetupListener {
log.info("Send GetAllDataMessage to " + candidate + " failed. " +
"That is expected if other seed nodes are offline." +
"\nException:" + throwable.getMessage());
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
sendGetAllDataMessage(remainingSeedNodeAddresses);
if (!remainingSeedNodeAddresses.isEmpty())
log.trace("We try to connect another random seed node. " + remainingSeedNodeAddresses);
sendGetDataRequest(remainingSeedNodeAddresses);
}
});
} else {
log.info("There is no seed node available for requesting data. That is expected for the first seed node.");
allDataLoaded();
setRequestingDataCompleted();
}
}
private void allDataLoaded() {
private void setRequestingDataCompleted() {
Log.traceCall();
// 2. (or 3.) Step: We got all data loaded
if (!allDataLoaded.get()) {
log.trace("allDataLoaded");
allDataLoaded.set(true);
}
if (!requestingDataCompleted.get())
requestingDataCompleted.set(true);
}
// 4. Step: hiddenServicePublished and allDataLoaded. We start authenticate to the connected seed node.
private void authenticateSeedNode() {
private void tryAuthenticateSeedNode() {
Log.traceCall();
if (connectedSeedNode != null) {
log.trace("authenticateSeedNode");
peerGroup.authenticateSeedNode(connectedSeedNode);
} else {
log.debug("No connected seedNode available.");
}
}
// 5. Step:
private void sendGetAllDataMessageAfterAuthentication(final Peer peer) {
Log.traceCall();
log.trace("sendGetDataSetMessageAfterAuthentication");
// After authentication we request again data as we might have missed pushed data in the meantime
SettableFuture<Connection> future = networkNode.sendMessage(peer.connection, new GetDataRequest());
@ -333,14 +354,17 @@ public class P2PService implements SetupListener {
// used by seed nodes to exclude themselves form list
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
Log.traceCall();
peerGroup.removeMySeedNodeAddressFromList(mySeedNodeAddress);
}
public void start() {
Log.traceCall();
start(null);
}
public void start(@Nullable P2PServiceListener listener) {
Log.traceCall();
if (listener != null)
addP2PServiceListener(listener);
@ -348,6 +372,7 @@ public class P2PService implements SetupListener {
}
public void shutDown(Runnable shutDownCompleteHandler) {
Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
@ -361,12 +386,12 @@ public class P2PService implements SetupListener {
if (networkNode != null)
networkNode.shutDown(() -> {
UserThread.execute(() -> shutDownResultHandlers.stream().forEach(e -> new Thread(e).start()));
shutDownResultHandlers.stream().forEach(e -> e.run());
shutDownComplete = true;
});
} else {
if (shutDownComplete)
new Thread(shutDownCompleteHandler).start();
shutDownCompleteHandler.run();
else
shutDownResultHandlers.add(shutDownCompleteHandler);
log.warn("shutDown already in progress");
@ -380,19 +405,21 @@ public class P2PService implements SetupListener {
public void sendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailMessage)");
checkAuthentication();
if (!authenticatedPeerAddresses.contains(peerAddress))
peerGroup.authenticateToPeer(peerAddress,
() -> doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener),
() -> UserThread.execute(() -> sendMailMessageListener.onFault()));
() -> sendMailMessageListener.onFault());
else
doSendEncryptedMailMessage(peerAddress, pubKeyRing, message, sendMailMessageListener);
}
private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message,
SendMailMessageListener sendMailMessageListener) {
Log.traceCall();
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
@ -401,24 +428,25 @@ public class P2PService implements SetupListener {
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
UserThread.execute(() -> sendMailMessageListener.onArrived());
sendMailMessageListener.onArrived();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
sendMailMessageListener.onFault();
}
});
} catch (CryptoException e) {
e.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
sendMailMessageListener.onFault();
}
}
}
public void sendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
checkNotNull(peerAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkArgument(!keyRing.getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
checkAuthentication();
@ -437,6 +465,7 @@ public class P2PService implements SetupListener {
private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = new SealedAndSignedMessage(
@ -446,7 +475,7 @@ public class P2PService implements SetupListener {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
UserThread.execute(() -> sendMailboxMessageListener.onArrived());
sendMailboxMessageListener.onArrived();
}
@Override
@ -460,13 +489,13 @@ public class P2PService implements SetupListener {
keyRing.getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
UserThread.execute(() -> sendMailboxMessageListener.onStoredInMailbox());
sendMailboxMessageListener.onStoredInMailbox();
}
});
} catch (CryptoException e) {
e.printStackTrace();
log.error("sendEncryptedMessage failed");
UserThread.execute(() -> sendMailboxMessageListener.onFault());
sendMailboxMessageListener.onFault();
}
}
}
@ -477,6 +506,7 @@ public class P2PService implements SetupListener {
///////////////////////////////////////////////////////////////////////////////////////////
public boolean addData(ExpirablePayload expirablePayload) {
Log.traceCall();
checkAuthentication();
try {
@ -489,6 +519,7 @@ public class P2PService implements SetupListener {
}
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkAuthentication();
try {
@ -500,6 +531,7 @@ public class P2PService implements SetupListener {
}
public boolean removeData(ExpirablePayload expirablePayload) {
Log.traceCall();
checkAuthentication();
try {
@ -512,6 +544,7 @@ public class P2PService implements SetupListener {
}
public void removeEntryFromMailbox(DecryptedMsgWithPubKey decryptedMsgWithPubKey) {
Log.traceCall();
checkAuthentication();
ProtectedMailboxData mailboxData = mailboxMap.get(decryptedMsgWithPubKey);
@ -525,6 +558,7 @@ public class P2PService implements SetupListener {
}
private void removeMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
Log.traceCall();
checkAuthentication();
try {
@ -536,6 +570,7 @@ public class P2PService implements SetupListener {
}
public Map<BigInteger, ProtectedData> getDataMap() {
Log.traceCall();
return dataStorage.getMap();
}
@ -545,38 +580,47 @@ public class P2PService implements SetupListener {
///////////////////////////////////////////////////////////////////////////////////////////
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.addMessageListener(messageListener);
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.removeMessageListener(messageListener);
}
public void addDecryptedMailListener(DecryptedMailListener listener) {
Log.traceCall();
decryptedMailListeners.add(listener);
}
public void removeDecryptedMailListener(DecryptedMailListener listener) {
Log.traceCall();
decryptedMailListeners.remove(listener);
}
public void addDecryptedMailboxListener(DecryptedMailboxListener listener) {
Log.traceCall();
decryptedMailboxListeners.add(listener);
}
public void removeDecryptedMailboxListener(DecryptedMailboxListener listener) {
Log.traceCall();
decryptedMailboxListeners.remove(listener);
}
public void addP2PServiceListener(P2PServiceListener listener) {
Log.traceCall();
p2pServiceListeners.add(listener);
}
public void removeP2PServiceListener(P2PServiceListener listener) {
Log.traceCall();
p2pServiceListeners.remove(listener);
}
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
Log.traceCall();
dataStorage.addHashMapChangedListener(hashMapChangedListener);
}
@ -586,22 +630,27 @@ public class P2PService implements SetupListener {
///////////////////////////////////////////////////////////////////////////////////////////
public boolean isAuthenticated() {
Log.traceCall();
return authenticated.get();
}
public NetworkNode getNetworkNode() {
Log.traceCall();
return networkNode;
}
public PeerGroup getPeerGroup() {
Log.traceCall();
return peerGroup;
}
public Address getAddress() {
Log.traceCall();
return networkNode.getAddress();
}
public NetworkStatistics getNetworkStatistics() {
Log.traceCall();
return networkStatistics;
}
@ -611,10 +660,12 @@ public class P2PService implements SetupListener {
///////////////////////////////////////////////////////////////////////////////////////////
private HashSet<ProtectedData> getDataSet() {
Log.traceCall();
return new HashSet<>(dataStorage.getMap().values());
}
private void tryDecryptMailboxData(ProtectedMailboxData mailboxData) {
Log.traceCall();
if (encryptionService != null) {
ExpirablePayload data = mailboxData.expirablePayload;
if (data instanceof ExpirableMailboxPayload) {
@ -627,12 +678,11 @@ public class P2PService implements SetupListener {
Address senderAddress = mailboxMessage.getSenderAddress();
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
mailboxMap.put(decryptedMsgWithPubKey, mailboxData);
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress="
+ senderAddress + " / my address=" + getAddress());
UserThread.execute(() -> decryptedMailboxListeners.stream().forEach(
e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress)));
decryptedMailboxListeners.stream().forEach(
e -> e.onMailboxMessageAdded(decryptedMsgWithPubKey, senderAddress));
}
} catch (CryptoException e) {
log.trace("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us. " + e.getMessage());
@ -642,6 +692,7 @@ public class P2PService implements SetupListener {
}
private void checkAuthentication() {
Log.traceCall();
if (authenticatedPeerAddresses.isEmpty())
throw new AuthenticationException("You must be authenticated before adding data to the P2P network.");
}

View File

@ -5,7 +5,7 @@ import io.bitsquare.p2p.network.SetupListener;
public interface P2PServiceListener extends SetupListener {
void onAllDataReceived();
void onRequestingDataCompleted();
void onAuthenticated();
}

View File

@ -2,6 +2,7 @@ package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.bitsquare.app.Log;
import io.bitsquare.common.ByteArrayUtils;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Address;
@ -24,7 +25,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Connection is created by the server thread or by send message from NetworkNode.
* Connection is created by the server thread or by sendMessage from NetworkNode.
* All handlers are called on User thread.
* Shared data between InputHandler thread and that
*/
@ -34,7 +35,7 @@ public class Connection {
private static final int MAX_ILLEGAL_REQUESTS = 5;
private static final int SOCKET_TIMEOUT = 10 * 60 * 1000; // 10 min. //TODO set shorter
private InputHandler inputHandler;
private boolean isAuthenticated;
private volatile boolean isAuthenticated;
public static int getMaxMsgSize() {
return MAX_MSG_SIZE;
@ -43,6 +44,7 @@ public class Connection {
private final String portInfo;
private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
public final String objectId = super.toString().split("@")[1];
// set in init
private ObjectOutputStream objectOutputStream;
@ -66,6 +68,7 @@ public class Connection {
///////////////////////////////////////////////////////////////////////////////////////////
public Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
Log.traceCall();
portInfo = "localPort=" + socket.getLocalPort() + "/port=" + socket.getPort();
uid = UUID.randomUUID().toString();
@ -73,6 +76,7 @@ public class Connection {
}
private void init(Socket socket, MessageListener messageListener, ConnectionListener connectionListener) {
Log.traceCall();
sharedSpace = new SharedSpace(this, socket, messageListener, connectionListener, useCompression);
try {
socket.setSoTimeout(SOCKET_TIMEOUT);
@ -102,31 +106,38 @@ public class Connection {
// API
///////////////////////////////////////////////////////////////////////////////////////////
// Called form UserThread
public void setAuthenticated(Address peerAddress, Connection connection) {
this.peerAddress = peerAddress;
Log.traceCall();
synchronized (peerAddress) {
this.peerAddress = peerAddress;
}
isAuthenticated = true;
UserThread.execute(() -> sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection));
sharedSpace.getConnectionListener().onPeerAddressAuthenticated(peerAddress, connection);
}
// Called form various threads
public void sendMessage(Message message) {
Log.traceCall();
if (!stopped) {
try {
log.trace("writeObject " + message + " on connection with port " + portInfo);
log.info("writeObject " + message + " on connection with port " + portInfo);
Object objectToWrite;
if (useCompression) {
byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message);
// log.trace("Write object uncompressed data size: " + messageAsBytes.length);
byte[] compressed = Utils.compress(message);
//log.trace("Write object compressed data size: " + compressed.length);
objectToWrite = compressed;
} else {
// log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length);
objectToWrite = message;
}
if (!stopped) {
Object objectToWrite;
if (useCompression) {
byte[] messageAsBytes = ByteArrayUtils.objectToByteArray(message);
// log.trace("Write object uncompressed data size: " + messageAsBytes.length);
byte[] compressed = Utils.compress(message);
//log.trace("Write object compressed data size: " + compressed.length);
objectToWrite = compressed;
} else {
// log.trace("Write object data size: " + ByteArrayUtils.objectToByteArray(message).length);
objectToWrite = message;
synchronized (objectOutputStream) {
objectOutputStream.writeObject(objectToWrite);
objectOutputStream.flush();
}
objectOutputStream.writeObject(objectToWrite);
objectOutputStream.flush();
sharedSpace.updateLastActivityDate();
}
} catch (IOException e) {
@ -134,37 +145,47 @@ public class Connection {
sharedSpace.handleConnectionException(e);
}
} else {
log.debug("sendMessage after stopped");
log.debug("called sendMessage but was already stopped");
}
}
public void reportIllegalRequest(IllegalRequest illegalRequest) {
Log.traceCall();
sharedSpace.reportIllegalRequest(illegalRequest);
}
public synchronized void setPeerAddress(@Nullable Address peerAddress) {
Log.traceCall();
this.peerAddress = peerAddress;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
@Nullable
public Address getPeerAddress() {
public synchronized Address getPeerAddress() {
//Log.traceCall();
return peerAddress;
}
public Date getLastActivityDate() {
//Log.traceCall();
return sharedSpace.getLastActivityDate();
}
public boolean isAuthenticated() {
//Log.traceCall();
return isAuthenticated;
}
public String getUid() {
Log.traceCall();
return uid;
}
public boolean isStopped() {
Log.traceCall();
return stopped;
}
@ -174,26 +195,30 @@ public class Connection {
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown(Runnable completeHandler) {
// Log.traceCall();
shutDown(true, completeHandler);
}
public void shutDown() {
//Log.traceCall();
shutDown(true, null);
}
private void shutDown(boolean sendCloseConnectionMessage) {
//Log.traceCall();
shutDown(sendCloseConnectionMessage, null);
}
private void shutDown(boolean sendCloseConnectionMessage, @Nullable Runnable shutDownCompleteHandler) {
Log.traceCall(this.toString());
if (!stopped) {
log.info("\n\n############################################################\n" +
"ShutDown connection:"
+ "\npeerAddress=" + peerAddress
+ "\nlocalPort/port=" + sharedSpace.getSocket().getLocalPort()
+ "/" + sharedSpace.getSocket().getPort()
+ "\nobjectId=" + getObjectId() + " / uid=" + getUid()
+ "\nisAuthenticated=" + isAuthenticated()
+ "\nobjectId=" + objectId + " / uid=" + uid
+ "\nisAuthenticated=" + isAuthenticated
+ "\n############################################################\n");
log.trace("ShutDown connection requested. Connection=" + this.toString());
@ -205,10 +230,9 @@ public class Connection {
if (sendCloseConnectionMessage) {
new Thread(() -> {
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.getObjectId());
Thread.currentThread().setName("Connection:SendCloseConnectionMessage-" + this.objectId);
try {
sendMessage(new CloseConnectionMessage());
// give a bit of time for closing gracefully
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
t.printStackTrace();
@ -224,10 +248,12 @@ public class Connection {
}
private void continueShutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall();
ConnectionListener.Reason shutDownReason = sharedSpace.getShutDownReason();
if (shutDownReason == null)
shutDownReason = ConnectionListener.Reason.SHUT_DOWN;
final ConnectionListener.Reason finalShutDownReason = shutDownReason;
// keep UserThread.execute as its not clear if that is called from a non-UserThread
UserThread.execute(() -> sharedSpace.getConnectionListener().onDisconnect(finalShutDownReason, this));
try {
@ -236,12 +262,12 @@ public class Connection {
log.trace("SocketException at shutdown might be expected " + e.getMessage());
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at shutdown. " + e.getMessage());
} finally {
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
log.debug("Connection shutdown complete " + this.toString());
// dont use executorService as its shut down but call handler on own thread
// to not get interrupted by caller
// keep UserThread.execute as its not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
@ -273,7 +299,7 @@ public class Connection {
return "Connection{" +
"portInfo=" + portInfo +
", uid='" + uid + '\'' +
", objectId='" + getObjectId() + '\'' +
", objectId='" + objectId + '\'' +
", sharedSpace=" + sharedSpace.toString() +
", peerAddress=" + peerAddress +
", isAuthenticated=" + isAuthenticated +
@ -283,21 +309,13 @@ public class Connection {
'}';
}
public String getObjectId() {
return super.toString().split("@")[1];
}
public void setPeerAddress(@Nullable Address peerAddress) {
this.peerAddress = peerAddress;
}
///////////////////////////////////////////////////////////////////////////////////////////
// SharedSpace
///////////////////////////////////////////////////////////////////////////////////////////
/**
* Holds all shared data between Connection and InputHandler
* Runs in same thread as Connection
*/
private static class SharedSpace {
private static final Logger log = LoggerFactory.getLogger(SharedSpace.class);
@ -316,6 +334,7 @@ public class Connection {
public SharedSpace(Connection connection, Socket socket, MessageListener messageListener,
ConnectionListener connectionListener, boolean useCompression) {
Log.traceCall();
this.connection = connection;
this.socket = socket;
this.messageListener = messageListener;
@ -323,15 +342,18 @@ public class Connection {
this.useCompression = useCompression;
}
public void updateLastActivityDate() {
public synchronized void updateLastActivityDate() {
Log.traceCall();
lastActivityDate = new Date();
}
public Date getLastActivityDate() {
public synchronized Date getLastActivityDate() {
// Log.traceCall();
return lastActivityDate;
}
public void reportIllegalRequest(IllegalRequest illegalRequest) {
Log.traceCall();
log.warn("We got reported an illegal request " + illegalRequest);
int prevCounter = illegalRequests.get(illegalRequest);
if (prevCounter > illegalRequest.maxTolerance) {
@ -343,6 +365,7 @@ public class Connection {
}
public void handleConnectionException(Exception e) {
Log.traceCall();
if (e instanceof SocketException) {
if (socket.isClosed())
shutDownReason = ConnectionListener.Reason.SOCKET_CLOSED;
@ -358,32 +381,50 @@ public class Connection {
e.printStackTrace();
}
if (!stopped)
if (!stopped) {
stopped = true;
connection.shutDown(false);
}
}
public void onMessage(Message message) {
Log.traceCall();
UserThread.execute(() -> messageListener.onMessage(message, connection));
}
public boolean useCompression() {
Log.traceCall();
return useCompression;
}
public void shutDown(boolean sendCloseConnectionMessage) {
Log.traceCall();
connection.shutDown(sendCloseConnectionMessage);
}
public ConnectionListener getConnectionListener() {
public synchronized ConnectionListener getConnectionListener() {
Log.traceCall();
return connectionListener;
}
public Socket getSocket() {
public synchronized Socket getSocket() {
//Log.traceCall();
return socket;
}
public String getConnectionId() {
return connection.getObjectId();
Log.traceCall();
return connection.objectId;
}
public void stop() {
Log.traceCall();
this.stopped = true;
}
public synchronized ConnectionListener.Reason getShutDownReason() {
Log.traceCall();
return shutDownReason;
}
@Override
@ -396,13 +437,6 @@ public class Connection {
'}';
}
public void stop() {
this.stopped = true;
}
public ConnectionListener.Reason getShutDownReason() {
return shutDownReason;
}
}
@ -410,6 +444,7 @@ public class Connection {
// InputHandler
///////////////////////////////////////////////////////////////////////////////////////////
// Runs in same thread as Connection
private static class InputHandler implements Runnable {
private static final Logger log = LoggerFactory.getLogger(InputHandler.class);
@ -419,24 +454,27 @@ public class Connection {
private volatile boolean stopped;
public InputHandler(SharedSpace sharedSpace, ObjectInputStream objectInputStream, String portInfo) {
Log.traceCall();
this.sharedSpace = sharedSpace;
this.objectInputStream = objectInputStream;
this.portInfo = portInfo;
}
public void stop() {
Log.traceCall();
stopped = true;
}
@Override
public void run() {
Log.traceCall();
try {
Thread.currentThread().setName("InputHandler-" + portInfo);
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
log.trace("InputHandler waiting for incoming messages connection=" + sharedSpace.getConnectionId());
Object rawInputObject = objectInputStream.readObject();
log.trace("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId()
log.info("New data arrived at inputHandler of connection=" + sharedSpace.getConnectionId()
+ " rawInputObject " + rawInputObject);
int size = ByteArrayUtils.objectToByteArray(rawInputObject).length;
@ -468,18 +506,16 @@ public class Connection {
if (message instanceof CloseConnectionMessage) {
stopped = true;
sharedSpace.shutDown(false);
} else {
} else if (!stopped) {
sharedSpace.onMessage(message);
}
} else {
sharedSpace.reportIllegalRequest(IllegalRequest.InvalidDataType);
}
} else {
log.error("Received decompressed data exceeds max. msg size.");
sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
}
} else {
log.error("Received compressed data exceeds max. msg size.");
sharedSpace.reportIllegalRequest(IllegalRequest.MaxSizeExceeded);
}
} catch (IOException | ClassNotFoundException e) {
@ -489,7 +525,8 @@ public class Connection {
}
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
stopped = true;
sharedSpace.handleConnectionException(new Exception(t));
}
}

View File

@ -6,7 +6,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.nucleo.net.HiddenServiceDescriptor;
import io.nucleo.net.TorNode;
@ -18,15 +20,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
// Run in UserThread
public class LocalhostNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(LocalhostNetworkNode.class);
private static int simulateTorDelayTorNode = 1 * 100;
private static int simulateTorDelayHiddenService = 1 * 100;
private static volatile int simulateTorDelayTorNode = 1 * 100;
private static volatile int simulateTorDelayHiddenService = 1 * 100;
private Address address;
public static void setSimulateTorDelayTorNode(int simulateTorDelayTorNode) {
@ -44,28 +46,32 @@ public class LocalhostNetworkNode extends NetworkNode {
public LocalhostNetworkNode(int port) {
super(port);
Log.traceCall();
}
@Override
public void start(@Nullable SetupListener setupListener) {
Log.traceCall();
if (setupListener != null) addSetupListener(setupListener);
createExecutor();
createExecutorService();
//Tor delay simulation
createTorNode(torNode -> {
Log.traceCall("torNode created");
setupListeners.stream().forEach(e -> e.onTorNodeReady());
// Create Hidden Service (takes about 40 sec.)
createHiddenService(hiddenServiceDescriptor -> {
Log.traceCall("hiddenService created");
try {
startServer(new ServerSocket(port));
startServer(new ServerSocket(servicePort));
} catch (IOException e) {
e.printStackTrace();
log.error("Exception at startServer: " + e.getMessage());
}
address = new Address("localhost", port);
address = new Address("localhost", servicePort);
setupListeners.stream().forEach(e -> e.onHiddenServicePublished());
});
@ -76,21 +82,26 @@ public class LocalhostNetworkNode extends NetworkNode {
@Override
@Nullable
public Address getAddress() {
Log.traceCall();
return address;
}
// Called from NetworkNode thread
@Override
protected Socket getSocket(Address peerAddress) throws IOException {
protected Socket createSocket(Address peerAddress) throws IOException {
Log.traceCall();
return new Socket(peerAddress.hostName, peerAddress.port);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Tor delay simulation
///////////////////////////////////////////////////////////////////////////////////////////
private void createTorNode(final Consumer<TorNode> resultHandler) {
Log.traceCall();
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000));
Utilities.setThreadName("NetworkNode:CreateTorNode");
try {
long ts = System.currentTimeMillis();
if (simulateTorDelayTorNode > 0)
@ -100,6 +111,7 @@ public class LocalhostNetworkNode extends NetworkNode {
"TorNode created [simulation]:" +
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
+ "\n############################################################\n");
// as we are simulating we return null
return null;
} catch (Throwable t) {
throw t;
@ -107,18 +119,25 @@ public class LocalhostNetworkNode extends NetworkNode {
});
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
UserThread.execute(() -> resultHandler.accept(torNode));
UserThread.execute(() -> {
// as we are simulating we return null
resultHandler.accept(null);
});
}
public void onFailure(@NotNull Throwable throwable) {
log.error("[simulation] TorNode creation failed");
UserThread.execute(() -> {
log.error("[simulation] TorNode creation failed. " + throwable.getMessage());
throwable.printStackTrace();
});
}
});
}
private void createHiddenService(final Consumer<HiddenServiceDescriptor> resultHandler) {
Log.traceCall();
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000));
Utilities.setThreadName("NetworkNode:CreateHiddenService");
try {
long ts = System.currentTimeMillis();
if (simulateTorDelayHiddenService > 0)
@ -128,6 +147,7 @@ public class LocalhostNetworkNode extends NetworkNode {
"Hidden service created [simulation]:" +
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
+ "\n############################################################\n");
// as we are simulating we return null
return null;
} catch (Throwable t) {
throw t;
@ -135,13 +155,18 @@ public class LocalhostNetworkNode extends NetworkNode {
});
Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() {
public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) {
UserThread.execute(() -> resultHandler.accept(hiddenServiceDescriptor));
UserThread.execute(() -> {
// as we are simulating we return null
resultHandler.accept(null);
});
}
public void onFailure(@NotNull Throwable throwable) {
log.error("[simulation] Hidden service creation failed");
UserThread.execute(() -> {
log.error("[simulation] Hidden service creation failed. " + throwable.getMessage());
throwable.printStackTrace();
});
}
});
}
}

View File

@ -1,6 +1,7 @@
package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.*;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
@ -21,11 +22,12 @@ import java.util.concurrent.CopyOnWriteArraySet;
import static com.google.common.base.Preconditions.checkNotNull;
// Run in UserThread
public abstract class NetworkNode implements MessageListener, ConnectionListener {
private static final Logger log = LoggerFactory.getLogger(NetworkNode.class);
protected final int port;
private final CopyOnWriteArraySet<Connection> outBoundConnections = new CopyOnWriteArraySet<>();
protected final int servicePort;
private final CopyOnWriteArraySet<Connection> inBoundConnections = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
@ -34,13 +36,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
private Server server;
private volatile boolean shutDownInProgress;
// accessed from different threads
private final CopyOnWriteArraySet<Connection> outBoundConnections = new CopyOnWriteArraySet<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public NetworkNode(int port) {
this.port = port;
public NetworkNode(int servicePort) {
Log.traceCall();
this.servicePort = servicePort;
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -48,16 +54,17 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
public void start() {
Log.traceCall();
start(null);
}
abstract public void start(@Nullable SetupListener setupListener);
public SettableFuture<Connection> sendMessage(@NotNull Address peerAddress, Message message) {
log.trace("sendMessage message=" + message);
Log.traceCall("message: " + message + " to peerAddress: " + peerAddress);
checkNotNull(peerAddress, "peerAddress must not be null");
Optional<Connection> outboundConnectionOptional = findOutboundConnection(peerAddress);
Optional<Connection> outboundConnectionOptional = lookupOutboundConnection(peerAddress);
Connection connection = outboundConnectionOptional.isPresent() ? outboundConnectionOptional.get() : null;
if (connection != null)
log.trace("We have found a connection in outBoundConnections. Connection.uid=" + connection.getUid());
@ -69,7 +76,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
if (connection == null) {
Optional<Connection> inboundConnectionOptional = findInboundConnection(peerAddress);
Optional<Connection> inboundConnectionOptional = lookupInboundConnection(peerAddress);
if (inboundConnectionOptional.isPresent()) connection = inboundConnectionOptional.get();
if (connection != null)
log.trace("We have found a connection in inBoundConnections. Connection.uid=" + connection.getUid());
@ -78,28 +85,28 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
if (connection != null) {
return sendMessage(connection, message);
} else {
log.trace("We have not found any connection for that peerAddress. " +
"We will create a new outbound connection.");
final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-create-new-outbound-connection-to-" + peerAddress);
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peerAddress);
try {
Connection newConnection;
log.trace("We have not found any connection for that peerAddress. " +
"We will create a new outbound connection.");
Socket socket = getSocket(peerAddress); // can take a while when using tor
newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this);
Socket socket = createSocket(peerAddress); // can take a while when using tor
Connection newConnection = new Connection(socket, NetworkNode.this, NetworkNode.this);
newConnection.setPeerAddress(peerAddress);
outBoundConnections.add(newConnection);
log.info("\n\n############################################################\n" +
"NetworkNode created new outbound connection:"
+ "\npeerAddress=" + peerAddress.getFullAddress()
+ "\npeerAddress=" + peerAddress
+ "\nconnection.uid=" + newConnection.getUid()
+ "\nmessage=" + message
+ "\n############################################################\n");
newConnection.sendMessage(message);
return newConnection;
return newConnection; // can take a while when using tor
} catch (Throwable throwable) {
if (!(throwable instanceof ConnectException || throwable instanceof IOException)) {
throwable.printStackTrace();
@ -110,11 +117,15 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
});
Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
UserThread.execute(() -> {
resultFuture.set(connection);
});
}
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
resultFuture.setException(throwable);
});
}
});
return resultFuture;
@ -122,9 +133,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
}
public SettableFuture<Connection> sendMessage(Connection connection, Message message) {
Log.traceCall();
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-connection-" + connection.getObjectId());
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.objectId);
try {
log.debug("## connection.sendMessage");
connection.sendMessage(message);
@ -136,25 +148,29 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
final SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<Connection>() {
public void onSuccess(Connection connection) {
log.debug("## connection.sendMessage onSuccess");
UserThread.execute(() -> resultFuture.set(connection));
UserThread.execute(() -> {
resultFuture.set(connection);
});
}
public void onFailure(@NotNull Throwable throwable) {
log.debug("## connection.sendMessage onFailure");
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
resultFuture.setException(throwable);
});
}
});
return resultFuture;
}
public Set<Connection> getAllConnections() {
Log.traceCall();
Set<Connection> set = new HashSet<>(inBoundConnections);
set.addAll(outBoundConnections);
return set;
}
public void shutDown(Runnable shutDownCompleteHandler) {
Log.traceCall();
log.info("Shutdown NetworkNode");
if (!shutDownInProgress) {
shutDownInProgress = true;
@ -166,7 +182,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
getAllConnections().stream().forEach(e -> e.shutDown());
log.info("NetworkNode shutdown complete");
if (shutDownCompleteHandler != null) UserThread.execute(() -> shutDownCompleteHandler.run());
if (shutDownCompleteHandler != null) shutDownCompleteHandler.run();
}
}
@ -176,6 +192,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
public void addSetupListener(SetupListener setupListener) {
Log.traceCall();
setupListeners.add(setupListener);
}
@ -185,38 +202,43 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
public void addConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.add(connectionListener);
}
public void removeConnectionListener(ConnectionListener connectionListener) {
Log.traceCall();
connectionListeners.remove(connectionListener);
}
@Override
public void onConnection(Connection connection) {
connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onConnection(connection)));
Log.traceCall();
connectionListeners.stream().forEach(e -> e.onConnection(connection));
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
log.trace("onAuthenticationComplete peerAddress=" + peerAddress);
log.trace("onAuthenticationComplete connection=" + connection);
Log.traceCall();
log.trace("onAuthenticationComplete peerAddress/connection: " + peerAddress + " / " + connection);
connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onPeerAddressAuthenticated(peerAddress, connection)));
connectionListeners.stream().forEach(e -> e.onPeerAddressAuthenticated(peerAddress, connection));
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect connection " + connection + ", peerAddress= " + peerAddress);
outBoundConnections.remove(connection);
inBoundConnections.remove(connection);
connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onDisconnect(reason, connection)));
connectionListeners.stream().forEach(e -> e.onDisconnect(reason, connection));
}
@Override
public void onError(Throwable throwable) {
connectionListeners.stream().forEach(e -> UserThread.execute(() -> e.onError(throwable)));
Log.traceCall();
connectionListeners.stream().forEach(e -> e.onError(throwable));
}
@ -225,16 +247,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
///////////////////////////////////////////////////////////////////////////////////////////
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
messageListeners.add(messageListener);
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
messageListeners.remove(messageListener);
}
@Override
public void onMessage(Message message, Connection connection) {
messageListeners.stream().forEach(e -> UserThread.execute(() -> e.onMessage(message, connection)));
Log.traceCall();
messageListeners.stream().forEach(e -> e.onMessage(message, connection));
}
@ -242,16 +267,19 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
// Protected
///////////////////////////////////////////////////////////////////////////////////////////
protected void createExecutor() {
executorService = Utilities.getListeningExecutorService("NetworkNode-" + port, 20, 50, 120L);
protected void createExecutorService() {
Log.traceCall();
executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L);
}
protected void startServer(ServerSocket serverSocket) {
Log.traceCall();
server = new Server(serverSocket,
(message, connection) -> NetworkNode.this.onMessage(message, connection),
new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
// we still have not authenticated so put it to the temp list
inBoundConnections.add(connection);
NetworkNode.this.onConnection(connection);
@ -259,36 +287,42 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
NetworkNode.this.onPeerAddressAuthenticated(peerAddress, connection);
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
Address peerAddress = connection.getPeerAddress();
log.trace("onDisconnect at incoming connection to peerAddress " + peerAddress);
log.trace("onDisconnect at incoming connection to peerAddress (or connection) "
+ ((peerAddress == null) ? connection : peerAddress));
inBoundConnections.remove(connection);
NetworkNode.this.onDisconnect(reason, connection);
}
@Override
public void onError(Throwable throwable) {
Log.traceCall();
NetworkNode.this.onError(throwable);
}
});
executorService.submit(server);
}
private Optional<Connection> findOutboundConnection(Address peerAddress) {
private Optional<Connection> lookupOutboundConnection(Address peerAddress) {
Log.traceCall();
return outBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
private Optional<Connection> findInboundConnection(Address peerAddress) {
private Optional<Connection> lookupInboundConnection(Address peerAddress) {
Log.traceCall();
return inBoundConnections.stream()
.filter(e -> peerAddress.equals(e.getPeerAddress())).findAny();
}
abstract protected Socket getSocket(Address peerAddress) throws IOException;
abstract protected Socket createSocket(Address peerAddress) throws IOException;
@Nullable
abstract public Address getAddress();

View File

@ -1,5 +1,6 @@
package io.bitsquare.p2p.network;
import io.bitsquare.app.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -7,20 +8,24 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
// Runs in UserThread
class Server implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Server.class);
private final ServerSocket serverSocket;
private final MessageListener messageListener;
private final ConnectionListener connectionListener;
private final Set<Connection> connections = new HashSet<>();
// accessed from different threads
private final ServerSocket serverSocket;
private final Set<Connection> connections = new CopyOnWriteArraySet<>();
private volatile boolean stopped;
public Server(ServerSocket serverSocket, MessageListener messageListener, ConnectionListener connectionListener) {
Log.traceCall();
this.serverSocket = serverSocket;
this.messageListener = messageListener;
this.connectionListener = connectionListener;
@ -28,14 +33,15 @@ class Server implements Runnable {
@Override
public void run() {
Log.traceCall();
try {
// Thread created by NetworkNode
Thread.currentThread().setName("NetworkNode:Server-" + serverSocket.getLocalPort());
Thread.currentThread().setName("Server-" + serverSocket.getLocalPort());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
log.info("Ready to accept new clients on port " + serverSocket.getLocalPort());
final Socket socket = serverSocket.accept();
if (!stopped) {
if (!stopped && !Thread.currentThread().isInterrupted()) {
log.info("Accepted new client on localPort/port " + socket.getLocalPort() + "/" + socket.getPort());
Connection connection = new Connection(socket, messageListener, connectionListener);
@ -61,6 +67,7 @@ class Server implements Runnable {
}
public void shutDown() {
Log.traceCall();
if (!stopped) {
stopped = true;

View File

@ -6,7 +6,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyContext;
import com.msopentech.thali.java.toronionproxy.JavaOnionProxyManager;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.Utils;
import io.nucleo.net.HiddenServiceDescriptor;
@ -19,29 +21,24 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.util.Random;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkArgument;
// Run in UserThread
public class TorNetworkNode extends NetworkNode {
private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class);
private static final Random random = new Random();
private static final long TIMEOUT = 5000;
private static final int MAX_ERRORS_BEFORE_RESTART = 3;
private static final int MAX_RESTART_ATTEMPTS = 3;
private static final int WAIT_BEFORE_RESTART = 2000;
private static final long SHUT_DOWN_TIMEOUT = 5000;
private final File torDir;
private TorNode torNode;
private TorNode torNetworkNode;
private HiddenServiceDescriptor hiddenServiceDescriptor;
private Timer shutDownTimeoutTimer;
private long nonce;
private int errorCounter;
private int restartCounter;
private Runnable shutDownCompleteHandler;
private boolean torShutDownComplete, networkNodeShutDownDoneComplete;
@ -53,7 +50,7 @@ public class TorNetworkNode extends NetworkNode {
public TorNetworkNode(int servicePort, File torDir) {
super(servicePort);
Log.traceCall();
this.torDir = torDir;
}
@ -64,39 +61,56 @@ public class TorNetworkNode extends NetworkNode {
@Override
public void start(@Nullable SetupListener setupListener) {
Log.traceCall();
if (setupListener != null)
addSetupListener(setupListener);
createExecutor();
createExecutorService();
// Create the tor node (takes about 6 sec.)
createTorNode(torDir, torNode -> {
TorNetworkNode.this.torNode = torNode;
Log.traceCall("torNode created");
TorNetworkNode.this.torNetworkNode = torNode;
setupListeners.stream().forEach(e -> UserThread.execute(() -> e.onTorNodeReady()));
setupListeners.stream().forEach(e -> e.onTorNodeReady());
// Create Hidden Service (takes about 40 sec.)
createHiddenService(torNode, Utils.findFreeSystemPort(), port, hiddenServiceDescriptor -> {
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
createHiddenService(torNode,
Utils.findFreeSystemPort(),
servicePort,
hiddenServiceDescriptor -> {
Log.traceCall("hiddenService created");
TorNetworkNode.this.hiddenServiceDescriptor = hiddenServiceDescriptor;
startServer(hiddenServiceDescriptor.getServerSocket());
UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()),
500, TimeUnit.MILLISECONDS);
});
startServer(hiddenServiceDescriptor.getServerSocket());
setupListeners.stream().forEach(e -> e.onHiddenServicePublished());
/* UserThread.runAfter(() -> setupListeners.stream().forEach(e -> e.onHiddenServicePublished()),
500, TimeUnit.MILLISECONDS);*/
});
});
}
@Override
@Nullable
public Address getAddress() {
Log.traceCall();
if (hiddenServiceDescriptor != null)
return new Address(hiddenServiceDescriptor.getFullAddress());
else
return null;
}
@Override
protected Socket createSocket(Address peerAddress) throws IOException {
Log.traceCall();
checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address");
return torNetworkNode.connectToHiddenService(peerAddress.hostName, peerAddress.port);
}
//TODO simplify
public void shutDown(Runnable shutDownCompleteHandler) {
log.info("Shutdown TorNetworkNode");
Log.traceCall();
this.shutDownCompleteHandler = shutDownCompleteHandler;
shutDownTimeoutTimer = UserThread.runAfter(() -> {
@ -105,41 +119,41 @@ public class TorNetworkNode extends NetworkNode {
}, SHUT_DOWN_TIMEOUT, TimeUnit.MILLISECONDS);
if (executorService != null) {
executorService.submit(() -> super.shutDown(() -> {
executorService.submit(() -> {
Utilities.setThreadName("TorNetworkNodeShutDownSuperClass");
UserThread.execute(() -> {
// We want to stay in UserThread
super.shutDown(() -> {
networkNodeShutDownDoneComplete = true;
if (torShutDownComplete)
shutDownExecutorService();
}
));
});
});
});
} else {
log.error("executorService must not be null at shutDown");
}
ListenableFuture<?> future2 = executorService.submit(() -> {
executorService.submit(() -> {
Utilities.setThreadName("NetworkNode:torNodeShutdown");
try {
long ts = System.currentTimeMillis();
log.info("Shutdown torNode");
if (torNode != null)
torNode.shutdown();
// Might take a bit so we use a thread
if (torNetworkNode != null)
torNetworkNode.shutdown();
log.info("Shutdown torNode done after " + (System.currentTimeMillis() - ts) + " ms.");
UserThread.execute(() -> {
torShutDownComplete = true;
if (networkNodeShutDownDoneComplete)
shutDownExecutorService();
});
} catch (Throwable e) {
e.printStackTrace();
log.error("Shutdown torNode failed with exception: " + e.getMessage());
shutDownExecutorService();
}
});
Futures.addCallback(future2, new FutureCallback<Object>() {
@Override
public void onSuccess(Object o) {
torShutDownComplete = true;
if (networkNodeShutDownDoneComplete)
UserThread.execute(() -> {
e.printStackTrace();
log.error("Shutdown torNode failed with exception: " + e.getMessage());
// We want to switch to UserThread
shutDownExecutorService();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
log.error("Shutdown torNode failed with exception: " + throwable.getMessage());
shutDownExecutorService();
});
}
});
}
@ -149,26 +163,27 @@ public class TorNetworkNode extends NetworkNode {
///////////////////////////////////////////////////////////////////////////////////////////
private void shutDownExecutorService() {
Log.traceCall();
shutDownTimeoutTimer.cancel();
new Thread(() -> {
Thread.currentThread().setName("NetworkNode:shutDownExecutorService-" + new Random().nextInt(1000));
Utilities.setThreadName("NetworkNode:shutDownExecutorService");
try {
long ts = System.currentTimeMillis();
log.info("Shutdown executorService");
log.debug("Shutdown executorService");
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.info("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
log.debug("Shutdown executorService done after " + (System.currentTimeMillis() - ts) + " ms.");
log.info("Shutdown completed");
UserThread.execute(() -> shutDownCompleteHandler.run());
shutDownCompleteHandler.run();
} catch (Throwable t) {
t.printStackTrace();
log.error("Shutdown executorService failed with exception: " + t.getMessage());
UserThread.execute(() -> shutDownCompleteHandler.run());
shutDownCompleteHandler.run();
}
}).start();
}
private void restartTor() {
Log.traceCall();
restartCounter++;
if (restartCounter <= MAX_RESTART_ATTEMPTS) {
shutDown(() -> UserThread.runAfter(() -> {
@ -189,8 +204,9 @@ public class TorNetworkNode extends NetworkNode {
///////////////////////////////////////////////////////////////////////////////////////////
private void createTorNode(final File torDir, final Consumer<TorNode> resultHandler) {
Log.traceCall();
ListenableFuture<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:CreateTorNode-" + new Random().nextInt(1000));
Utilities.setThreadName("TorNetworkNode:CreateTorNode");
try {
long ts = System.currentTimeMillis();
if (torDir.mkdirs())
@ -198,34 +214,41 @@ public class TorNetworkNode extends NetworkNode {
log.info("TorDir = " + torDir.getAbsolutePath());
log.trace("Create TorNode");
TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode1 = new TorNode<JavaOnionProxyManager, JavaOnionProxyContext>(
torDir) {
};
TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode =
new TorNode<JavaOnionProxyManager, JavaOnionProxyContext>(torDir) {
};
log.info("\n\n############################################################\n" +
"TorNode created:" +
"\nTook " + (System.currentTimeMillis() - ts) + " ms"
+ "\n############################################################\n");
return torNode1;
return torNode;
} catch (Throwable t) {
throw t;
}
});
Futures.addCallback(future, new FutureCallback<TorNode<JavaOnionProxyManager, JavaOnionProxyContext>>() {
public void onSuccess(TorNode<JavaOnionProxyManager, JavaOnionProxyContext> torNode) {
resultHandler.accept(torNode);
Log.traceCall();
UserThread.execute(() -> {
resultHandler.accept(torNode);
});
}
public void onFailure(@NotNull Throwable throwable) {
log.error("TorNode creation failed with exception: " + throwable.getMessage());
restartTor();
Log.traceCall();
UserThread.execute(() -> {
log.error("TorNode creation failed with exception: " + throwable.getMessage());
restartTor();
});
}
});
}
private void createHiddenService(TorNode torNode, int localPort, int servicePort,
Consumer<HiddenServiceDescriptor> resultHandler) {
Log.traceCall();
ListenableFuture<HiddenServiceDescriptor> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:CreateHiddenService-" + new Random().nextInt(1000));
Utilities.setThreadName("TorNetworkNode:CreateHiddenService");
try {
long ts = System.currentTimeMillis();
log.debug("Create hidden service");
@ -243,23 +266,17 @@ public class TorNetworkNode extends NetworkNode {
});
Futures.addCallback(future, new FutureCallback<HiddenServiceDescriptor>() {
public void onSuccess(HiddenServiceDescriptor hiddenServiceDescriptor) {
resultHandler.accept(hiddenServiceDescriptor);
UserThread.execute(() -> {
resultHandler.accept(hiddenServiceDescriptor);
});
}
public void onFailure(@NotNull Throwable throwable) {
log.error("Hidden service creation failed");
restartTor();
UserThread.execute(() -> {
log.error("Hidden service creation failed");
restartTor();
});
}
});
}
@Override
protected Socket getSocket(Address peerAddress) throws IOException {
checkArgument(peerAddress.hostName.endsWith(".onion"), "PeerAddress is not an onion address");
return torNode.connectToHiddenService(peerAddress.hostName, peerAddress.port);
}
}

View File

@ -4,7 +4,7 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.common.UserThread;
import io.bitsquare.app.Log;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
@ -17,7 +17,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.TimeUnit;
// authentication example:
@ -43,6 +42,7 @@ public class AuthenticationHandshake {
private MessageListener messageListener;
public AuthenticationHandshake(NetworkNode networkNode, PeerGroup peerGroup, Address myAddress) {
Log.traceCall();
this.networkNode = networkNode;
this.peerGroup = peerGroup;
this.myAddress = myAddress;
@ -51,21 +51,25 @@ public class AuthenticationHandshake {
}
private void onFault(@NotNull Throwable throwable) {
Log.traceCall();
cleanup();
UserThread.execute(() -> resultFuture.setException(throwable));
resultFuture.setException(throwable);
}
private void onSuccess(Connection connection) {
Log.traceCall();
cleanup();
UserThread.execute(() -> resultFuture.set(connection));
resultFuture.set(connection);
}
private void cleanup() {
Log.traceCall();
stopped = true;
networkNode.removeMessageListener(messageListener);
}
public SettableFuture<Connection> requestAuthenticationToPeer(Address peerAddress) {
Log.traceCall();
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
@ -88,6 +92,7 @@ public class AuthenticationHandshake {
}
public SettableFuture<Connection> requestAuthentication(Set<Address> remainingAddresses, Address peerAddress) {
Log.traceCall();
// Requesting peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
@ -113,6 +118,7 @@ public class AuthenticationHandshake {
}
public SettableFuture<Connection> processAuthenticationRequest(AuthenticationRequest authenticationRequest, Connection connection) {
Log.traceCall();
// Responding peer
resultFuture = SettableFuture.create();
startAuthTs = System.currentTimeMillis();
@ -121,7 +127,31 @@ public class AuthenticationHandshake {
log.trace("RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
log.info("We shut down inbound connection from peer {} to establish a new " +
"connection with his reported address.", peerAddress);
connection.shutDown(() -> UserThread.runAfter(() -> {
//TODO check if causes problems without delay
connection.shutDown(() -> {
Log.traceCall();
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
log.trace("processAuthenticationMessage: connection.shutDown complete. RequestAuthenticationMessage from " + peerAddress + " at " + myAddress);
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("onSuccess sending ChallengeMessage");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("onFailure sending ChallengeMessage.");
onFault(throwable);
}
});
}
});
/* connection.shutDown(() -> UserThread.runAfter(() -> { Log.traceCall();
if (!stopped) {
// we delay a bit as listeners for connection.onDisconnect are on other threads and might lead to
// inconsistent state (removal of connection from NetworkNode.authenticatedConnections)
@ -130,12 +160,12 @@ public class AuthenticationHandshake {
SettableFuture<Connection> future = networkNode.sendMessage(peerAddress, new AuthenticationResponse(myAddress, authenticationRequest.nonce, getAndSetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
public void onSuccess(Connection connection) { Log.traceCall();
log.trace("onSuccess sending ChallengeMessage");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
public void onFailure(@NotNull Throwable throwable) { Log.traceCall();
log.warn("onFailure sending ChallengeMessage.");
onFault(throwable);
}
@ -143,13 +173,15 @@ public class AuthenticationHandshake {
}
},
100 + PeerGroup.simulateAuthTorNode,
TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS));*/
return resultFuture;
}
private void setupMessageListener() {
Log.traceCall();
messageListener = (message, connection) -> {
Log.traceCall();
if (message instanceof AuthenticationMessage) {
if (message instanceof AuthenticationResponse) {
// Requesting peer
@ -207,7 +239,7 @@ public class AuthenticationHandshake {
});
log.info("\n\nAuthenticationComplete: Peer with address " + peerAddress
+ " authenticated (" + connection.getObjectId() + "). Took "
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
onSuccess(connection);
@ -227,7 +259,7 @@ public class AuthenticationHandshake {
// we wait until the handshake is completed before setting the authenticate flag
// authentication at both sides of the connection
log.info("\n\nAuthenticationComplete\nPeer with address " + peerAddress
+ " authenticated (" + connection.getObjectId() + "). Took "
+ " authenticated (" + connection.objectId + "). Took "
+ (System.currentTimeMillis() - startAuthTs) + " ms. \n\n");
onSuccess(connection);
@ -239,6 +271,7 @@ public class AuthenticationHandshake {
}
private void authenticateToNextRandomPeer(Set<Address> remainingAddresses) {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomAddressAndRemainingSet(remainingAddresses);
if (tupleOptional.isPresent()) {
Tuple2<Address, Set<Address>> tuple = tupleOptional.get();
@ -250,6 +283,7 @@ public class AuthenticationHandshake {
}
private Optional<Tuple2<Address, Set<Address>>> getRandomAddressAndRemainingSet(Set<Address> addresses) {
Log.traceCall();
if (!addresses.isEmpty()) {
List<Address> list = new ArrayList<>(addresses);
Collections.shuffle(list);
@ -261,6 +295,7 @@ public class AuthenticationHandshake {
}
private long getAndSetNonce() {
Log.traceCall();
nonce = new Random().nextLong();
while (nonce == 0)
nonce = getAndSetNonce();

View File

@ -3,8 +3,10 @@ package io.bitsquare.p2p.peers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.util.Tuple2;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.Connection;
import io.bitsquare.p2p.network.ConnectionListener;
@ -19,14 +21,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
// Run in UserThread
public class PeerGroup {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
@ -42,21 +42,21 @@ public class PeerGroup {
MAX_CONNECTIONS = maxConnections;
}
private static final int MAINTENANCE_INTERVAL = new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static final int GET_PEERS_INTERVAL = new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int SEND_PING_INTERVAL = 100000000;// new Random().nextInt(2 * 60 * 1000) + 2 * 60 * 1000; // 2-4 min.
private static final int GET_PEERS_INTERVAL = 100000000;//new Random().nextInt(1 * 60 * 1000) + 1 * 60 * 1000; // 1-2 min.
private static final int PING_AFTER_CONNECTION_INACTIVITY = 30 * 1000;
private static final int MAX_REPORTED_PEERS = 1000;
private final NetworkNode networkNode;
private final Set<Address> seedNodeAddresses;
private final CopyOnWriteArraySet<PeerListener> peerListeners = new CopyOnWriteArraySet<>();
private final ConcurrentHashMap<Address, Peer> authenticatedPeers = new ConcurrentHashMap<>();
private final CopyOnWriteArraySet<Address> reportedPeerAddresses = new CopyOnWriteArraySet<>();
private final Timer maintenanceTimer = new Timer();
private final Set<PeerListener> peerListeners = new HashSet<>();
private final Map<Address, Peer> authenticatedPeers = new HashMap<>();
private final Set<Address> reportedPeerAddresses = new HashSet<>();
private final Timer sendPingTimer = new Timer();
private final Timer getPeersTimer = new Timer();
private volatile boolean shutDownInProgress;
private boolean shutDownInProgress;
private boolean firstPeerAdded = false;
@ -65,6 +65,8 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
public PeerGroup(NetworkNode networkNode, Set<Address> seeds) {
Log.traceCall();
this.networkNode = networkNode;
this.seedNodeAddresses = seeds;
@ -72,6 +74,7 @@ public class PeerGroup {
}
private void init(NetworkNode networkNode) {
Log.traceCall();
networkNode.addMessageListener((message, connection) -> {
if (message instanceof MaintenanceMessage)
processMaintenanceMessage((MaintenanceMessage) message, connection);
@ -83,16 +86,18 @@ public class PeerGroup {
networkNode.addConnectionListener(new ConnectionListener() {
@Override
public void onConnection(Connection connection) {
Log.traceCall();
}
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
Log.traceCall();
}
@Override
public void onDisconnect(Reason reason, Connection connection) {
Log.traceCall();
log.debug("onDisconnect connection=" + connection + " / reason=" + reason);
log.debug("##### onDisconnect connection.isAuthenticated()=" + connection.isAuthenticated());
// only removes authenticated nodes
if (connection.isAuthenticated())
removePeer(connection.getPeerAddress());
@ -100,6 +105,7 @@ public class PeerGroup {
@Override
public void onError(Throwable throwable) {
Log.traceCall();
}
});
@ -112,21 +118,23 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
public void removeMySeedNodeAddressFromList(Address mySeedNodeAddress) {
Log.traceCall();
seedNodeAddresses.remove(mySeedNodeAddress);
}
public void shutDown() {
Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
if (maintenanceTimer != null)
maintenanceTimer.cancel();
if (sendPingTimer != null)
sendPingTimer.cancel();
}
}
public void broadcast(BroadcastMessage message, @Nullable Address sender) {
Log.traceCall();
log.trace("Broadcast message to " + authenticatedPeers.values().size() + " peers.");
log.trace("message = " + message);
printAuthenticatedPeers();
// TODO add randomized timing?
authenticatedPeers.values().stream()
@ -155,16 +163,15 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
private void processAuthenticationRequest(NetworkNode networkNode, AuthenticationRequest message, final Connection connection) {
Log.traceCall();
AuthenticationHandshake authenticationHandshake = new AuthenticationHandshake(networkNode, PeerGroup.this, getMyAddress());
SettableFuture<Connection> future = authenticationHandshake.processAuthenticationRequest(message, connection);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
if (connection != null) {
UserThread.execute(() -> {
setAuthenticated(connection, connection.getPeerAddress());
purgeReportedPeers();
});
setAuthenticated(connection, connection.getPeerAddress());
purgeReportedPeersIfExceeds();
}
}
@ -172,12 +179,13 @@ public class PeerGroup {
public void onFailure(@NotNull Throwable throwable) {
throwable.printStackTrace();
log.error("AuthenticationHandshake failed. " + throwable.getMessage());
UserThread.execute(() -> removePeer(connection.getPeerAddress()));
removePeer(connection.getPeerAddress());
}
});
}
public void authenticateSeedNode(Address peerAddress) {
Log.traceCall();
authenticateToSeedNode(new HashSet<>(seedNodeAddresses), peerAddress, true);
}
@ -185,6 +193,7 @@ public class PeerGroup {
// After connection is authenticated, we try to connect to any reported peer as long we have not
// reached our max connection size.
private void authenticateToSeedNode(Set<Address> remainingAddresses, Address peerAddress, boolean continueOnSuccess) {
Log.traceCall();
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
@ -232,6 +241,7 @@ public class PeerGroup {
}
private void authenticateToRemainingReportedPeers() {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(reportedPeerAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random peer. " + tupleOptional.get().first);
@ -245,6 +255,7 @@ public class PeerGroup {
// We try to connect to a reported peer. If we fail we repeat after the failed peer has been removed.
// If we succeed we repeat until we are ut of addresses.
private void authenticateToReportedPeer(Set<Address> remainingAddresses, Address peerAddress) {
Log.traceCall();
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that peer already authenticated. That must never happen.");
@ -283,6 +294,7 @@ public class PeerGroup {
}
private void authenticateToRemainingSeedNodes() {
Log.traceCall();
Optional<Tuple2<Address, Set<Address>>> tupleOptional = getRandomItemAndRemainingSet(seedNodeAddresses);
if (tupleOptional.isPresent()) {
log.info("We try to authenticate to a random seed node. " + tupleOptional.get().first);
@ -300,6 +312,7 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
public void authenticateToPeer(Address peerAddress, @Nullable Runnable authenticationCompleteHandler, @Nullable Runnable faultHandler) {
Log.traceCall();
checkArgument(!authenticatedPeers.containsKey(peerAddress),
"We have that seed node already authenticated. That must never happen.");
@ -327,9 +340,10 @@ public class PeerGroup {
}
private void setAuthenticated(Connection connection, Address peerAddress) {
Log.traceCall();
log.info("\n\n############################################################\n" +
"We are authenticated to:" +
"\nconnection=" + connection
"\nconnection=" + connection.getUid()
+ "\nmyAddress=" + getMyAddress()
+ "\npeerAddress= " + peerAddress
+ "\n############################################################\n");
@ -342,19 +356,18 @@ public class PeerGroup {
}
private void addAuthenticatedPeer(Peer peer) {
Log.traceCall();
authenticatedPeers.put(peer.address, peer);
reportedPeerAddresses.remove(peer.address);
firstPeerAdded = !firstPeerAdded && authenticatedPeers.size() == 1;
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerAdded(peer)));
peerListeners.stream().forEach(e -> e.onPeerAdded(peer));
if (firstPeerAdded)
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer)));
peerListeners.stream().forEach(e -> e.onFirstAuthenticatePeer(peer));
if (authenticatedPeers.size() > MAX_CONNECTIONS)
disconnectOldConnections();
printAuthenticatedPeers();
if (!checkIfConnectedPeersExceeds())
printAuthenticatedPeers();
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -362,13 +375,14 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
private void setupMaintenanceTimer() {
maintenanceTimer.scheduleAtFixedRate(new TimerTask() {
Log.traceCall();
sendPingTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("MaintenanceTimer-" + new Random().nextInt(1000));
Utilities.setThreadName("MaintenanceTimer");
try {
UserThread.execute(() -> {
disconnectOldConnections();
checkIfConnectedPeersExceeds();
pingPeers();
});
} catch (Throwable t) {
@ -376,14 +390,14 @@ public class PeerGroup {
log.error("Executing task failed. " + t.getMessage());
}
}
}, MAINTENANCE_INTERVAL, MAINTENANCE_INTERVAL);
}, SEND_PING_INTERVAL, SEND_PING_INTERVAL);
getPeersTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("GetPeersTimer-" + new Random().nextInt(1000));
Utilities.setThreadName("GetPeersTimer");
try {
UserThread.execute(() -> sendGetPeersRequest());
UserThread.execute(() -> trySendGetPeersRequest());
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
@ -393,21 +407,28 @@ public class PeerGroup {
}
private void disconnectOldConnections() {
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
.filter(e -> e.isAuthenticated())
.collect(Collectors.toList());
if (authenticatedConnections.size() > MAX_CONNECTIONS) {
private boolean checkIfConnectedPeersExceeds() {
Log.traceCall();
if (authenticatedPeers.size() > MAX_CONNECTIONS) {
log.trace("We have too many connections open. Lets remove the one which was not active recently.");
List<Connection> authenticatedConnections = networkNode.getAllConnections().stream()
.filter(e -> e.isAuthenticated())
.collect(Collectors.toList());
authenticatedConnections.sort((o1, o2) -> o1.getLastActivityDate().compareTo(o2.getLastActivityDate()));
log.info("Number of connections exceeds MAX_CONNECTIONS. Current size=" + authenticatedConnections.size());
Connection connection = authenticatedConnections.remove(0);
log.info("Shutdown oldest connection with last activity date=" + connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> disconnectOldConnections(), 100, 500, TimeUnit.MILLISECONDS));
log.info("We had shut down the oldest connection with last activity date="
+ connection.getLastActivityDate() + " / connection=" + connection);
connection.shutDown(() -> UserThread.runAfterRandomDelay(() -> checkIfConnectedPeersExceeds(), 100, 500, TimeUnit.MILLISECONDS));
return true;
} else {
log.trace("We don't have too many connections open.");
return false;
}
}
private void pingPeers() {
log.trace("pingPeers");
Log.traceCall();
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.filter(e -> (new Date().getTime() - e.connection.getLastActivityDate().getTime()) > PING_AFTER_CONNECTION_INACTIVITY)
@ -428,29 +449,35 @@ public class PeerGroup {
}, 5, 10));
}
private void sendGetPeersRequest() {
log.trace("sendGetPeersRequest");
Set<Peer> connectedPeersList = new HashSet<>(authenticatedPeers.values());
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("sendGetPeersRequest sent successfully");
}
private void trySendGetPeersRequest() {
Log.traceCall();
Collection<Peer> peers = authenticatedPeers.values();
if (!peers.isEmpty()) {
Set<Peer> connectedPeersList = new HashSet<>(peers);
connectedPeersList.stream()
.forEach(e -> UserThread.runAfterRandomDelay(() -> {
SettableFuture<Connection> future = networkNode.sendMessage(e.connection,
new GetPeersRequest(getMyAddress(), new HashSet<>(getAllPeerAddresses())));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(Connection connection) {
log.trace("sendGetPeersRequest sent successfully");
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 5, 10));
@Override
public void onFailure(@NotNull Throwable throwable) {
log.info("sendGetPeersRequest sending failed " + throwable.getMessage());
removePeer(e.address);
}
});
}, 5, 10));
} else {
log.debug("No peers available for requesting.");
}
}
private void processMaintenanceMessage(MaintenanceMessage message, Connection connection) {
Log.traceCall();
log.debug("Received message " + message + " at " + getMyAddress() + " from " + connection.getPeerAddress());
if (message instanceof PingMessage) {
SettableFuture<Connection> future = networkNode.sendMessage(connection, new PongMessage(((PingMessage) message).nonce));
@ -471,8 +498,8 @@ public class PeerGroup {
Peer peer = authenticatedPeers.get(connection.getPeerAddress());
if (peer != null) {
if (((PongMessage) message).nonce != peer.getPingNonce()) {
removePeer(peer.address);
log.warn("PongMessage invalid: self/peer " + getMyAddress() + "/" + connection.getPeerAddress());
removePeer(peer.address);
}
}
}
@ -510,18 +537,22 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
public void addMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.addMessageListener(messageListener);
}
public void removeMessageListener(MessageListener messageListener) {
Log.traceCall();
networkNode.removeMessageListener(messageListener);
}
public void addPeerListener(PeerListener peerListener) {
Log.traceCall();
peerListeners.add(peerListener);
}
public void removePeerListener(PeerListener peerListener) {
Log.traceCall();
peerListeners.remove(peerListener);
}
@ -531,17 +562,20 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
private Map<Address, Peer> getAuthenticatedPeers() {
Log.traceCall();
return authenticatedPeers;
}
public Set<Address> getAllPeerAddresses() {
CopyOnWriteArraySet<Address> allPeerAddresses = new CopyOnWriteArraySet<>(reportedPeerAddresses);
Log.traceCall();
Set<Address> allPeerAddresses = new HashSet<>(reportedPeerAddresses);
allPeerAddresses.addAll(authenticatedPeers.values().stream()
.map(e -> e.address).collect(Collectors.toList()));
.map(e -> e.address).collect(Collectors.toSet()));
return allPeerAddresses;
}
public Set<Address> getSeedNodeAddresses() {
Log.traceCall();
return seedNodeAddresses;
}
@ -551,7 +585,7 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
void addToReportedPeers(HashSet<Address> peerAddresses, Connection connection) {
log.trace("addToReportedPeers");
Log.traceCall();
// we disconnect misbehaving nodes trying to send too many peers
// reported peers include the peers connected peers which is normally max. 8 but we give some headroom
// for safety
@ -560,24 +594,29 @@ public class PeerGroup {
} else {
peerAddresses.remove(getMyAddress());
reportedPeerAddresses.addAll(peerAddresses);
purgeReportedPeers();
purgeReportedPeersIfExceeds();
}
}
private void purgeReportedPeers() {
log.trace("purgeReportedPeers");
int all = getAllPeerAddresses().size();
if (all > 1000) {
int diff = all - 100;
private void purgeReportedPeersIfExceeds() {
Log.traceCall();
int size = reportedPeerAddresses.size();
if (size > MAX_REPORTED_PEERS) {
log.trace("We have more then {} reported peers. size={}. " +
"We remove random peers from the reported peers list.", MAX_REPORTED_PEERS, size);
int diff = size - MAX_REPORTED_PEERS;
List<Address> list = new LinkedList<>(getReportedNotConnectedPeerAddresses());
for (int i = 0; i < diff; i++) {
Address toRemove = getAndRemoveRandomItem(list);
reportedPeerAddresses.remove(toRemove);
}
} else {
log.trace("We don't have more then {} reported peers yet.", MAX_REPORTED_PEERS);
}
}
private Set<Address> getReportedNotConnectedPeerAddresses() {
Log.traceCall();
Set<Address> set = new HashSet<>(reportedPeerAddresses);
authenticatedPeers.values().stream().forEach(e -> set.remove(e.address));
return set;
@ -589,18 +628,20 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
private void removePeer(@Nullable Address peerAddress) {
reportedPeerAddresses.remove(peerAddress);
Log.traceCall("peerAddress=" + peerAddress);
if (peerAddress != null) {
boolean contained = reportedPeerAddresses.remove(peerAddress);
Peer disconnectedPeer = authenticatedPeers.remove(peerAddress);
if (disconnectedPeer != null)
UserThread.execute(() -> peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress)));
peerListeners.stream().forEach(e -> e.onPeerRemoved(peerAddress));
if (contained || disconnectedPeer != null)
printAllPeers();
}
printAuthenticatedPeers();
printReportedPeers();
}
private Address getMyAddress() {
Log.traceCall();
return networkNode.getAddress();
}
@ -610,10 +651,12 @@ public class PeerGroup {
///////////////////////////////////////////////////////////////////////////////////////////
private Address getAndRemoveRandomItem(List<Address> list) {
Log.traceCall();
return list.remove(new Random().nextInt(list.size()));
}
private Optional<Tuple2<Address, Set<Address>>> getRandomItemAndRemainingSet(Set<Address> remainingAddresses) {
Log.traceCall();
List<Address> list = new ArrayList<>(remainingAddresses);
authenticatedPeers.values().stream().forEach(e -> list.remove(e.address));
if (!list.isEmpty()) {
@ -625,11 +668,13 @@ public class PeerGroup {
}
public void printAllPeers() {
Log.traceCall();
printAuthenticatedPeers();
printReportedPeers();
}
public void printAuthenticatedPeers() {
Log.traceCall();
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Authenticated peers for node " + getMyAddress() + ":");
authenticatedPeers.values().stream().forEach(e -> result.append("\n").append(e.address));
@ -638,6 +683,7 @@ public class PeerGroup {
}
public void printReportedPeers() {
Log.traceCall();
StringBuilder result = new StringBuilder("\n\n############################################################\n" +
"Reported peers for node " + getMyAddress() + ":");
reportedPeerAddresses.stream().forEach(e -> result.append("\n").append(e));

View File

@ -1,5 +1,6 @@
package io.bitsquare.p2p.seed;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.crypto.EncryptionService;
@ -29,6 +30,7 @@ public class SeedNode {
private boolean stopped;
public SeedNode() {
Log.traceCall();
}
@ -41,6 +43,7 @@ public class SeedNode {
// eg. lmvdenjkyvx2ovga.onion:8001 20 false eo5ay2lyzrfvx2nr.onion:8002|si3uu56adkyqkldl.onion:8003
// or when using localhost: localhost:8001 20 true localhost:8002|localhost:8003
public void processArgs(String[] args) {
Log.traceCall();
if (args.length > 0) {
String arg0 = args[0];
checkArgument(arg0.contains(":") && arg0.split(":").length == 2 && arg0.split(":")[1].length() == 4, "Wrong program argument");
@ -78,6 +81,7 @@ public class SeedNode {
}
public void createAndStartP2PService(EncryptionService encryptionService, KeyRing keyRing, Address mySeedNodeAddress, boolean useLocalhost, @Nullable Set<Address> seedNodes, @Nullable P2PServiceListener listener) {
Log.traceCall();
SeedNodesRepository seedNodesRepository = new SeedNodesRepository();
if (seedNodes != null && !seedNodes.isEmpty()) {
if (useLocalhost)
@ -92,14 +96,17 @@ public class SeedNode {
}
public P2PService getP2PService() {
Log.traceCall();
return p2PService;
}
public void shutDown() {
Log.traceCall();
shutDown(null);
}
public void shutDown(@Nullable Runnable shutDownCompleteHandler) {
Log.traceCall();
log.debug("Request shutdown seed node");
if (!stopped) {
stopped = true;

View File

@ -1,10 +1,12 @@
package io.bitsquare.p2p.storage;
import com.google.common.annotations.VisibleForTesting;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.CryptoException;
import io.bitsquare.common.crypto.Hash;
import io.bitsquare.common.crypto.Sig;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.Address;
import io.bitsquare.p2p.network.IllegalRequest;
import io.bitsquare.p2p.network.MessageListener;
@ -21,12 +23,12 @@ import java.math.BigInteger;
import java.security.KeyPair;
import java.security.PublicKey;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
// Run in UserThread
public class ProtectedExpirableDataStorage {
private static final Logger log = LoggerFactory.getLogger(ProtectedExpirableDataStorage.class);
@ -48,6 +50,7 @@ public class ProtectedExpirableDataStorage {
///////////////////////////////////////////////////////////////////////////////////////////
public ProtectedExpirableDataStorage(PeerGroup peerGroup, File storageDir) {
Log.traceCall();
this.peerGroup = peerGroup;
storage = new Storage<>(storageDir);
@ -56,12 +59,14 @@ public class ProtectedExpirableDataStorage {
}
private void init() {
Log.traceCall();
ConcurrentHashMap<BigInteger, Integer> persisted = storage.initAndGetPersisted(sequenceNumberMap, "sequenceNumberMap");
if (persisted != null) {
sequenceNumberMap = persisted;
}
addMessageListener((message, connection) -> {
Log.traceCall("onMessage: Message=" + message);
if (message instanceof DataMessage) {
if (connection.isAuthenticated()) {
log.trace("ProtectedExpirableDataMessage received " + message + " on connection " + connection);
@ -80,21 +85,26 @@ public class ProtectedExpirableDataStorage {
}
});
TimerTask task = new TimerTask() {
@Override
public void run() {
Thread.currentThread().setName("RemoveExpiredEntriesTimer-" + new Random().nextInt(1000));
try {
log.info("removeExpiredEntries called ");
map.entrySet().stream().filter(entry -> entry.getValue().isExpired())
.forEach(entry -> map.remove(entry.getKey()));
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
};
timer.scheduleAtFixedRate(task, CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
Utilities.setThreadName("RemoveExpiredEntriesTimer");
UserThread.execute(() -> removeExpiredEntries());
} catch (Throwable t) {
t.printStackTrace();
log.error("Executing task failed. " + t.getMessage());
}
}
},
CHECK_TTL_INTERVAL, CHECK_TTL_INTERVAL);
}
private void removeExpiredEntries() {
Log.traceCall();
map.entrySet().stream()
.filter(entry -> entry.getValue().isExpired())
.forEach(entry -> map.remove(entry.getKey()));
}
@ -103,6 +113,7 @@ public class ProtectedExpirableDataStorage {
///////////////////////////////////////////////////////////////////////////////////////////
public void shutDown() {
Log.traceCall();
if (!shutDownInProgress) {
shutDownInProgress = true;
timer.cancel();
@ -111,10 +122,12 @@ public class ProtectedExpirableDataStorage {
}
public void setAuthenticated() {
Log.traceCall();
this.authenticated = true;
}
public boolean add(ProtectedData protectedData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
boolean result = checkPublicKeys(protectedData, true)
@ -128,13 +141,13 @@ public class ProtectedExpirableDataStorage {
if (result) {
map.put(hashOfPayload, protectedData);
log.trace("Data added to our map and it will be broadcasted to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)));
hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData));
StringBuilder sb = new StringBuilder("\n\n############################################################\n" +
"Data set after addProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()));
StringBuilder sb = new StringBuilder("\n\n############################################################\n");
sb.append("Data set after addProtectedExpirableData:");
map.values().stream().forEach(e -> sb.append("\n").append(e.toString()).append("\n"));
sb.append("\n############################################################\n");
log.trace(sb.toString());
log.info(sb.toString());
if (!containsKey)
broadcast(new AddDataMessage(protectedData), sender);
@ -148,6 +161,7 @@ public class ProtectedExpirableDataStorage {
}
public boolean remove(ProtectedData protectedData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
@ -172,6 +186,7 @@ public class ProtectedExpirableDataStorage {
}
public boolean removeMailboxData(ProtectedMailboxData protectedMailboxData, @Nullable Address sender) {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(protectedMailboxData.expirablePayload);
boolean containsKey = map.containsKey(hashOfData);
if (!containsKey) log.debug("Remove data ignored as we don't have an entry for that data.");
@ -196,11 +211,13 @@ public class ProtectedExpirableDataStorage {
}
public Map<BigInteger, ProtectedData> getMap() {
Log.traceCall();
return map;
}
public ProtectedData getDataWithSignedSeqNr(ExpirablePayload payload, KeyPair ownerStoragePubKey)
throws CryptoException {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(payload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
@ -216,6 +233,7 @@ public class ProtectedExpirableDataStorage {
public ProtectedMailboxData getMailboxDataWithSignedSeqNr(ExpirableMailboxPayload expirableMailboxPayload,
KeyPair storageSignaturePubKey, PublicKey receiversPublicKey)
throws CryptoException {
Log.traceCall();
BigInteger hashOfData = getHashAsBigInteger(expirableMailboxPayload);
int sequenceNumber;
if (sequenceNumberMap.containsKey(hashOfData))
@ -230,10 +248,12 @@ public class ProtectedExpirableDataStorage {
}
public void addHashMapChangedListener(HashMapChangedListener hashMapChangedListener) {
Log.traceCall();
hashMapChangedListeners.add(hashMapChangedListener);
}
private void addMessageListener(MessageListener messageListener) {
Log.traceCall();
peerGroup.addMessageListener(messageListener);
}
@ -243,9 +263,10 @@ public class ProtectedExpirableDataStorage {
///////////////////////////////////////////////////////////////////////////////////////////
private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) {
Log.traceCall();
map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our peers.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)));
hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData));
StringBuilder sb = new StringBuilder("\n\n############################################################\n" +
"Data set after removeProtectedExpirableData:");
@ -255,6 +276,7 @@ public class ProtectedExpirableDataStorage {
}
private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) {
Log.traceCall();
int newSequenceNumber = data.sequenceNumber;
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData);
if (sequenceNumberMap.containsKey(hashOfData) && newSequenceNumber <= storedSequenceNumber) {
@ -267,6 +289,7 @@ public class ProtectedExpirableDataStorage {
}
private boolean checkSignature(ProtectedData data) {
Log.traceCall();
byte[] hashOfDataAndSeqNr = Hash.getHash(new DataAndSeqNr(data.expirablePayload, data.sequenceNumber));
try {
boolean result = Sig.verify(data.ownerStoragePubKey, hashOfDataAndSeqNr, data.signature);
@ -282,6 +305,7 @@ public class ProtectedExpirableDataStorage {
}
private boolean checkPublicKeys(ProtectedData data, boolean isAddOperation) {
Log.traceCall();
boolean result = false;
if (data.expirablePayload instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload expirableMailboxPayload = (ExpirableMailboxPayload) data.expirablePayload;
@ -299,6 +323,7 @@ public class ProtectedExpirableDataStorage {
}
private boolean checkIfStoredDataMatchesNewData(ProtectedData data, BigInteger hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
boolean result = getHashAsBigInteger(storedData.expirablePayload).equals(hashOfData)
&& storedData.ownerStoragePubKey.equals(data.ownerStoragePubKey);
@ -309,6 +334,7 @@ public class ProtectedExpirableDataStorage {
}
private boolean checkIfStoredMailboxDataMatchesNewMailboxData(ProtectedMailboxData data, BigInteger hashOfData) {
Log.traceCall();
ProtectedData storedData = map.get(hashOfData);
if (storedData instanceof ProtectedMailboxData) {
ProtectedMailboxData storedMailboxData = (ProtectedMailboxData) storedData;
@ -325,8 +351,8 @@ public class ProtectedExpirableDataStorage {
}
}
private void broadcast(BroadcastMessage message, @Nullable Address sender) {
Log.traceCall();
if (authenticated) {
peerGroup.broadcast(message, sender);
log.trace("Broadcast message " + message);
@ -336,6 +362,7 @@ public class ProtectedExpirableDataStorage {
}
private BigInteger getHashAsBigInteger(ExpirablePayload payload) {
Log.traceCall();
return new BigInteger(Hash.getHash(payload));
}
}

View File

@ -82,7 +82,7 @@ public class TestUtils {
CountDownLatch latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(encryptionService, keyRing, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override
@ -120,7 +120,7 @@ public class TestUtils {
P2PService p2PService = new P2PService(seedNodesRepository, port, new File("seed_node_" + port), useLocalhost, encryptionService, keyRing, new File("dummy"));
p2PService.start(new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
}
@Override

View File

@ -85,7 +85,7 @@ public class PeerGroupTest {
latch = new CountDownLatch(2);
seedNode1.createAndStartP2PService(null, null, address, useLocalhost, seedNodes, new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
latch.countDown();
}
@ -129,7 +129,7 @@ public class PeerGroupTest {
seedNode1 = new SeedNode();
seedNode1.createAndStartP2PService(null, null, address1, useLocalhost, seedNodes, new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
latch.countDown();
}
@ -160,7 +160,7 @@ public class PeerGroupTest {
seedNode2 = new SeedNode();
seedNode2.createAndStartP2PService(null, null, address2, useLocalhost, seedNodes, new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
latch.countDown();
}
@ -387,7 +387,7 @@ public class PeerGroupTest {
latch = new CountDownLatch(1);
seedNode.createAndStartP2PService(null, null, new Address("localhost", port), useLocalhost, seedNodes, new P2PServiceListener() {
@Override
public void onAllDataReceived() {
public void onRequestingDataCompleted() {
latch.countDown();
}

View File

@ -1,7 +1,7 @@
package io.bitsquare.p2p.seed;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.bitsquare.app.Logging;
import io.bitsquare.app.Log;
import io.bitsquare.common.UserThread;
import org.bitcoinj.crypto.DRMWorkaround;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
@ -28,11 +28,12 @@ public class SeedNodeMain {
// To stop enter: q
public static void main(String[] args) throws NoSuchAlgorithmException {
Path path = Paths.get("seed_node_log");
Logging.setup(path.toString());
Log.setup(path.toString());
Log.PRINT_TRACE_METHOD = true;
log.info("Log files under: " + path.toAbsolutePath().toString());
DRMWorkaround.maybeDisableExportControls();
new SeedNodeMain(args);
}
@ -60,7 +61,7 @@ public class SeedNodeMain {
public void listenForExitCommand() {
Scanner scan = new Scanner(System.in);
String line;
while (!stopped && ((line = scan.nextLine()) != null)) {
while (!stopped && !Thread.currentThread().isInterrupted() && ((line = scan.nextLine()) != null)) {
if (line.equals("q")) {
if (!stopped) {
stopped = true;
@ -70,11 +71,11 @@ public class SeedNodeMain {
}, 5);
if (seedNode != null) {
seedNode.shutDown(() -> {
UserThread.execute(() -> seedNode.shutDown(() -> {
timeout.cancel();
log.debug("Shutdown seed node complete.");
System.exit(0);
});
}));
}
}
}

View File

@ -19,11 +19,11 @@
<configuration>
<appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg %xEx%n</pattern>
<pattern>%highlight(%d{MMM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{15} - %msg %xEx%n)</pattern>
</encoder>
</appender>
<root level="INFO">
<root level="TRACE">
<appender-ref ref="CONSOLE_APPENDER"/>
</root>
<!-- <logger name="io.bitsquare.p2p.peers.PeerGroup" level="INFO"/>
@ -34,6 +34,7 @@
<logger name="io.bitsquare.p2p.network.NetworkNode" level="TRACE"/>-->
<logger name="io.bitsquare.storage.Storage" level="WARN"/>
<logger name="com.msopentech.thali.toronionproxy.OnionProxyManagerEventHandler" level="WARN"/>
</configuration>