diff --git a/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatistics.java b/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatistics.java new file mode 100644 index 0000000000..7918696d05 --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatistics.java @@ -0,0 +1,111 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network.statistics; + +import bisq.common.proto.network.NetworkEnvelope; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.Getter; +import lombok.Setter; + +@Getter +public class ConnectionStatistics { + + public interface Listener { + void onNewSentBytes(long numberOfNewBytes); + + void onNewReceivedBytes(long numberOfNewBytes); + + void onAddSentMessage(NetworkEnvelope networkEnvelope); + + void onAddReceivedMessage(NetworkEnvelope networkEnvelope); + } + + private final Date creationDate = new Date(); + private final List allListeners = new ArrayList<>(); + private final Map receivedMessages = new HashMap<>(); + private final Map sentMessages = new HashMap<>(); + + private long lastActivityTimestamp = System.currentTimeMillis(); + private long sentBytes; + private long receivedBytes; + private int totalSentMessages; + private int totalReceivedMessages; + @Setter + private int roundTripTime; + + public void addListener(Listener listener) { + allListeners.add(listener); + } + + public void removeListener(Listener listener) { + allListeners.remove(listener); + } + + void updateLastActivityTimestamp() { + lastActivityTimestamp = System.currentTimeMillis(); + } + + void addSentBytes(int value) { + sentBytes += value; + allListeners.forEach(listener -> listener.onNewSentBytes(value)); + } + + void addReceivedBytes(int value) { + receivedBytes += value; + allListeners.forEach(listener -> listener.onNewReceivedBytes(value)); + } + + void addSentMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); + sentMessages.merge(messageClassName, 1, Integer::sum); + + totalSentMessages++; + allListeners.forEach(listener -> listener.onAddSentMessage(networkEnvelope)); + } + + void addReceivedMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); + receivedMessages.merge(messageClassName, 1, Integer::sum); + + totalReceivedMessages++; + allListeners.forEach(listener -> listener.onAddReceivedMessage(networkEnvelope)); + } + + public long getLastActivityAge() { + return System.currentTimeMillis() - lastActivityTimestamp; + } + + @Override + public String toString() { + return "ConnectionStatistics{" + + "\n creationDate=" + creationDate + + ",\n lastActivityTimestamp=" + lastActivityTimestamp + + ",\n sentBytes=" + sentBytes + + ",\n receivedBytes=" + receivedBytes + + ",\n receivedMessages=" + receivedMessages + + ",\n sentMessages=" + sentMessages + + ",\n roundTripTime=" + roundTripTime + + "\n}"; + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulator.java b/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulator.java new file mode 100644 index 0000000000..a5fd05ae7c --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulator.java @@ -0,0 +1,91 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network.statistics; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import lombok.Getter; + +@Getter +public class ConnectionStatsAccumulator implements Runnable { + private final long startTime = System.currentTimeMillis(); + + private final List allConnectionStatistics; + private final List allListeners = new CopyOnWriteArrayList<>(); + + private long totalSentBytes; + private long totalReceivedBytes; + + private int totalSentMessages; + private int totalReceivedMessages; + + private double totalSentMessagesPerSec; + private double totalReceivedMessagesPerSec; + + private double totalSentBytesPerSec; + private double totalReceivedBytesPerSec; + + public ConnectionStatsAccumulator(List allConnectionStatistics) { + this.allConnectionStatistics = allConnectionStatistics; + } + + @Override + public void run() { + long totalSentBytes = 0; + long totalReceivedBytes = 0; + + int totalSentMessages = 0; + int totalReceivedMessages = 0; + + for (ConnectionStatistics statistic : allConnectionStatistics) { + totalSentBytes += statistic.getSentBytes(); + totalReceivedBytes += statistic.getReceivedBytes(); + + totalSentMessages += statistic.getTotalSentMessages(); + totalReceivedMessages += statistic.getTotalReceivedMessages(); + } + + this.totalSentBytes = totalSentBytes; + this.totalReceivedBytes = totalReceivedBytes; + + this.totalSentMessages = totalSentMessages; + this.totalReceivedMessages = totalReceivedMessages; + + long passed = (System.currentTimeMillis() - startTime) / 1000; + totalSentMessagesPerSec = ((double) totalSentMessages / passed); + totalReceivedMessagesPerSec = ((double) totalReceivedMessages) / passed; + + totalSentBytesPerSec = ((double) totalSentBytes) / passed; + totalReceivedBytesPerSec = ((double) totalReceivedBytes) / passed; + + callListeners(); + } + + private void callListeners() { + allListeners.forEach(listener -> listener.onTotalSentStatsChanged(totalSentBytes, totalSentMessages, totalSentMessagesPerSec)); + } + + public void addListener(NetworkStatisticsService.Listener listener) { + allListeners.add(listener); + } + + public void removeListener(NetworkStatisticsService.Listener listener) { + allListeners.remove(listener); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/network/statistics/NetworkStatisticsService.java b/p2p/src/main/java/bisq/network/p2p/network/statistics/NetworkStatisticsService.java new file mode 100644 index 0000000000..78cc25905c --- /dev/null +++ b/p2p/src/main/java/bisq/network/p2p/network/statistics/NetworkStatisticsService.java @@ -0,0 +1,98 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network.statistics; + +import bisq.common.util.Utilities; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +public class NetworkStatisticsService { + + public interface Listener { + void onTotalSentStatsChanged(long totalSentBytes, long totalSentMessages, double totalSentMessagesPerSec); + } + + private final long startTime = System.currentTimeMillis(); + + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + private final List allConnectionStatistics = new CopyOnWriteArrayList<>(); + private final ConnectionStatsAccumulator connectionStatsAccumulator = + new ConnectionStatsAccumulator(allConnectionStatistics); + + private final Map totalSentMessages = new HashMap<>(); + private final Map totalReceivedMessages = new HashMap<>(); + + public ConnectionStatistics newConnectionStatistics() { + var connectionStatistics = new ConnectionStatistics(); + allConnectionStatistics.add(connectionStatistics); + return connectionStatistics; + } + + public void start() { + scheduledExecutorService.scheduleAtFixedRate( + connectionStatsAccumulator, 1, 1, TimeUnit.SECONDS + ); + scheduledExecutorService.scheduleAtFixedRate( + createStatisticsLogger(), 1, 1, TimeUnit.HOURS + ); + } + + public void shutdown() { + scheduledExecutorService.shutdownNow(); + } + + public void addListener(Listener listener) { + connectionStatsAccumulator.addListener(listener); + } + + public void removeListener(Listener listener) { + connectionStatsAccumulator.removeListener(listener); + } + + private Runnable createStatisticsLogger() { + return () -> { + ConnectionStatsAccumulator allStats = connectionStatsAccumulator; + String ls = System.lineSeparator(); + + log.info("Accumulated network statistics:" + ls + + "Bytes sent: {};" + ls + + "Number of sent messages/Sent messages: {} / {};" + ls + + "Number of sent messages per sec: {};" + ls + + "Bytes received: {}" + ls + + "Number of received messages/Received messages: {} / {};" + ls + + "Number of received messages per sec: {}" + ls, + Utilities.readableFileSize(allStats.getTotalSentBytes()), + allStats.getTotalSentMessages(), totalSentMessages, + allStats.getTotalSentMessagesPerSec(), + Utilities.readableFileSize(allStats.getTotalReceivedBytes()), + allStats.getTotalReceivedMessages(), totalReceivedMessages, + allStats.getTotalReceivedMessagesPerSec()); + }; + } +} diff --git a/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatisticsTest.java b/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatisticsTest.java new file mode 100644 index 0000000000..e39a467b46 --- /dev/null +++ b/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatisticsTest.java @@ -0,0 +1,306 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network.statistics; + +import bisq.network.p2p.AckMessage; +import bisq.network.p2p.storage.messages.AddDataMessage; + +import bisq.common.proto.network.NetworkEnvelope; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class ConnectionStatisticsTest { + private final ConnectionStatistics statistics = new ConnectionStatistics(); + + @Test + public void updateLastActivityTimestampTest() throws InterruptedException { + var statistics = new ConnectionStatistics(); + long firstTimestamp = statistics.getLastActivityTimestamp(); + + Thread.sleep(200); + statistics.updateLastActivityTimestamp(); + + assertTrue(statistics.getLastActivityTimestamp() > firstTimestamp); + } + + @Test + public void addSentBytesTest() { + for (int i = 0; i < 5; i++) { + statistics.addSentBytes(100); + } + + assertEquals(500, statistics.getSentBytes()); + } + + @Test + public void addReceivedBytesTest() { + for (int i = 0; i < 5; i++) { + statistics.addReceivedBytes(100); + } + + assertEquals(500, statistics.getReceivedBytes()); + } + + @Test + public void addSentMessageTest() { + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addSentMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addSentMessage(addDataMessage); + } + + Map countByMessageClassName = statistics.getSentMessages(); + + String ackMessageClassName = ackMessage.getClass().getSimpleName(); + int counter = countByMessageClassName.get(ackMessageClassName); + assertEquals(3, counter); + + String addDataMessageClassName = addDataMessage.getClass().getSimpleName(); + counter = countByMessageClassName.get(addDataMessageClassName); + assertEquals(5, counter); + } + + @Test + public void addReceivedMessageTest() { + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addReceivedMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addReceivedMessage(addDataMessage); + } + + Map countByMessageClassName = statistics.getReceivedMessages(); + + String ackMessageClassName = ackMessage.getClass().getSimpleName(); + int counter = countByMessageClassName.get(ackMessageClassName); + assertEquals(3, counter); + + String addDataMessageClassName = addDataMessage.getClass().getSimpleName(); + counter = countByMessageClassName.get(addDataMessageClassName); + assertEquals(5, counter); + } + + @Test + public void numberOfTotalSentMessages() { + var statistics = new ConnectionStatistics(); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addSentMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addSentMessage(addDataMessage); + } + + assertEquals(8, statistics.getTotalSentMessages()); + } + + @Test + public void numberOfTotalReceivedMessages() { + var statistics = new ConnectionStatistics(); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addReceivedMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addReceivedMessage(addDataMessage); + } + + assertEquals(8, statistics.getTotalReceivedMessages()); + } + + @Test + public void getLastActivityAge() throws InterruptedException { + var statistics = new ConnectionStatistics(); + Thread.sleep(200); + assertTrue(statistics.getLastActivityAge() > 100); + } + + @Test + public void onSentBytesUpdatedListenerTest() { + var listener = new ConnectionStatistics.Listener() { + long onSentBytes; + + @Override + public void onNewSentBytes(long numberOfNewBytes) { + onSentBytes += numberOfNewBytes; + } + + @Override + public void onNewReceivedBytes(long numberOfNewBytes) { + } + + @Override + public void onAddSentMessage(NetworkEnvelope networkEnvelope) { + } + + @Override + public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) { + } + }; + statistics.addListener(listener); + + for (int i = 0; i < 5; i++) { + statistics.addSentBytes(100); + } + + assertEquals(500, listener.onSentBytes); + } + + @Test + public void onReceivedBytesUpdatedListenerTest() { + var listener = new ConnectionStatistics.Listener() { + long onReceivedBytes; + + @Override + public void onNewSentBytes(long numberOfNewBytes) { + } + + @Override + public void onNewReceivedBytes(long numberOfNewBytes) { + onReceivedBytes += numberOfNewBytes; + } + + @Override + public void onAddSentMessage(NetworkEnvelope networkEnvelope) { + } + + @Override + public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) { + } + }; + statistics.addListener(listener); + + for (int i = 0; i < 3; i++) { + statistics.addReceivedBytes(100); + } + + assertEquals(300, listener.onReceivedBytes); + } + + @Test + public void onAddSentMessageListenerTest() { + var listener = new ConnectionStatistics.Listener() { + final Map counterByClassName = new HashMap<>(); + + @Override + public void onNewSentBytes(long numberOfNewBytes) { + } + + @Override + public void onNewReceivedBytes(long numberOfNewBytes) { + } + + @Override + public void onAddSentMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); + counterByClassName.merge(messageClassName, 1, Integer::sum); + } + + @Override + public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) { + } + }; + statistics.addListener(listener); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addSentMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addSentMessage(addDataMessage); + } + + Map countByMessageClassName = listener.counterByClassName; + + String ackMessageClassName = ackMessage.getClass().getSimpleName(); + int counter = countByMessageClassName.get(ackMessageClassName); + assertEquals(3, counter); + + String addDataMessageClassName = addDataMessage.getClass().getSimpleName(); + counter = countByMessageClassName.get(addDataMessageClassName); + assertEquals(5, counter); + } + + @Test + public void onAddReceivedMessageListenerTest() { + var listener = new ConnectionStatistics.Listener() { + final Map counterByClassName = new HashMap<>(); + + @Override + public void onNewSentBytes(long numberOfNewBytes) { + } + + @Override + public void onNewReceivedBytes(long numberOfNewBytes) { + } + + @Override + public void onAddSentMessage(NetworkEnvelope networkEnvelope) { + } + + @Override + public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) { + String messageClassName = networkEnvelope.getClass().getSimpleName(); + counterByClassName.merge(messageClassName, 1, Integer::sum); + } + }; + statistics.addListener(listener); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + statistics.addReceivedMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + statistics.addReceivedMessage(addDataMessage); + } + + Map countByMessageClassName = listener.counterByClassName; + + String ackMessageClassName = ackMessage.getClass().getSimpleName(); + int counter = countByMessageClassName.get(ackMessageClassName); + assertEquals(3, counter); + + String addDataMessageClassName = addDataMessage.getClass().getSimpleName(); + counter = countByMessageClassName.get(addDataMessageClassName); + assertEquals(5, counter); + } +} diff --git a/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulatorTest.java b/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulatorTest.java new file mode 100644 index 0000000000..16a9c3596d --- /dev/null +++ b/p2p/src/test/java/bisq/network/p2p/network/statistics/ConnectionStatsAccumulatorTest.java @@ -0,0 +1,211 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.network.statistics; + +import bisq.network.p2p.AckMessage; +import bisq.network.p2p.storage.messages.AddDataMessage; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class ConnectionStatsAccumulatorTest { + + private final NetworkStatisticsService networkStatisticsService = new NetworkStatisticsService(); + + @Test + public void totalSentBytes() { + List allConnectionStatistics = new ArrayList<>(); + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + firstConnectionStatistics.addSentBytes(100); + + ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + secondConnectionStatistics.addSentBytes(100); + + ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + thirdConnectionStatistics.addSentBytes(100); + + allConnectionStatistics.add(firstConnectionStatistics); + allConnectionStatistics.add(secondConnectionStatistics); + allConnectionStatistics.add(thirdConnectionStatistics); + + connectionStatsAccumulator.run(); + + assertEquals(300, connectionStatsAccumulator.getTotalSentBytes()); + } + + @Test + public void totalReceivedBytes() { + List allConnectionStatistics = new ArrayList<>(); + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + firstConnectionStatistics.addReceivedBytes(100); + + ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + secondConnectionStatistics.addReceivedBytes(100); + + ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + thirdConnectionStatistics.addReceivedBytes(100); + + allConnectionStatistics.add(firstConnectionStatistics); + allConnectionStatistics.add(secondConnectionStatistics); + allConnectionStatistics.add(thirdConnectionStatistics); + + connectionStatsAccumulator.run(); + + assertEquals(300, connectionStatsAccumulator.getTotalReceivedBytes()); + } + + @Test + public void totalSentMessages() { + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + + List allConnectionStatistics = List.of( + firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics + ); + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + firstConnectionStatistics.addSentMessage(ackMessage); + secondConnectionStatistics.addSentMessage(ackMessage); + thirdConnectionStatistics.addSentMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + firstConnectionStatistics.addSentMessage(addDataMessage); + secondConnectionStatistics.addSentMessage(addDataMessage); + thirdConnectionStatistics.addSentMessage(addDataMessage); + } + + connectionStatsAccumulator.run(); + + assertEquals(8 * 3, connectionStatsAccumulator.getTotalSentMessages()); + } + + @Test + public void totalReceivedSentMessages() { + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + + List allConnectionStatistics = List.of( + firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics + ); + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + + AckMessage ackMessage = mock(AckMessage.class); + for (int i = 0; i < 3; i++) { + firstConnectionStatistics.addReceivedMessage(ackMessage); + secondConnectionStatistics.addReceivedMessage(ackMessage); + thirdConnectionStatistics.addReceivedMessage(ackMessage); + } + + AddDataMessage addDataMessage = mock(AddDataMessage.class); + for (int i = 0; i < 5; i++) { + firstConnectionStatistics.addReceivedMessage(addDataMessage); + secondConnectionStatistics.addReceivedMessage(addDataMessage); + thirdConnectionStatistics.addReceivedMessage(addDataMessage); + } + + connectionStatsAccumulator.run(); + + assertEquals(8 * 3, connectionStatsAccumulator.getTotalReceivedMessages()); + } + + @Test + public void addListener() { + var listener = new NetworkStatisticsService.Listener() { + long totalSentBytes, numTotalSentMessages; + + @Override + public void onTotalSentStatsChanged(long totalSentBytes, + long totalSentMessages, + double totalSentMessagesPerSec) { + this.totalSentBytes = totalSentBytes; + this.numTotalSentMessages = totalSentMessages; + } + }; + + AckMessage ackMessage = mock(AckMessage.class); + + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + firstConnectionStatistics.addSentBytes(100); + firstConnectionStatistics.addSentMessage(ackMessage); + + ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + secondConnectionStatistics.addSentBytes(100); + secondConnectionStatistics.addSentMessage(ackMessage); + + ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + thirdConnectionStatistics.addSentBytes(100); + thirdConnectionStatistics.addSentMessage(ackMessage); + + List allConnectionStatistics = List.of( + firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics + ); + + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + connectionStatsAccumulator.addListener(listener); + connectionStatsAccumulator.run(); + + assertEquals(300, listener.totalSentBytes); + assertEquals(3, listener.numTotalSentMessages); + } + + @Test + public void removeListener() { + var listener = new NetworkStatisticsService.Listener() { + long totalSentBytes, numTotalSentMessages; + + @Override + public void onTotalSentStatsChanged(long totalSentBytes, + long totalSentMessages, + double totalSentMessagesPerSec) { + this.totalSentBytes = totalSentBytes; + this.numTotalSentMessages = totalSentMessages; + } + }; + + ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics(); + firstConnectionStatistics.addSentBytes(100); + + AckMessage ackMessage = mock(AckMessage.class); + firstConnectionStatistics.addSentMessage(ackMessage); + + List allConnectionStatistics = List.of(firstConnectionStatistics); + var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics); + + connectionStatsAccumulator.addListener(listener); + connectionStatsAccumulator.removeListener(listener); + connectionStatsAccumulator.run(); + + assertEquals(0, listener.totalSentBytes); + assertEquals(0, listener.numTotalSentMessages); + } +}