Implement NetworkStatisticsService

The NetworkStatisticsService is an attempt to decouple the network
statistic computation from the UI thread. Here, the
NetworkStatisticsService schedules repeating tasks on a
ScheduledExecutorService.
This commit is contained in:
Alva Swanson 2023-02-15 18:38:15 +01:00
parent bed76128b7
commit 4c6c9100b3
No known key found for this signature in database
GPG Key ID: 004760E77F753090
5 changed files with 817 additions and 0 deletions

View File

@ -0,0 +1,111 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network.statistics;
import bisq.common.proto.network.NetworkEnvelope;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
@Getter
public class ConnectionStatistics {
public interface Listener {
void onNewSentBytes(long numberOfNewBytes);
void onNewReceivedBytes(long numberOfNewBytes);
void onAddSentMessage(NetworkEnvelope networkEnvelope);
void onAddReceivedMessage(NetworkEnvelope networkEnvelope);
}
private final Date creationDate = new Date();
private final List<Listener> allListeners = new ArrayList<>();
private final Map<String, Integer> receivedMessages = new HashMap<>();
private final Map<String, Integer> sentMessages = new HashMap<>();
private long lastActivityTimestamp = System.currentTimeMillis();
private long sentBytes;
private long receivedBytes;
private int totalSentMessages;
private int totalReceivedMessages;
@Setter
private int roundTripTime;
public void addListener(Listener listener) {
allListeners.add(listener);
}
public void removeListener(Listener listener) {
allListeners.remove(listener);
}
void updateLastActivityTimestamp() {
lastActivityTimestamp = System.currentTimeMillis();
}
void addSentBytes(int value) {
sentBytes += value;
allListeners.forEach(listener -> listener.onNewSentBytes(value));
}
void addReceivedBytes(int value) {
receivedBytes += value;
allListeners.forEach(listener -> listener.onNewReceivedBytes(value));
}
void addSentMessage(NetworkEnvelope networkEnvelope) {
String messageClassName = networkEnvelope.getClass().getSimpleName();
sentMessages.merge(messageClassName, 1, Integer::sum);
totalSentMessages++;
allListeners.forEach(listener -> listener.onAddSentMessage(networkEnvelope));
}
void addReceivedMessage(NetworkEnvelope networkEnvelope) {
String messageClassName = networkEnvelope.getClass().getSimpleName();
receivedMessages.merge(messageClassName, 1, Integer::sum);
totalReceivedMessages++;
allListeners.forEach(listener -> listener.onAddReceivedMessage(networkEnvelope));
}
public long getLastActivityAge() {
return System.currentTimeMillis() - lastActivityTimestamp;
}
@Override
public String toString() {
return "ConnectionStatistics{" +
"\n creationDate=" + creationDate +
",\n lastActivityTimestamp=" + lastActivityTimestamp +
",\n sentBytes=" + sentBytes +
",\n receivedBytes=" + receivedBytes +
",\n receivedMessages=" + receivedMessages +
",\n sentMessages=" + sentMessages +
",\n roundTripTime=" + roundTripTime +
"\n}";
}
}

View File

