This commit is contained in:
Manfred Karrer 2016-02-15 23:33:20 +01:00
parent 540f49f1e1
commit fc1ab8a346
11 changed files with 140 additions and 53 deletions

View file

@ -28,7 +28,6 @@ import io.bitsquare.gui.util.BSFormatter;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.p2p.network.LocalhostNetworkNode;
import io.bitsquare.user.Preferences;
import javafx.beans.value.ChangeListener;
import javafx.collections.FXCollections;
@ -84,9 +83,6 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
this.p2PService = p2PService;
this.preferences = preferences;
this.formatter = formatter;
BitcoinNetwork bitcoinNetwork = preferences.getBitcoinNetwork();
boolean useLocalhost = p2PService.getNetworkNode() instanceof LocalhostNetworkNode;
}
public void initialize() {

View file

@ -34,32 +34,32 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
public class NetworkStatisticListItem {
class NetworkStatisticListItem {
private static final Logger log = LoggerFactory.getLogger(NetworkStatisticListItem.class);
final Statistic statistic;
private final Statistic statistic;
private final Connection connection;
private final Subscription sentBytesSubscription, receivedBytesSubscription;
private final Timer timer;
private BSFormatter formatter;
private final BSFormatter formatter;
private StringProperty lastActivity = new SimpleStringProperty();
private StringProperty sentBytes = new SimpleStringProperty();
private StringProperty receivedBytes = new SimpleStringProperty();
private final StringProperty lastActivity = new SimpleStringProperty();
private final StringProperty sentBytes = new SimpleStringProperty();
private final StringProperty receivedBytes = new SimpleStringProperty();
public NetworkStatisticListItem(Connection connection, BSFormatter formatter) {
this.connection = connection;
this.formatter = formatter;
this.statistic = connection.getStatistic();
sentBytesSubscription = EasyBind.subscribe(statistic.sentBytesProperty,
sentBytesSubscription = EasyBind.subscribe(statistic.sentBytesProperty(),
e -> sentBytes.set(formatter.formatBytes((int) e)));
receivedBytesSubscription = EasyBind.subscribe(statistic.receivedBytesProperty,
receivedBytesSubscription = EasyBind.subscribe(statistic.receivedBytesProperty(),
e -> receivedBytes.set(formatter.formatBytes((int) e)));
timer = FxTimer.runPeriodically(Duration.ofMillis(1000),
() -> UserThread.execute(() -> onLastActivityChanged(statistic.lastActivityTimestampProperty.get())));
onLastActivityChanged(statistic.lastActivityTimestampProperty.get());
() -> UserThread.execute(() -> onLastActivityChanged(statistic.getLastActivityTimestamp())));
onLastActivityChanged(statistic.getLastActivityTimestamp());
}
private void onLastActivityChanged(long timeStamp) {

View file

@ -12,6 +12,7 @@ import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.Utils;
import io.bitsquare.p2p.network.messages.CloseConnectionMessage;
import io.bitsquare.p2p.network.messages.SendersNodeAddressMessage;
import io.bitsquare.p2p.peers.keepalive.messages.KeepAliveMessage;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
@ -47,7 +48,7 @@ public class Connection implements MessageListener {
public enum PeerType {
SEED_NODE,
PEER,
DIRECT_MSG_PEER;
DIRECT_MSG_PEER
}
@ -93,7 +94,7 @@ public class Connection implements MessageListener {
private final boolean useCompression = false;
private PeerType peerType;
private final ObjectProperty<NodeAddress> nodeAddressProperty = new SimpleObjectProperty<>();
private List<Long> messageTimeStamps = new ArrayList<>();
private final List<Long> messageTimeStamps = new ArrayList<>();
private final CopyOnWriteArraySet<MessageListener> messageListeners = new CopyOnWriteArraySet<>();
@ -192,7 +193,11 @@ public class Connection implements MessageListener {
objectOutputStream.flush();
statistic.addSentBytes(ByteArrayUtils.objectToByteArray(objectToWrite).length);
statistic.updateLastActivityTimestamp();
statistic.addSentMessage(message);
// We don't want to get the activity ts updated by ping/pong msg
if (!(message instanceof KeepAliveMessage))
statistic.updateLastActivityTimestamp();
}
} catch (IOException e) {
// an exception lead to a shutdown
@ -221,7 +226,7 @@ public class Connection implements MessageListener {
sharedModel.reportInvalidRequest(ruleViolation);
}
public boolean violatesThrottleLimit() {
private boolean violatesThrottleLimit() {
long now = System.currentTimeMillis();
boolean violated = false;
if (messageTimeStamps.size() >= MSG_THROTTLE_PER_SEC) {
@ -602,7 +607,7 @@ public class Connection implements MessageListener {
return;
}
Serializable serializable = null;
Serializable serializable;
if (useCompression) {
if (rawInputObject instanceof byte[]) {
byte[] compressedObjectAsBytes = (byte[]) rawInputObject;
@ -647,14 +652,18 @@ public class Connection implements MessageListener {
Connection connection = sharedModel.connection;
if (message instanceof CloseConnectionMessage) {
CloseConnectionReason[] values = CloseConnectionReason.values();
log.info("CloseConnectionMessage received. Reason={}\n\t" +
"connection={}", ((CloseConnectionMessage) message).reason, connection);
stop();
sharedModel.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER);
} else if (!stopped) {
connection.statistic.updateLastActivityTimestamp();
connection.statistic.addReceivedBytes(size);
connection.statistic.addReceivedMessage(message);
// We don't want to get the activity ts updated by ping/pong msg
if (!(message instanceof KeepAliveMessage))
connection.statistic.updateLastActivityTimestamp();
// First a seed node gets a message form a peer (PreliminaryDataRequest using
// AnonymousMessage interface) which does not has its hidden service

View file

@ -1,59 +1,142 @@
package io.bitsquare.p2p.network;
import io.bitsquare.common.UserThread;
import io.bitsquare.p2p.Message;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleLongProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Statistic {
private static final Logger log = LoggerFactory.getLogger(Statistic.class);
private final Date creationDate;
private long lastActivityTimestamp;
private int sentBytes = 0;
private int receivedBytes = 0;
public LongProperty lastActivityTimestampProperty = new SimpleLongProperty(System.currentTimeMillis());
public IntegerProperty sentBytesProperty = new SimpleIntegerProperty(0);
public IntegerProperty receivedBytesProperty = new SimpleIntegerProperty(0);
///////////////////////////////////////////////////////////////////////////////////////////
// Static
///////////////////////////////////////////////////////////////////////////////////////////
private final static IntegerProperty totalSentBytes = new SimpleIntegerProperty(0);
private final static IntegerProperty totalReceivedBytes = new SimpleIntegerProperty(0);
public static int getTotalSentBytes() {
return totalSentBytes.get();
}
public static IntegerProperty totalSentBytesProperty() {
return totalSentBytes;
}
public static int getTotalReceivedBytes() {
return totalReceivedBytes.get();
}
public static IntegerProperty totalReceivedBytesProperty() {
return totalReceivedBytes;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Instance fields
///////////////////////////////////////////////////////////////////////////////////////////
private final Date creationDate;
private long lastActivityTimestamp = System.currentTimeMillis();
private final IntegerProperty sentBytes = new SimpleIntegerProperty(0);
private final IntegerProperty receivedBytes = new SimpleIntegerProperty(0);
private final Map<String, Integer> receivedMessages = new ConcurrentHashMap<>();
private final Map<String, Integer> sentMessages = new ConcurrentHashMap<>();
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
public Statistic() {
creationDate = new Date();
updateLastActivityTimestamp();
}
public Date getCreationDate() {
return creationDate;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Update, increment
///////////////////////////////////////////////////////////////////////////////////////////
public void updateLastActivityTimestamp() {
lastActivityTimestamp = System.currentTimeMillis();
lastActivityTimestampProperty.set(lastActivityTimestamp);
UserThread.execute(() -> lastActivityTimestamp = System.currentTimeMillis());
}
public void addSentBytes(int value) {
UserThread.execute(() -> {
sentBytes.set(sentBytes.get() + value);
totalSentBytes.set(totalSentBytes.get() + value);
});
}
public void addReceivedBytes(int value) {
UserThread.execute(() -> {
receivedBytes.set(receivedBytes.get() + value);
totalReceivedBytes.set(totalReceivedBytes.get() + value);
});
}
// TODO would need msg inspection to get useful information...
public void addReceivedMessage(Message message) {
String messageClassName = message.getClass().getSimpleName();
int counter = 1;
if (receivedMessages.containsKey(messageClassName))
counter = receivedMessages.get(messageClassName) + 1;
receivedMessages.put(messageClassName, counter);
}
public void addSentMessage(Message message) {
String messageClassName = message.getClass().getSimpleName();
int counter = 1;
if (sentMessages.containsKey(messageClassName))
counter = sentMessages.get(messageClassName) + 1;
sentMessages.put(messageClassName, counter);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
public long getLastActivityTimestamp() {
return lastActivityTimestamp;
}
public int getSentBytes() {
return sentBytes.get();
}
public IntegerProperty sentBytesProperty() {
return sentBytes;
}
public void addSentBytes(int sentBytes) {
this.sentBytes += sentBytes;
sentBytesProperty.set(this.sentBytes);
public int getReceivedBytes() {
return receivedBytes.get();
}
public int getReceivedBytes() {
public IntegerProperty receivedBytesProperty() {
return receivedBytes;
}
public void addReceivedBytes(int receivedBytes) {
this.receivedBytes += receivedBytes;
receivedBytesProperty.set(this.receivedBytes);
public Date getCreationDate() {
return creationDate;
}
@Override
public String toString() {
return "Statistic{" +
"creationDate=" + creationDate +
", lastActivityTimestamp=" + lastActivityTimestamp +
", sentBytes=" + sentBytes +
", receivedBytes=" + receivedBytes +
'}';
}
}

View file

@ -16,10 +16,9 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Random;
public class KeepAliveHandler implements MessageListener {
class KeepAliveHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(KeepAliveHandler.class);
private Connection connection;
@ -31,7 +30,7 @@ public class KeepAliveHandler implements MessageListener {
public interface Listener {
void onComplete();
void onFault(String errorMessage, @Nullable Connection connection);
void onFault(String errorMessage, Connection connection);
}

View file

@ -6,7 +6,7 @@ public final class Ping extends KeepAliveMessage {
// That object is sent over the wire, so we need to take care of version compatibility.
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
public int nonce;
public final int nonce;
public Ping(int nonce) {
this.nonce = nonce;

View file

@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
public class GetPeersRequestHandler {
class GetPeersRequestHandler {
private static final Logger log = LoggerFactory.getLogger(GetPeersRequestHandler.class);
private static final long TIME_OUT_SEC = 20;

View file

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class PeerExchangeHandler implements MessageListener {
class PeerExchangeHandler implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(PeerExchangeHandler.class);
private static final long TIME_OUT_SEC = 20;
@ -52,7 +52,7 @@ public class PeerExchangeHandler implements MessageListener {
private final Listener listener;
private final int nonce = new Random().nextInt();
private Timer timeoutTimer;
public Connection connection;
private Connection connection;
///////////////////////////////////////////////////////////////////////////////////////////

View file

@ -11,7 +11,7 @@ public class ReportedPeer implements Serializable {
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
public final NodeAddress nodeAddress;
public Date date;
public final Date date;
public ReportedPeer(NodeAddress nodeAddress) {
this.nodeAddress = nodeAddress;

View file

@ -12,7 +12,7 @@ public final class GetPeersRequest extends PeerExchangeMessage implements Sender
private static final long serialVersionUID = Version.P2P_NETWORK_VERSION;
private final NodeAddress senderNodeAddress;
public int nonce;
public final int nonce;
public final HashSet<ReportedPeer> reportedPeers;
public GetPeersRequest(NodeAddress senderNodeAddress, int nonce, HashSet<ReportedPeer> reportedPeers) {

View file

@ -3,7 +3,7 @@ package io.bitsquare.p2p.peers.peerexchange.messages;
import io.bitsquare.app.Version;
import io.bitsquare.p2p.Message;
public abstract class PeerExchangeMessage implements Message {
abstract class PeerExchangeMessage implements Message {
private final int messageVersion = Version.getP2PMessageVersion();
@Override