Add prog args for connection parameters

- Add program args: msgThrottlePerSec, msgThrottlePer10Sec,
sendMsgThrottleTrigger and sendMsgThrottleSleep
- Add ConnectionConfig class with static injected field in Connection
- Cleanups
This commit is contained in:
Manfred Karrer 2019-03-04 00:57:29 -05:00
parent 744bbe6ad7
commit c65decfeaf
No known key found for this signature in database
GPG key ID: 401250966A6B2C46
6 changed files with 168 additions and 35 deletions

View file

@ -25,6 +25,7 @@ import bisq.core.exceptions.BisqException;
import bisq.core.filter.FilterManager;
import bisq.network.NetworkOptionKeys;
import bisq.network.p2p.network.ConnectionConfig;
import bisq.common.CommonOptionKeys;
import bisq.common.app.Version;
@ -193,7 +194,8 @@ public class BisqEnvironment extends StandardEnvironment {
rpcPort, rpcBlockNotificationPort, dumpBlockchainData, fullDaoNode,
myAddress, banList, dumpStatistics, maxMemory, socks5ProxyBtcAddress,
torRcFile, torRcOptions, externalTorControlPort, externalTorPassword, externalTorCookieFile,
socks5ProxyHttpAddress, useAllProvidedNodes, numConnectionForBtc, genesisTxId, genesisBlockHeight, referralId, daoActivated;
socks5ProxyHttpAddress, useAllProvidedNodes, numConnectionForBtc, genesisTxId, genesisBlockHeight,
referralId, daoActivated, msgThrottlePerSec, msgThrottlePer10Sec, sendMsgThrottleTrigger, sendMsgThrottleSleep;
protected final boolean externalTorUseSafeCookieAuthentication, torStreamIsolation;
@ -283,12 +285,20 @@ public class BisqEnvironment extends StandardEnvironment {
externalTorCookieFile = commandLineProperties.containsProperty(NetworkOptionKeys.EXTERNAL_TOR_COOKIE_FILE) ?
(String) commandLineProperties.getProperty(NetworkOptionKeys.EXTERNAL_TOR_COOKIE_FILE) :
"";
externalTorUseSafeCookieAuthentication = commandLineProperties.containsProperty(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE) ?
true :
false;
torStreamIsolation = commandLineProperties.containsProperty(NetworkOptionKeys.TOR_STREAM_ISOLATION) ?
true :
false;
externalTorUseSafeCookieAuthentication = commandLineProperties.containsProperty(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE);
torStreamIsolation = commandLineProperties.containsProperty(NetworkOptionKeys.TOR_STREAM_ISOLATION);
msgThrottlePerSec = commandLineProperties.containsProperty(NetworkOptionKeys.MSG_THROTTLE_PER_SEC) ?
(String) commandLineProperties.getProperty(NetworkOptionKeys.MSG_THROTTLE_PER_SEC) :
String.valueOf(ConnectionConfig.MSG_THROTTLE_PER_SEC);
msgThrottlePer10Sec = commandLineProperties.containsProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC) ?
(String) commandLineProperties.getProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC) :
String.valueOf(ConnectionConfig.MSG_THROTTLE_PER_10_SEC);
sendMsgThrottleTrigger = commandLineProperties.containsProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER) ?
(String) commandLineProperties.getProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER) :
String.valueOf(ConnectionConfig.SEND_MSG_THROTTLE_TRIGGER);
sendMsgThrottleSleep = commandLineProperties.containsProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP) ?
(String) commandLineProperties.getProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP) :
String.valueOf(ConnectionConfig.SEND_MSG_THROTTLE_SLEEP);
//RpcOptionKeys
rpcUser = commandLineProperties.containsProperty(DaoOptionKeys.RPC_USER) ?
@ -467,6 +477,11 @@ public class BisqEnvironment extends StandardEnvironment {
if (torStreamIsolation)
setProperty(NetworkOptionKeys.TOR_STREAM_ISOLATION, "true");
setProperty(NetworkOptionKeys.MSG_THROTTLE_PER_SEC, msgThrottlePerSec);
setProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC, msgThrottlePer10Sec);
setProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER, sendMsgThrottleTrigger);
setProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP, sendMsgThrottleSleep);
setProperty(AppOptionKeys.APP_DATA_DIR_KEY, appDataDir);
setProperty(AppOptionKeys.DESKTOP_WITH_HTTP_API, desktopWithHttpApi);
setProperty(AppOptionKeys.DESKTOP_WITH_GRPC_API, desktopWithGrpcApi);

View file