@ -0,0 +1,91 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network.statistics;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.Getter;
@Getter
public class ConnectionStatsAccumulator implements Runnable {
private final long startTime = System.currentTimeMillis();
private final List<ConnectionStatistics> allConnectionStatistics;
private final List<NetworkStatisticsService.Listener> allListeners = new CopyOnWriteArrayList<>();
private long totalSentBytes;
private long totalReceivedBytes;
private int totalSentMessages;
private int totalReceivedMessages;
private double totalSentMessagesPerSec;
private double totalReceivedMessagesPerSec;
private double totalSentBytesPerSec;
private double totalReceivedBytesPerSec;
public ConnectionStatsAccumulator(List<ConnectionStatistics> allConnectionStatistics) {
this.allConnectionStatistics = allConnectionStatistics;
}
@Override
public void run() {
long totalSentBytes = 0;
long totalReceivedBytes = 0;
int totalSentMessages = 0;
int totalReceivedMessages = 0;
for (ConnectionStatistics statistic : allConnectionStatistics) {
totalSentBytes += statistic.getSentBytes();
totalReceivedBytes += statistic.getReceivedBytes();
totalSentMessages += statistic.getTotalSentMessages();
totalReceivedMessages += statistic.getTotalReceivedMessages();
}
this.totalSentBytes = totalSentBytes;
this.totalReceivedBytes = totalReceivedBytes;
this.totalSentMessages = totalSentMessages;
this.totalReceivedMessages = totalReceivedMessages;
long passed = (System.currentTimeMillis() - startTime) / 1000;
totalSentMessagesPerSec = ((double) totalSentMessages / passed);
totalReceivedMessagesPerSec = ((double) totalReceivedMessages) / passed;
totalSentBytesPerSec = ((double) totalSentBytes) / passed;
totalReceivedBytesPerSec = ((double) totalReceivedBytes) / passed;
callListeners();
}
private void callListeners() {
allListeners.forEach(listener -> listener.onTotalSentStatsChanged(totalSentBytes, totalSentMessages, totalSentMessagesPerSec));
}
public void addListener(NetworkStatisticsService.Listener listener) {
allListeners.add(listener);
}
public void removeListener(NetworkStatisticsService.Listener listener) {
allListeners.remove(listener);
}
}

View File

@ -0,0 +1,98 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network.statistics;
import bisq.common.util.Utilities;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
public class NetworkStatisticsService {
public interface Listener {
void onTotalSentStatsChanged(long totalSentBytes, long totalSentMessages, double totalSentMessagesPerSec);
}
private final long startTime = System.currentTimeMillis();
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private final List<ConnectionStatistics> allConnectionStatistics = new CopyOnWriteArrayList<>();
private final ConnectionStatsAccumulator connectionStatsAccumulator =
new ConnectionStatsAccumulator(allConnectionStatistics);
private final Map<String, Integer> totalSentMessages = new HashMap<>();
private final Map<String, Integer> totalReceivedMessages = new HashMap<>();
public ConnectionStatistics newConnectionStatistics() {
var connectionStatistics = new ConnectionStatistics();
allConnectionStatistics.add(connectionStatistics);
return connectionStatistics;
}
public void start() {
scheduledExecutorService.scheduleAtFixedRate(
connectionStatsAccumulator, 1, 1, TimeUnit.SECONDS
);
scheduledExecutorService.scheduleAtFixedRate(
createStatisticsLogger(), 1, 1, TimeUnit.HOURS
);
}
public void shutdown() {
scheduledExecutorService.shutdownNow();
}
public void addListener(Listener listener) {
connectionStatsAccumulator.addListener(listener);
}
public void removeListener(Listener listener) {
connectionStatsAccumulator.removeListener(listener);
}
private Runnable createStatisticsLogger() {
return () -> {
ConnectionStatsAccumulator allStats = connectionStatsAccumulator;
String ls = System.lineSeparator();
log.info("Accumulated network statistics:" + ls +
"Bytes sent: {};" + ls +
"Number of sent messages/Sent messages: {} / {};" + ls +
"Number of sent messages per sec: {};" + ls +
"Bytes received: {}" + ls +
"Number of received messages/Received messages: {} / {};" + ls +
"Number of received messages per sec: {}" + ls,
Utilities.readableFileSize(allStats.getTotalSentBytes()),
allStats.getTotalSentMessages(), totalSentMessages,
allStats.getTotalSentMessagesPerSec(),
Utilities.readableFileSize(allStats.getTotalReceivedBytes()),
allStats.getTotalReceivedMessages(), totalReceivedMessages,
allStats.getTotalReceivedMessagesPerSec());
};
}
}

View File

