diff --git a/src/net.cpp b/src/net.cpp index 735985a8414..4e9349a6474 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -663,7 +663,7 @@ void CNode::CopyStats(CNodeStats& stats) } #undef X -bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) +bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete, NetStats& net_stats) { complete = false; const auto time = GetTime(); @@ -685,6 +685,12 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) // Message deserialization failed. Drop the message but don't disconnect the peer. // store the size of the corrupt message mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size; + net_stats.Record(NetStats::RECV, + ConnectedThroughNetwork(), + m_conn_type, + NET_MESSAGE_TYPE_OTHER, + /*num_messages=*/1, + msg.m_raw_message_size); continue; } @@ -696,6 +702,12 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) } assert(i != mapRecvBytesPerMsgType.end()); i->second += msg.m_raw_message_size; + net_stats.Record(NetStats::RECV, + ConnectedThroughNetwork(), + m_conn_type, + /*msg_type=*/i->first, + /*num_messages=*/1, + msg.m_raw_message_size); // push the message to the process queue, vRecvMsg.push_back(std::move(msg)); @@ -1596,7 +1608,7 @@ Transport::Info V2Transport::GetInfo() const noexcept return info; } -std::pair CConnman::SocketSendData(CNode& node) const +std::pair CConnman::SocketSendData(CNode& node) { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; @@ -1609,9 +1621,16 @@ std::pair CConnman::SocketSendData(CNode& node) const // there is an existing message still being sent, or (for v2 transports) when the // handshake has not yet completed. size_t memusage = it->GetMemoryUsage(); + const auto msg_type = it->m_type; if (node.m_transport->SetMessageToSend(*it)) { // Update memory usage of send buffer (as *it will be deleted). node.m_send_memusage -= memusage; + m_net_stats.Record(NetStats::SENT, + node.ConnectedThroughNetwork(), + node.m_conn_type, + msg_type, + /*num_messages=*/1, + /*num_bytes=*/0); ++it; } } @@ -1647,6 +1666,12 @@ std::pair CConnman::SocketSendData(CNode& node) const // Update statistics per message type. if (!msg_type.empty()) { // don't report v2 handshake bytes for now node.AccountForSentBytes(msg_type, nBytes); + m_net_stats.Record(NetStats::SENT, + node.ConnectedThroughNetwork(), + node.m_conn_type, + msg_type, + /*num_messages=*/0, + nBytes); } nSentSize += nBytes; if ((size_t)nBytes != data.size()) { @@ -2169,7 +2194,7 @@ void CConnman::SocketHandlerConnected(const std::vector& nodes, if (nBytes > 0) { bool notify = false; - if (!pnode->ReceiveMsgBytes({pchBuf, (size_t)nBytes}, notify)) { + if (!pnode->ReceiveMsgBytes({pchBuf, (size_t)nBytes}, notify, m_net_stats)) { LogDebug(BCLog::NET, "receiving message bytes failed, %s\n", pnode->DisconnectMsg(fLogIPs) @@ -3699,6 +3724,150 @@ void CConnman::RecordBytesSent(uint64_t bytes) nMaxOutboundTotalBytesSentInCycle += bytes; } +NetStats::NetStats() + : m_msg_type_to_index{[]() { + MsgTypeToIndex m; + size_t i{0}; + for (const auto& msg_type : ALL_NET_MESSAGE_TYPES) { + m[msg_type] = i++; + } + return m; + }()} +{ +} + +void NetStats::Record(Direction direction, + Network net, + ConnectionType conn_type, + const std::string& msg_type, + size_t num_messages, + size_t num_bytes) +{ + auto& d = m_data.at(DirectionToIndex(direction)) + .at(NetworkToIndex(net)) + .at(ConnectionTypeToIndex(conn_type)) + .at(MessageTypeToIndex(msg_type)); + d.count += num_messages; + d.bytes += num_bytes; +} + +void NetStats::ForEach(std::function func) const +{ + for (size_t dir_i = 0; dir_i < m_data.size(); ++dir_i) { + for (size_t net_i = 0; net_i < m_data[dir_i].size(); ++net_i) { + for (size_t con_i = 0; con_i < m_data[dir_i][net_i].size(); ++con_i) { + for (size_t msg_i = 0; msg_i < m_data[dir_i][net_i][con_i].size(); ++msg_i) { + func(DirectionFromIndex(dir_i), + NetworkFromIndex(net_i), + ConnectionTypeFromIndex(con_i), + MessageTypeFromIndex(msg_i), + m_data[dir_i][net_i][con_i][msg_i]); + } + } + } + } +} + +constexpr size_t NetStats::DirectionToIndex(Direction direction) +{ + switch (direction) { + case SENT: return 0; + case RECV: return 1; + } + assert(false); +} + +constexpr NetStats::Direction NetStats::DirectionFromIndex(size_t index) +{ + switch (index) { + case 0: return SENT; + case 1: return RECV; + } + assert(false); +} + +constexpr size_t NetStats::NetworkToIndex(Network net) +{ + switch (net) { + case NET_UNROUTABLE: return 0; + case NET_IPV4: return 1; + case NET_IPV6: return 2; + case NET_ONION: return 3; + case NET_I2P: return 4; + case NET_CJDNS: return 5; + case NET_INTERNAL: return 6; + case NET_MAX: assert(false); + } + assert(false); +} + +constexpr Network NetStats::NetworkFromIndex(size_t index) +{ + switch (index) { + case 0: return NET_UNROUTABLE; + case 1: return NET_IPV4; + case 2: return NET_IPV6; + case 3: return NET_ONION; + case 4: return NET_I2P; + case 5: return NET_CJDNS; + case 6: return NET_INTERNAL; + } + assert(false); +} + +constexpr size_t NetStats::ConnectionTypeToIndex(ConnectionType conn_type) +{ + switch (conn_type) { + case ConnectionType::INBOUND: return 0; + case ConnectionType::OUTBOUND_FULL_RELAY: return 1; + case ConnectionType::MANUAL: return 2; + case ConnectionType::FEELER: return 3; + case ConnectionType::BLOCK_RELAY: return 4; + case ConnectionType::ADDR_FETCH: return 5; + } + assert(false); +} + +constexpr ConnectionType NetStats::ConnectionTypeFromIndex(size_t index) +{ + switch (index) { + case 0: return ConnectionType::INBOUND; + case 1: return ConnectionType::OUTBOUND_FULL_RELAY; + case 2: return ConnectionType::MANUAL; + case 3: return ConnectionType::FEELER; + case 4: return ConnectionType::BLOCK_RELAY; + case 5: return ConnectionType::ADDR_FETCH; + } + assert(false); +} + +size_t NetStats::MessageTypeToIndex(const std::string& msg_type) const +{ + auto it = m_msg_type_to_index.find(msg_type); + if (it != m_msg_type_to_index.end()) { + return it->second; + } + // Unknown message (NET_MESSAGE_TYPE_OTHER), use the last entry in the array. + return ALL_NET_MESSAGE_TYPES.size(); +} + +std::string NetStats::MessageTypeFromIndex(size_t index) +{ + if (index == ALL_NET_MESSAGE_TYPES.size()) { + return NET_MESSAGE_TYPE_OTHER; + } + return ALL_NET_MESSAGE_TYPES.at(index); +} + +const NetStats& CConnman::GetNetStats() const +{ + return m_net_stats; +} + uint64_t CConnman::GetMaxOutboundTarget() const { AssertLockNotHeld(m_total_bytes_sent_mutex); diff --git a/src/net.h b/src/net.h index e64d9a67f46..6a13efa42bf 100644 --- a/src/net.h +++ b/src/net.h @@ -659,6 +659,84 @@ public: Info GetInfo() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex); }; +/** + * Network traffic (bytes and number of messages). Split by direction, network, connection type and message type. + */ +class NetStats +{ +public: + /// Used to designate the direction of the recorded traffic. + enum Direction { SENT, RECV }; + + /// Number of elements in `Direction`. + static constexpr size_t NUM_DIRECTIONS{2}; + + struct BytesAndCount { + std::atomic_uint64_t bytes{0}; //!< Number of bytes transferred. + std::atomic_uint64_t count{0}; //!< Number of messages transferred. + + BytesAndCount& operator+=(const BytesAndCount& toadd) + { + bytes += toadd.bytes; + count += toadd.count; + return *this; + } + }; + + NetStats(); + + /** + * Increment the number of messages transferred by `num_messages` and the number of bytes by `num_bytes`. + */ + void Record(Direction direction, + Network net, + ConnectionType conn_type, + const std::string& msg_type, + size_t num_messages, + size_t num_bytes); + + /** + * Call the provided function for each stat. + */ + void ForEach(std::function func) const; + +private: + // The ...FromIndex() and ...ToIndex() methods below convert from/to + // indexes of `m_data[]` to the actual values they represent. For example, + // assuming MessageTypeToIndex("ping") == 15, then everything stored in + // m_data[i][j][k][15] is traffic from "ping" messages (for any i, j, k). + + static constexpr size_t DirectionToIndex(Direction direction); + static constexpr Direction DirectionFromIndex(size_t index); + + static constexpr size_t NetworkToIndex(Network net); + static constexpr Network NetworkFromIndex(size_t index); + + static constexpr size_t ConnectionTypeToIndex(ConnectionType conn_type); + static constexpr ConnectionType ConnectionTypeFromIndex(size_t index); + + size_t MessageTypeToIndex(const std::string& msg_type) const; + static std::string MessageTypeFromIndex(size_t index); + + // Access like m_data[direction index][net index][conn type index][msg type index].bytes = 123; + // Arrays are used so that this can be accessed from multiple threads without a mutex protection. + std::array, + NUM_CONNECTION_TYPES>, + NET_MAX>, + NUM_DIRECTIONS> + m_data; + + + using MsgTypeToIndex = std::unordered_map; + + /// Holds the index `i` in `m_data[][][][i]` of a given message type for quick lookup. + const MsgTypeToIndex m_msg_type_to_index; +}; + struct CNodeOptions { NetPermissionFlags permission_flags = NetPermissionFlags::None; @@ -914,7 +992,8 @@ public: * @return True if the peer should stay connected, * False if the peer should be disconnected from. */ - bool ReceiveMsgBytes(Span msg_bytes, bool& complete) EXCLUSIVE_LOCKS_REQUIRED(!cs_vRecv); + bool ReceiveMsgBytes(Span msg_bytes, bool& complete, NetStats& net_stats) + EXCLUSIVE_LOCKS_REQUIRED(!cs_vRecv); void SetCommonVersion(int greatest_common_version) { @@ -1272,6 +1351,8 @@ public: bool MultipleManualOrFullOutboundConns(Network net) const EXCLUSIVE_LOCKS_REQUIRED(m_nodes_mutex); + const NetStats& GetNetStats() const; + private: struct ListenSocket { public: @@ -1371,7 +1452,7 @@ private: NodeId GetNewNodeId(); /** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */ - std::pair SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); + std::pair SocketSendData(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); void DumpAddresses(); @@ -1413,6 +1494,8 @@ private: std::atomic nTotalBytesRecv{0}; uint64_t nTotalBytesSent GUARDED_BY(m_total_bytes_sent_mutex) {0}; + NetStats m_net_stats; + // outbound limit & stats uint64_t nMaxOutboundTotalBytesSentInCycle GUARDED_BY(m_total_bytes_sent_mutex) {0}; std::chrono::seconds nMaxOutboundCycleStartTime GUARDED_BY(m_total_bytes_sent_mutex) {0}; diff --git a/src/node/connection_types.h b/src/node/connection_types.h index a911b95f7e9..151c1aebf1d 100644 --- a/src/node/connection_types.h +++ b/src/node/connection_types.h @@ -12,9 +12,11 @@ * information we have available at the time of opening or accepting the * connection. Aside from INBOUND, all types are initiated by us. * - * If adding or removing types, please update CONNECTION_TYPE_DOC in - * src/rpc/net.cpp and src/qt/rpcconsole.cpp, as well as the descriptions in - * src/qt/guiutil.cpp and src/bitcoin-cli.cpp::NetinfoRequestHandler. */ + * If adding or removing types, please update: + * - CONNECTION_TYPE_DOC in src/rpc/net.cpp and src/qt/rpcconsole.cpp + * - the descriptions in src/qt/guiutil.cpp and src/bitcoin-cli.cpp::NetinfoRequestHandler + * - NUM_CONNECTION_TYPES below. + */ enum class ConnectionType { /** * Inbound connections are those initiated by a peer. This is the only @@ -77,6 +79,9 @@ enum class ConnectionType { ADDR_FETCH, }; +/** Number of entries in ConnectionType. */ +static constexpr size_t NUM_CONNECTION_TYPES{6}; + /** Convert ConnectionType enum to a string value */ std::string ConnectionTypeAsString(ConnectionType conn_type); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 1b711e3c5b1..fd85dcca756 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -253,6 +253,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "verifychain", 1, "nblocks" }, { "getblockstats", 0, "hash_or_height" }, { "getblockstats", 1, "stats" }, + { "getnetmsgstats", 0, "aggregate_by"}, { "pruneblockchain", 0, "height" }, { "keypoolrefill", 0, "newsize" }, { "getrawmempool", 0, "verbose" }, diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index bda07365e0e..cef465103bd 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -546,6 +546,178 @@ static RPCHelpMan getaddednodeinfo() }; } +namespace net_stats { + +namespace aggregate_dimensions { +static std::string DIRECTION{"direction"}; +static std::string NETWORK{"network"}; +static std::string CONNECTION_TYPE{"connection_type"}; +static std::string MESSAGE_TYPE{"message_type"}; +}; + +UniValue CreateJSON(const RPCHelpMan&, const JSONRPCRequest& request) +{ + const NodeContext& node = EnsureAnyNodeContext(request.context); + const CConnman& connman = EnsureConnman(node); + + // Used for a quick check if a string is in request.params[0] which is + // supposed to be a JSON array, e.g. ["direction", "network"]. + std::unordered_set aggregate_by; + if (request.params[0].isArray()) { + const UniValue& arr{request.params[0].get_array()}; + for (size_t i = 0; i < arr.size(); ++i) { + const auto& agg{arr[i].get_str()}; + if (agg != aggregate_dimensions::DIRECTION && + agg != aggregate_dimensions::NETWORK && + agg != aggregate_dimensions::CONNECTION_TYPE && + agg != aggregate_dimensions::MESSAGE_TYPE) { + throw JSONRPCError( + RPC_INVALID_PARAMS, + strprintf( + R"(Unrecognized aggregation parameter: "%s". The array should consist of zero or more of "%s", "%s", "%s", "%s".)", + agg, + aggregate_dimensions::DIRECTION, + aggregate_dimensions::NETWORK, + aggregate_dimensions::CONNECTION_TYPE, + aggregate_dimensions::MESSAGE_TYPE)); + } + aggregate_by.insert(agg); + } + } + + // The keys might as well be an empty string (if aggregating by that dimension). + std::unordered_map< + std::string, // "sent" or "recv" + std::unordered_map>>> + result_map; + + connman.GetNetStats().ForEach([&aggregate_by, &result_map](NetStats::Direction dir, + Network net, + ConnectionType con, + const std::string& msg, + const NetStats::BytesAndCount& data) { + const std::string dir_str{aggregate_by.contains(aggregate_dimensions::DIRECTION) ? "" : dir == NetStats::SENT ? "sent" : "recv"}; + const std::string net_str{aggregate_by.contains(aggregate_dimensions::NETWORK) ? "" : GetNetworkName(net)}; + const std::string con_str{aggregate_by.contains(aggregate_dimensions::CONNECTION_TYPE) ? "" : ConnectionTypeAsString(con)}; + const std::string msg_str{aggregate_by.contains(aggregate_dimensions::MESSAGE_TYPE) ? "" : msg}; + + result_map[dir_str][net_str][con_str][msg_str] += data; + }); + + auto Add = [](UniValue& target, const std::string& key, const UniValue& val) { + if (val.empty()) { + return; + } + if (key.empty()) { + target = val; + return; + } + target.pushKV(key, val); + }; + + UniValue dir_json{UniValue::VOBJ}; + for (const auto& [dir_key, dir_val] : result_map) { + UniValue net_json{UniValue::VOBJ}; + for (const auto& [net_key, net_val] : dir_val) { + UniValue con_json{UniValue::VOBJ}; + for (const auto& [con_key, con_val] : net_val) { + UniValue msg_json{UniValue::VOBJ}; + for (const auto& [msg_key, stats] : con_val) { + const auto bytes = stats.bytes.load(); + const auto count = stats.count.load(); + if (bytes == 0 || count == 0) { + continue; + } + UniValue bytes_and_count_json{UniValue::VOBJ}; + bytes_and_count_json.pushKV("bytes", bytes); + bytes_and_count_json.pushKV("count", count); + + Add(msg_json, msg_key, bytes_and_count_json); + } + Add(con_json, con_key, msg_json); + } + Add(net_json, net_key, con_json); + } + Add(dir_json, dir_key, net_json); + } + + return dir_json; +} +}; // namespace net_stats + +static RPCHelpMan getnetmsgstats() +{ + return RPCHelpMan{ + "getnetmsgstats", + "\nReturns the messages count and total number of bytes for network traffic.\n" + "Results may optionally be aggregated.\n", + {RPCArg{ + "aggregate_by", + RPCArg::Type::ARR, + RPCArg::DefaultHint{"empty, no aggregation"}, + "An array of keywords for aggregating the results.", + {RPCArg{"direction", + RPCArg::Type::STR, + RPCArg::Optional::OMITTED, + "Aggregate by direction and don't show direction in the result."}, + RPCArg{"network", + RPCArg::Type::STR, + RPCArg::Optional::OMITTED, + "Aggregate by network and don't show network in the result."}, + RPCArg{"connection_type", + RPCArg::Type::STR, + RPCArg::Optional::OMITTED, + "Aggregate by connection type and don't show connection type in the result."}, + RPCArg{"message_type", + RPCArg::Type::STR, + RPCArg::Optional::OMITTED, + "Aggregate by message type and don't show message type in the result."}}}}, + {RPCResult{ + RPCResult::Type::OBJ_DYN, + "", + false, + "When a direction, network,\n" + "connection type or message type is not listed,\n" + "the statistics for that are 0.", + {RPCResult{ + RPCResult::Type::OBJ_DYN, + "sent", + true, + "The direction in which the traffic occurred.", + {RPCResult{ + RPCResult::Type::OBJ_DYN, + "ipv4", + true, + "The network over which the traffic occurred.", + {RPCResult{ + RPCResult::Type::OBJ_DYN, + "inbound", + true, + "The connection type over which the traffic occurred.", + {RPCResult{ + RPCResult::Type::OBJ, + "verack", + true, + "Type of the messages transferred.", + { + RPCResult{RPCResult::Type::NUM, "bytes", false, "Total number of bytes.", {}, true}, + RPCResult{RPCResult::Type::NUM, "count", false, "Total number of messages.", {}, true} + }, + true}}, + true}}, + true}}, + true}}, + true}}, + RPCExamples{HelpExampleCli("getnetmsgstats", "") + + HelpExampleCli("getnetmsgstats", R"('["network", "message_type"]')") + + HelpExampleRpc("getnetmsgstats", "") + + HelpExampleRpc("getnetmsgstats", R"(["network", "message_type"])")}, + net_stats::CreateJSON}; +} + static RPCHelpMan getnettotals() { return RPCHelpMan{"getnettotals", @@ -1188,6 +1360,7 @@ void RegisterNetRPCCommands(CRPCTable& t) {"network", &addnode}, {"network", &disconnectnode}, {"network", &getaddednodeinfo}, + {"network", &getnetmsgstats}, {"network", &getnettotals}, {"network", &getnetworkinfo}, {"network", &setban}, diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index 1a0de7aa363..cfa0b1f61c5 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -39,6 +39,7 @@ FUZZ_TARGET(net, .init = initialize_net) { node.SetAddrLocal(*service_opt); } + NetStats net_stats; LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) { CallOneOf( fuzzed_data_provider, @@ -61,7 +62,7 @@ FUZZ_TARGET(net, .init = initialize_net) [&] { const std::vector b = ConsumeRandomLengthByteVector(fuzzed_data_provider); bool complete; - node.ReceiveMsgBytes(b, complete); + node.ReceiveMsgBytes(b, complete, net_stats); }); } diff --git a/src/test/fuzz/rpc.cpp b/src/test/fuzz/rpc.cpp index 7bf90b9b775..b58b270be64 100644 --- a/src/test/fuzz/rpc.cpp +++ b/src/test/fuzz/rpc.cpp @@ -140,6 +140,7 @@ const std::vector RPC_COMMANDS_SAFE_FOR_FUZZING{ "getmempoolentry", "getmempoolinfo", "getmininginfo", + "getnetmsgstats", "getnettotals", "getnetworkhashps", "getnetworkinfo", diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index ddd96a50640..3d7943976c6 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -71,9 +71,9 @@ void ConnmanTestMsg::Handshake(CNode& node, } } -void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete) const +void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete) { - assert(node.ReceiveMsgBytes(msg_bytes, complete)); + assert(node.ReceiveMsgBytes(msg_bytes, complete, m_net_stats)); if (complete) { node.MarkReceivedMsgsForProcessing(); } @@ -91,7 +91,7 @@ void ConnmanTestMsg::FlushSendBuffer(CNode& node) const } } -bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const +bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) { bool queued = node.m_transport->SetMessageToSend(ser_msg); assert(queued); diff --git a/src/test/util/net.h b/src/test/util/net.h index 3e717341d87..3f41d86bb9e 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -81,9 +81,9 @@ struct ConnmanTestMsg : public CConnman { return m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); } - void NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete) const; + void NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete); - bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const; + bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg); void FlushSendBuffer(CNode& node) const; bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); }; diff --git a/test/functional/rpc_net.py b/test/functional/rpc_net.py index 41ecbbed22d..6e128d79dd0 100755 --- a/test/functional/rpc_net.py +++ b/test/functional/rpc_net.py @@ -10,6 +10,7 @@ Tests correspond to code in rpc/net.cpp. from decimal import Decimal from itertools import product import platform +import random import time import test_framework.messages @@ -90,6 +91,7 @@ class NetTest(BitcoinTestFramework): self.test_sendmsgtopeer() self.test_getaddrmaninfo() self.test_getrawaddrman() + self.test_getnetmsgstats() def test_connection_count(self): self.log.info("Test getconnectioncount") @@ -202,6 +204,177 @@ class NetTest(BitcoinTestFramework): self.wait_until(lambda: peer_after()['bytesrecv_per_msg'].get('pong', 0) >= peer_before['bytesrecv_per_msg'].get('pong', 0) + ping_size, timeout=1) self.wait_until(lambda: peer_after()['bytessent_per_msg'].get('ping', 0) >= peer_before['bytessent_per_msg'].get('ping', 0) + ping_size, timeout=1) + def test_getnetmsgstats(self): + self.log.info("Test getnetmsgstats") + + self.restart_node(0) + node0 = self.nodes[0] + self.connect_nodes(0, 1) # Generate some traffic. + # Wait for the initial messages to be sent/received (don't disconnect too early). "sendheaders" is the last one. + self.wait_until(lambda: "sendheaders" in node0.getnetmsgstats()["recv"]["not_publicly_routable"]["manual"]) + self.wait_until(lambda: "sendheaders" in node0.getnetmsgstats()["sent"]["not_publicly_routable"]["manual"]) + self.disconnect_nodes(0, 1) # Avoid random/unpredictable packets (e.g. ping) messing with the tests below. + assert_equal(len(node0.getpeerinfo()), 0) + + # In v2 getnettotals counts also bytes that are not accounted at any message (the v2 handshake). + # Also the v2 handshake's size could vary. + if not self.options.v2transport: + self.log.debug("Compare byte count getnetmsgstats vs getnettotals") + nettotals = self.nodes[0].getnettotals() + stats_net_con_msg = self.nodes[0].getnetmsgstats(aggregate_by=["network", "connection_type", "message_type"]) + assert_equal(nettotals["totalbytessent"], stats_net_con_msg["sent"]["bytes"]) + assert_equal(nettotals["totalbytesrecv"], stats_net_con_msg["recv"]["bytes"]) + + self.log.debug("Test full (un-aggregated) output is as expected") + stats_full = node0.getnetmsgstats() + if self.options.v2transport: + assert_equal( + stats_full, + { + "recv": { + "not_publicly_routable": { + "manual": { + "addrv2": {"bytes": 63, "count": 1}, + "feefilter": {"bytes": 29, "count": 1}, + "getheaders": {"bytes": 666, "count": 1}, + "headers": {"bytes": 103, "count": 1}, + "ping": {"bytes": 29, "count": 1}, + "pong": {"bytes": 29, "count": 1}, + "sendaddrv2": {"bytes": 33, "count": 1}, + "sendcmpct": {"bytes": 30, "count": 1}, + "sendheaders": {"bytes": 33, "count": 1}, + "verack": {"bytes": 33, "count": 1}, + "version": {"bytes": 147, "count": 1}, + "wtxidrelay": {"bytes": 33, "count": 1}, + } + } + }, + "sent": { + "not_publicly_routable": { + "manual": { + "feefilter": {"bytes": 29, "count": 1}, + "getaddr": {"bytes": 33, "count": 1}, + "getheaders": {"bytes": 666, "count": 1}, + "headers": {"bytes": 103, "count": 1}, + "ping": {"bytes": 29, "count": 1}, + "pong": {"bytes": 29, "count": 1}, + "sendaddrv2": {"bytes": 33, "count": 1}, + "sendcmpct": {"bytes": 30, "count": 1}, + "sendheaders": {"bytes": 33, "count": 1}, + "verack": {"bytes": 33, "count": 1}, + "version": {"bytes": 147, "count": 1}, + "wtxidrelay": {"bytes": 33, "count": 1}, + } + } + } + } + ) + else: + assert_equal( + stats_full, + { + "recv": { + "not_publicly_routable": { + "manual": { + "addrv2": {"bytes": 66, "count": 1}, + "feefilter": {"bytes": 32, "count": 1}, + "getheaders": {"bytes": 669, "count": 1}, + "headers": {"bytes": 106, "count": 1}, + "ping": {"bytes": 32, "count": 1}, + "pong": {"bytes": 32, "count": 1}, + "sendaddrv2": {"bytes": 24, "count": 1}, + "sendcmpct": {"bytes": 33, "count": 1}, + "sendheaders": {"bytes": 24, "count": 1}, + "verack": {"bytes": 24, "count": 1}, + "version": {"bytes": 138, "count": 1}, + "wtxidrelay": {"bytes": 24, "count": 1}, + } + } + }, + "sent": { + "not_publicly_routable": { + "manual": { + "feefilter": {"bytes": 32, "count": 1}, + "getaddr": {"bytes": 24, "count": 1}, + "getheaders": {"bytes": 669, "count": 1}, + "headers": {"bytes": 106, "count": 1}, + "ping": {"bytes": 32, "count": 1}, + "pong": {"bytes": 32, "count": 1}, + "sendaddrv2": {"bytes": 24, "count": 1}, + "sendcmpct": {"bytes": 33, "count": 1}, + "sendheaders": {"bytes": 24, "count": 1}, + "verack": {"bytes": 24, "count": 1}, + "version": {"bytes": 138, "count": 1}, + "wtxidrelay": {"bytes": 24, "count": 1}, + } + } + } + } + ) + + self.log.debug("Check that aggregation works correctly") + + def sum_all(json): + if "bytes" in json: + return dict(bytes=json["bytes"], count=json["count"]) + + s = dict(bytes=0, count=0) + #print(f'S json={json}') + for k, v in json.items(): + #print(f'S k={k}, v={v}') + #print(f'S diving into {v}') + sub = sum_all(v) + s["bytes"] += sub["bytes"] + s["count"] += sub["count"] + return s + + stats_aggregated_by_bitcoind = node0.getnetmsgstats(aggregate_by=["direction", "network", "connection_type", "message_type"]) + stats_aggregated_by_test = sum_all(stats_full) + assert_equal(stats_aggregated_by_bitcoind, stats_aggregated_by_test) + if not self.options.v2transport: + assert_equal(nettotals["totalbytessent"] + nettotals["totalbytesrecv"], stats_aggregated_by_test["bytes"]) + + for i in range(1, 16): + keywords = [] + if i & 1: + keywords.append("direction") + if i & 2: + keywords.append("network") + if i & 4: + keywords.append("connection_type") + if i & 8: + keywords.append("message_type") + random.shuffle(keywords) + self.log.debug(f"Test values add up correctly when aggregated by {keywords}") + assert_equal(stats_aggregated_by_test, sum_all(node0.getnetmsgstats(aggregate_by=keywords))) + + def get_stats(node): + return node.getnetmsgstats(aggregate_by=["network", "connection_type"]) + + self.log.debug("Test that message count and total number of bytes increment when a ping message is sent") + stats_before_connect = get_stats(node0) + node2 = node0.add_p2p_connection(P2PInterface()) + assert_equal(len(node0.getpeerinfo()), 1) + # Wait for the initial PING (that is sent immediately after the connection is estabilshed) to go through. + self.wait_until(lambda: get_stats(node0)["sent"]["ping"]["count"] > stats_before_connect["sent"]["ping"]["count"]) + stats_before_ping = get_stats(node0) + node0.ping() + self.wait_until(lambda: get_stats(node0)["sent"]["ping"]["count"] > stats_before_ping["sent"]["ping"]["count"]) + self.wait_until(lambda: get_stats(node0)["sent"]["ping"]["bytes"] > stats_before_ping["sent"]["ping"]["bytes"]) + + self.log.debug("Test that when a message is broken in two, the stats only update once the full message has been received") + ping_msg = node2.build_message(test_framework.messages.msg_ping(nonce=12345)) + stats_before_ping = get_stats(node0) + # Send the message in two pieces. + cut_pos = 7 # Chosen at an arbitrary position within the header. + node2.send_raw_message(ping_msg[:cut_pos]) + assert_equal(get_stats(node0)["recv"]["ping"], stats_before_ping["recv"]["ping"]) + # Send the rest of the ping. + node2.send_raw_message(ping_msg[cut_pos:]) + self.wait_until(lambda: get_stats(node0)["recv"]["ping"]["count"] == stats_before_ping["recv"]["ping"]["count"] + 1) + + node2.peer_disconnect() + def test_getnetworkinfo(self): self.log.info("Test getnetworkinfo") info = self.nodes[0].getnetworkinfo()