@ -33,6 +33,7 @@ import bisq.core.trade.TradeManager;
import bisq.network.NetworkOptionKeys;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.ConnectionConfig;
import bisq.common.CommonOptionKeys;
import bisq.common.UserThread;
@ -416,6 +417,27 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
parser.accepts(NetworkOptionKeys.TOR_STREAM_ISOLATION,
"Use stream isolation for Tor [experimental!].");
parser.accepts(NetworkOptionKeys.MSG_THROTTLE_PER_SEC,
format("Message throttle per sec for connection class (default: %s)",
String.valueOf(ConnectionConfig.MSG_THROTTLE_PER_SEC)))
.withRequiredArg()
.ofType(int.class);
parser.accepts(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC,
format("Message throttle per 10 sec for connection class (default: %s)",
String.valueOf(ConnectionConfig.MSG_THROTTLE_PER_10_SEC)))
.withRequiredArg()
.ofType(int.class);
parser.accepts(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER,
format("Time in ms when we trigger a sleep if 2 messages are sent (default: %s)",
String.valueOf(ConnectionConfig.SEND_MSG_THROTTLE_TRIGGER)))
.withRequiredArg()
.ofType(int.class);
parser.accepts(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP,
format("Pause in ms to sleep if we get too many messages to send (default: %s)",
String.valueOf(ConnectionConfig.SEND_MSG_THROTTLE_SLEEP)))
.withRequiredArg()
.ofType(int.class);
//AppOptionKeys
parser.accepts(AppOptionKeys.USER_DATA_DIR_KEY,
format("User data directory (default: %s)", BisqEnvironment.DEFAULT_USER_DATA_DIR))

View file

@ -36,4 +36,8 @@ public class NetworkOptionKeys {
public static final String EXTERNAL_TOR_COOKIE_FILE = "torControlCookieFile";
public static final String EXTERNAL_TOR_USE_SAFECOOKIE = "torControlUseSafeCookieAuth";
public static final String TOR_STREAM_ISOLATION = "torStreamIsolation";
public static final String MSG_THROTTLE_PER_SEC = "msgThrottlePerSec";
public static final String MSG_THROTTLE_PER_10_SEC = "msgThrottlePer10Sec";
public static final String SEND_MSG_THROTTLE_TRIGGER = "sendMsgThrottleTrigger";
public static final String SEND_MSG_THROTTLE_SLEEP = "sendMsgThrottleSleep";
}

View file

@ -19,6 +19,8 @@ package bisq.network.p2p;
import bisq.network.NetworkOptionKeys;
import bisq.network.Socks5ProxyProvider;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.ConnectionConfig;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.BanList;
import bisq.network.p2p.peers.Broadcaster;
@ -64,10 +66,12 @@ public class P2PModule extends AppModule {
bind(KeepAliveManager.class).in(Singleton.class);
bind(Broadcaster.class).in(Singleton.class);
bind(BanList.class).in(Singleton.class);
bind(ConnectionConfig.class).in(Singleton.class);
bind(NetworkNode.class).toProvider(NetworkNodeProvider.class).in(Singleton.class);
bind(Socks5ProxyProvider.class).in(Singleton.class);
requestStaticInjection(Connection.class);
Boolean useLocalhostForP2P = environment.getProperty(NetworkOptionKeys.USE_LOCALHOST_FOR_P2P, boolean.class, false);
bind(boolean.class).annotatedWith(Names.named(NetworkOptionKeys.USE_LOCALHOST_FOR_P2P)).toInstance(useLocalhostForP2P);
@ -93,7 +97,11 @@ public class P2PModule extends AppModule {
bindConstant().annotatedWith(named(NetworkOptionKeys.EXTERNAL_TOR_CONTROL_PORT)).to(environment.getRequiredProperty(NetworkOptionKeys.EXTERNAL_TOR_CONTROL_PORT));
bindConstant().annotatedWith(named(NetworkOptionKeys.EXTERNAL_TOR_PASSWORD)).to(environment.getRequiredProperty(NetworkOptionKeys.EXTERNAL_TOR_PASSWORD));
bindConstant().annotatedWith(named(NetworkOptionKeys.EXTERNAL_TOR_COOKIE_FILE)).to(environment.getRequiredProperty(NetworkOptionKeys.EXTERNAL_TOR_COOKIE_FILE));
bindConstant().annotatedWith(named(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE)).to(environment.containsProperty(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE) ? true : false);
bindConstant().annotatedWith(named(NetworkOptionKeys.TOR_STREAM_ISOLATION)).to(environment.containsProperty(NetworkOptionKeys.TOR_STREAM_ISOLATION) ? true : false);
bindConstant().annotatedWith(named(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE)).to(environment.containsProperty(NetworkOptionKeys.EXTERNAL_TOR_USE_SAFECOOKIE));
bindConstant().annotatedWith(named(NetworkOptionKeys.TOR_STREAM_ISOLATION)).to(environment.containsProperty(NetworkOptionKeys.TOR_STREAM_ISOLATION));
bindConstant().annotatedWith(named(NetworkOptionKeys.MSG_THROTTLE_PER_SEC)).to(environment.getRequiredProperty(NetworkOptionKeys.MSG_THROTTLE_PER_SEC));
bindConstant().annotatedWith(named(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)).to(environment.getRequiredProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC));
bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER));
bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP));
}
}