@ -0,0 +1,306 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network.statistics;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.storage.messages.AddDataMessage;
import bisq.common.proto.network.NetworkEnvelope;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ConnectionStatisticsTest {
private final ConnectionStatistics statistics = new ConnectionStatistics();
@Test
public void updateLastActivityTimestampTest() throws InterruptedException {
var statistics = new ConnectionStatistics();
long firstTimestamp = statistics.getLastActivityTimestamp();
Thread.sleep(200);
statistics.updateLastActivityTimestamp();
assertTrue(statistics.getLastActivityTimestamp() > firstTimestamp);
}
@Test
public void addSentBytesTest() {
for (int i = 0; i < 5; i++) {
statistics.addSentBytes(100);
}
assertEquals(500, statistics.getSentBytes());
}
@Test
public void addReceivedBytesTest() {
for (int i = 0; i < 5; i++) {
statistics.addReceivedBytes(100);
}
assertEquals(500, statistics.getReceivedBytes());
}
@Test
public void addSentMessageTest() {
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addSentMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addSentMessage(addDataMessage);
}
Map<String, Integer> countByMessageClassName = statistics.getSentMessages();
String ackMessageClassName = ackMessage.getClass().getSimpleName();
int counter = countByMessageClassName.get(ackMessageClassName);
assertEquals(3, counter);
String addDataMessageClassName = addDataMessage.getClass().getSimpleName();
counter = countByMessageClassName.get(addDataMessageClassName);
assertEquals(5, counter);
}
@Test
public void addReceivedMessageTest() {
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addReceivedMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addReceivedMessage(addDataMessage);
}
Map<String, Integer> countByMessageClassName = statistics.getReceivedMessages();
String ackMessageClassName = ackMessage.getClass().getSimpleName();
int counter = countByMessageClassName.get(ackMessageClassName);
assertEquals(3, counter);
String addDataMessageClassName = addDataMessage.getClass().getSimpleName();
counter = countByMessageClassName.get(addDataMessageClassName);
assertEquals(5, counter);
}
@Test
public void numberOfTotalSentMessages() {
var statistics = new ConnectionStatistics();
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addSentMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addSentMessage(addDataMessage);
}
assertEquals(8, statistics.getTotalSentMessages());
}
@Test
public void numberOfTotalReceivedMessages() {
var statistics = new ConnectionStatistics();
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addReceivedMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addReceivedMessage(addDataMessage);
}
assertEquals(8, statistics.getTotalReceivedMessages());
}
@Test
public void getLastActivityAge() throws InterruptedException {
var statistics = new ConnectionStatistics();
Thread.sleep(200);
assertTrue(statistics.getLastActivityAge() > 100);
}
@Test
public void onSentBytesUpdatedListenerTest() {
var listener = new ConnectionStatistics.Listener() {
long onSentBytes;
@Override
public void onNewSentBytes(long numberOfNewBytes) {
onSentBytes += numberOfNewBytes;
}
@Override
public void onNewReceivedBytes(long numberOfNewBytes) {
}
@Override
public void onAddSentMessage(NetworkEnvelope networkEnvelope) {
}
@Override
public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) {
}
};
statistics.addListener(listener);
for (int i = 0; i < 5; i++) {
statistics.addSentBytes(100);
}
assertEquals(500, listener.onSentBytes);
}
@Test
public void onReceivedBytesUpdatedListenerTest() {
var listener = new ConnectionStatistics.Listener() {
long onReceivedBytes;
@Override
public void onNewSentBytes(long numberOfNewBytes) {
}
@Override
public void onNewReceivedBytes(long numberOfNewBytes) {
onReceivedBytes += numberOfNewBytes;
}
@Override
public void onAddSentMessage(NetworkEnvelope networkEnvelope) {
}
@Override
public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) {
}
};
statistics.addListener(listener);
for (int i = 0; i < 3; i++) {
statistics.addReceivedBytes(100);
}
assertEquals(300, listener.onReceivedBytes);
}
@Test
public void onAddSentMessageListenerTest() {
var listener = new ConnectionStatistics.Listener() {
final Map<String, Integer> counterByClassName = new HashMap<>();
@Override
public void onNewSentBytes(long numberOfNewBytes) {
}
@Override
public void onNewReceivedBytes(long numberOfNewBytes) {
}
@Override
public void onAddSentMessage(NetworkEnvelope networkEnvelope) {
String messageClassName = networkEnvelope.getClass().getSimpleName();
counterByClassName.merge(messageClassName, 1, Integer::sum);
}
@Override
public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) {
}
};
statistics.addListener(listener);
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addSentMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addSentMessage(addDataMessage);
}
Map<String, Integer> countByMessageClassName = listener.counterByClassName;
String ackMessageClassName = ackMessage.getClass().getSimpleName();
int counter = countByMessageClassName.get(ackMessageClassName);
assertEquals(3, counter);
String addDataMessageClassName = addDataMessage.getClass().getSimpleName();
counter = countByMessageClassName.get(addDataMessageClassName);
assertEquals(5, counter);
}
@Test
public void onAddReceivedMessageListenerTest() {
var listener = new ConnectionStatistics.Listener() {
final Map<String, Integer> counterByClassName = new HashMap<>();
@Override
public void onNewSentBytes(long numberOfNewBytes) {
}
@Override
public void onNewReceivedBytes(long numberOfNewBytes) {
}
@Override
public void onAddSentMessage(NetworkEnvelope networkEnvelope) {
}
@Override
public void onAddReceivedMessage(NetworkEnvelope networkEnvelope) {
String messageClassName = networkEnvelope.getClass().getSimpleName();
counterByClassName.merge(messageClassName, 1, Integer::sum);
}
};
statistics.addListener(listener);
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
statistics.addReceivedMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
statistics.addReceivedMessage(addDataMessage);
}
Map<String, Integer> countByMessageClassName = listener.counterByClassName;
String ackMessageClassName = ackMessage.getClass().getSimpleName();
int counter = countByMessageClassName.get(ackMessageClassName);
assertEquals(3, counter);
String addDataMessageClassName = addDataMessage.getClass().getSimpleName();
counter = countByMessageClassName.get(addDataMessageClassName);
assertEquals(5, counter);
}
}