View file

@ -48,6 +48,8 @@ import bisq.common.util.Utilities;
import io.bisq.generated.protobuffer.PB;
import javax.inject.Inject;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
@ -113,18 +115,20 @@ public class Connection implements MessageListener {
// Static
///////////////////////////////////////////////////////////////////////////////////////////
@Inject
private static ConnectionConfig connectionConfig;
// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
static final int MSG_THROTTLE_PER_SEC = 200; // With MAX_MSG_SIZE of 200kb results in bandwidth of 40MB/sec or 5 mbit/sec
static final int MSG_THROTTLE_PER_10_SEC = 1000; // With MAX_MSG_SIZE of 200kb results in bandwidth of 20MB/sec or 2.5 mbit/sec
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);
public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
///////////////////////////////////////////////////////////////////////////////////////////
@ -138,6 +142,10 @@ public class Connection implements MessageListener {
// holder of state shared between InputHandler and Connection
private final SharedModel sharedModel;
private final Statistic statistic;
private final int msgThrottlePer10Sec;
private final int msgThrottlePerSec;
private final int sendMsgThrottleTrigger;
private final int sendMsgThrottleSleep;
// set in init
private InputHandler inputHandler;
@ -158,13 +166,21 @@ public class Connection implements MessageListener {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress, NetworkProtoResolver networkProtoResolver) {
Connection(Socket socket,
MessageListener messageListener,
ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress,
NetworkProtoResolver networkProtoResolver) {
this.socket = socket;
this.connectionListener = connectionListener;
uid = UUID.randomUUID().toString();
statistic = new Statistic();
msgThrottlePerSec = connectionConfig.getMsgThrottlePerSec();
msgThrottlePer10Sec = connectionConfig.getMsgThrottlePer10Sec();
sendMsgThrottleTrigger = connectionConfig.getSendMsgThrottleTrigger();
sendMsgThrottleSleep = connectionConfig.getSendMsgThrottleSleep();
addMessageListener(messageListener);
sharedModel = new SharedModel(this, socket);
@ -200,7 +216,6 @@ public class Connection implements MessageListener {
log.trace("New connection created: " + this.toString());
UserThread.execute(() -> connectionListener.onConnection(this));
} catch (Throwable e) {
handleException(e);
}
@ -228,15 +243,16 @@ public class Connection implements MessageListener {
// Throttle outbound network_messages
long now = System.currentTimeMillis();
long elapsed = now - lastSendTimeStamp;
if (elapsed < 20) {
log.debug("We got 2 sendMessage requests in less than 20 ms. We set the thread to sleep " +
"for 50 ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}",
lastSendTimeStamp, now, elapsed);
Thread.sleep(50);
if (elapsed < sendMsgThrottleTrigger) {
log.warn("We got 2 sendMessage requests in less than {} ms. We set the thread to sleep " +
"for {} ms to avoid flooding our peer. lastSendTimeStamp={}, now={}, elapsed={}, networkEnvelope={}",
sendMsgThrottleTrigger, sendMsgThrottleSleep, lastSendTimeStamp, now, elapsed,
networkEnvelope.getClass().getSimpleName());
Thread.sleep(sendMsgThrottleSleep);
}
lastSendTimeStamp = now;
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
PB.NetworkEnvelope proto = networkEnvelope.toProtoNetworkEnvelope();
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));
@ -349,9 +365,9 @@ public class Connection implements MessageListener {
long now = System.currentTimeMillis();
boolean violated = false;
//TODO remove message storage after network is tested stable
if (messageTimeStamps.size() >= MSG_THROTTLE_PER_SEC) {
if (messageTimeStamps.size() >= msgThrottlePerSec) {
// check if we got more than 200 (MSG_THROTTLE_PER_SEC) msg per sec.
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - MSG_THROTTLE_PER_SEC).first;
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - msgThrottlePerSec).first;
// if duration < 1 sec we received too much network_messages
violated = now - compareValue < TimeUnit.SECONDS.toMillis(1);
if (violated) {
@ -363,10 +379,10 @@ public class Connection implements MessageListener {
}
}
if (messageTimeStamps.size() >= MSG_THROTTLE_PER_10_SEC) {
if (messageTimeStamps.size() >= msgThrottlePer10Sec) {
if (!violated) {
// check if we got more than 50 msg per 10 sec.
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - MSG_THROTTLE_PER_10_SEC).first;
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - msgThrottlePer10Sec).first;
// if duration < 10 sec we received too much network_messages
violated = now - compareValue < TimeUnit.SECONDS.toMillis(10);
@ -380,7 +396,7 @@ public class Connection implements MessageListener {
}
}
// we limit to max 1000 (MSG_THROTTLE_PER_10SEC) entries
while(messageTimeStamps.size() > MSG_THROTTLE_PER_10_SEC)
while (messageTimeStamps.size() > msgThrottlePer10Sec)
messageTimeStamps.remove(0);
messageTimeStamps.add(new Tuple2<>(now, networkEnvelope.getClass().getName()));
@ -395,7 +411,7 @@ public class Connection implements MessageListener {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
checkArgument(connection.equals(this));
UserThread.execute(() -> messageListeners.stream().forEach(e -> e.onMessage(networkEnvelope, connection)));
UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection)));
}
@ -478,7 +494,7 @@ public class Connection implements MessageListener {
public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
log.debug("shutDown: nodeAddressOpt={}, closeConnectionReason={}", this.peersNodeAddressOptional, closeConnectionReason);
if (!stopped) {
String peersNodeAddress = peersNodeAddressOptional.isPresent() ? peersNodeAddressOptional.get().toString() : "null";
String peersNodeAddress = peersNodeAddressOptional.map(NodeAddress::toString).orElse("null");
log.debug("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" +
"ShutDown connection:"
+ "\npeersNodeAddress=" + peersNodeAddress
@ -606,7 +622,7 @@ public class Connection implements MessageListener {
private List<Integer> supportedCapabilities;
public SharedModel(Connection connection, Socket socket) {
SharedModel(Connection connection, Socket socket) {
this.connection = connection;
this.socket = socket;
}
@ -704,7 +720,7 @@ public class Connection implements MessageListener {
stopped = true;
}
public RuleViolation getRuleViolation() {
RuleViolation getRuleViolation() {
return ruleViolation;
}
@ -743,11 +759,11 @@ public class Connection implements MessageListener {
private long lastReadTimeStamp;
private boolean threadNameSet;
public InputHandler(SharedModel sharedModel,
InputStream protoInputStream,
String portInfo,
MessageListener messageListener,
NetworkProtoResolver networkProtoResolver) {
InputHandler(SharedModel sharedModel,
InputStream protoInputStream,
String portInfo,
MessageListener messageListener,
NetworkProtoResolver networkProtoResolver) {
this.sharedModel = sharedModel;
this.protoInputStream = protoInputStream;
this.portInfo = portInfo;

View file

@ -0,0 +1,68 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network;
import bisq.network.NetworkOptionKeys;
import com.google.inject.name.Named;
import javax.inject.Inject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConnectionConfig {
public static final int MSG_THROTTLE_PER_SEC = 200; // With MAX_MSG_SIZE of 200kb results in bandwidth of 40MB/sec or 5 mbit/sec
public static final int MSG_THROTTLE_PER_10_SEC = 1000; // With MAX_MSG_SIZE of 200kb results in bandwidth of 20MB/sec or 2.5 mbit/sec
public static final int SEND_MSG_THROTTLE_TRIGGER = 20; // Time in ms when we trigger a sleep if 2 messages are sent
public static final int SEND_MSG_THROTTLE_SLEEP = 50; // Pause in ms to sleep if we get too many messages to send
@Getter
private int msgThrottlePerSec;
@Getter
private int msgThrottlePer10Sec;
@Getter
private int sendMsgThrottleTrigger;
@Getter
private int sendMsgThrottleSleep;
@Inject
public ConnectionConfig(@Named(NetworkOptionKeys.MSG_THROTTLE_PER_SEC) int msgThrottlePerSec,
@Named(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC) int msgThrottlePer10Sec,
@Named(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER) int sendMsgThrottleTrigger,
@Named(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP) int sendMsgThrottleSleep) {
this.msgThrottlePerSec = msgThrottlePerSec;
this.msgThrottlePer10Sec = msgThrottlePer10Sec;
this.sendMsgThrottleTrigger = sendMsgThrottleTrigger;
this.sendMsgThrottleSleep = sendMsgThrottleSleep;
log.info(this.toString());
}
@Override
public String toString() {
return "ConnectionConfig{" +
"\n msgThrottlePerSec=" + msgThrottlePerSec +
",\n msgThrottlePer10Sec=" + msgThrottlePer10Sec +
",\n sendMsgThrottleTrigger=" + sendMsgThrottleTrigger +
",\n sendMsgThrottleSleep=" + sendMsgThrottleSleep +
"\n}";
}
}