View File

@ -0,0 +1,211 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network.statistics;
import bisq.network.p2p.AckMessage;
import bisq.network.p2p.storage.messages.AddDataMessage;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class ConnectionStatsAccumulatorTest {
private final NetworkStatisticsService networkStatisticsService = new NetworkStatisticsService();
@Test
public void totalSentBytes() {
List<ConnectionStatistics> allConnectionStatistics = new ArrayList<>();
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
firstConnectionStatistics.addSentBytes(100);
ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics();
secondConnectionStatistics.addSentBytes(100);
ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics();
thirdConnectionStatistics.addSentBytes(100);
allConnectionStatistics.add(firstConnectionStatistics);
allConnectionStatistics.add(secondConnectionStatistics);
allConnectionStatistics.add(thirdConnectionStatistics);
connectionStatsAccumulator.run();
assertEquals(300, connectionStatsAccumulator.getTotalSentBytes());
}
@Test
public void totalReceivedBytes() {
List<ConnectionStatistics> allConnectionStatistics = new ArrayList<>();
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
firstConnectionStatistics.addReceivedBytes(100);
ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics();
secondConnectionStatistics.addReceivedBytes(100);
ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics();
thirdConnectionStatistics.addReceivedBytes(100);
allConnectionStatistics.add(firstConnectionStatistics);
allConnectionStatistics.add(secondConnectionStatistics);
allConnectionStatistics.add(thirdConnectionStatistics);
connectionStatsAccumulator.run();
assertEquals(300, connectionStatsAccumulator.getTotalReceivedBytes());
}
@Test
public void totalSentMessages() {
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics();
ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics();
List<ConnectionStatistics> allConnectionStatistics = List.of(
firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics
);
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
firstConnectionStatistics.addSentMessage(ackMessage);
secondConnectionStatistics.addSentMessage(ackMessage);
thirdConnectionStatistics.addSentMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
firstConnectionStatistics.addSentMessage(addDataMessage);
secondConnectionStatistics.addSentMessage(addDataMessage);
thirdConnectionStatistics.addSentMessage(addDataMessage);
}
connectionStatsAccumulator.run();
assertEquals(8 * 3, connectionStatsAccumulator.getTotalSentMessages());
}
@Test
public void totalReceivedSentMessages() {
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics();
ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics();
List<ConnectionStatistics> allConnectionStatistics = List.of(
firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics
);
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
AckMessage ackMessage = mock(AckMessage.class);
for (int i = 0; i < 3; i++) {
firstConnectionStatistics.addReceivedMessage(ackMessage);
secondConnectionStatistics.addReceivedMessage(ackMessage);
thirdConnectionStatistics.addReceivedMessage(ackMessage);
}
AddDataMessage addDataMessage = mock(AddDataMessage.class);
for (int i = 0; i < 5; i++) {
firstConnectionStatistics.addReceivedMessage(addDataMessage);
secondConnectionStatistics.addReceivedMessage(addDataMessage);
thirdConnectionStatistics.addReceivedMessage(addDataMessage);
}
connectionStatsAccumulator.run();
assertEquals(8 * 3, connectionStatsAccumulator.getTotalReceivedMessages());
}
@Test
public void addListener() {
var listener = new NetworkStatisticsService.Listener() {
long totalSentBytes, numTotalSentMessages;
@Override
public void onTotalSentStatsChanged(long totalSentBytes,
long totalSentMessages,
double totalSentMessagesPerSec) {
this.totalSentBytes = totalSentBytes;
this.numTotalSentMessages = totalSentMessages;
}
};
AckMessage ackMessage = mock(AckMessage.class);
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
firstConnectionStatistics.addSentBytes(100);
firstConnectionStatistics.addSentMessage(ackMessage);
ConnectionStatistics secondConnectionStatistics = networkStatisticsService.newConnectionStatistics();
secondConnectionStatistics.addSentBytes(100);
secondConnectionStatistics.addSentMessage(ackMessage);
ConnectionStatistics thirdConnectionStatistics = networkStatisticsService.newConnectionStatistics();
thirdConnectionStatistics.addSentBytes(100);
thirdConnectionStatistics.addSentMessage(ackMessage);
List<ConnectionStatistics> allConnectionStatistics = List.of(
firstConnectionStatistics, secondConnectionStatistics, thirdConnectionStatistics
);
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
connectionStatsAccumulator.addListener(listener);
connectionStatsAccumulator.run();
assertEquals(300, listener.totalSentBytes);
assertEquals(3, listener.numTotalSentMessages);
}
@Test
public void removeListener() {
var listener = new NetworkStatisticsService.Listener() {
long totalSentBytes, numTotalSentMessages;
@Override
public void onTotalSentStatsChanged(long totalSentBytes,
long totalSentMessages,
double totalSentMessagesPerSec) {
this.totalSentBytes = totalSentBytes;
this.numTotalSentMessages = totalSentMessages;
}
};
ConnectionStatistics firstConnectionStatistics = networkStatisticsService.newConnectionStatistics();
firstConnectionStatistics.addSentBytes(100);
AckMessage ackMessage = mock(AckMessage.class);
firstConnectionStatistics.addSentMessage(ackMessage);
List<ConnectionStatistics> allConnectionStatistics = List.of(firstConnectionStatistics);
var connectionStatsAccumulator = new ConnectionStatsAccumulator(allConnectionStatistics);
connectionStatsAccumulator.addListener(listener);
connectionStatsAccumulator.removeListener(listener);
connectionStatsAccumulator.run();
assertEquals(0, listener.totalSentBytes);
assertEquals(0, listener.numTotalSentMessages);
}
}