From af622d00ba7f5317220cf8d7d71ad9cd791dd892 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Mon, 14 Aug 2023 19:08:48 +0200 Subject: [PATCH] net: replace manual reference counting of CNode with shared_ptr Before this change the code used to count references to `CNode` objects manually via `CNode::nRefCount`. Unneeded `CNode`s were scheduled for deletion by putting them in `CConnman::m_nodes_disconnected` and were deleted after their reference count reached zero. Deleting consists of calling `PeerManager::FinalizeNode()` and destroying the `CNode` object. Replace this scheme with `std::shared_ptr`. This simplifies the code and removes: `CNode::nRefCount` `CNode::GetRefCount()` `CNode::AddRef()` `CNode::Release()` `CConnman::m_nodes_disconnected` `CConnman::NodesSnapshot` Now creating a snapshot of `CConnman::m_nodes` is done by simply copying it (under the mutex). Call `PeerManager::FinalizeNode()` from the destructor of `CNode`, which is called when the reference count reaches 0. --- src/net.cpp | 301 ++++++++++++------------- src/net.h | 90 ++------ src/test/fuzz/net.cpp | 11 - src/test/fuzz/p2p_handshake.cpp | 4 +- src/test/fuzz/p2p_headers_presync.cpp | 3 +- src/test/fuzz/process_message.cpp | 4 +- src/test/fuzz/process_messages.cpp | 4 +- src/test/fuzz/util/net.h | 6 +- src/test/net_peer_connection_tests.cpp | 8 +- src/test/util/net.cpp | 4 +- src/test/util/net.h | 11 +- 11 files changed, 191 insertions(+), 255 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 735985a8414..98cbf54ba45 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -332,10 +332,10 @@ bool IsLocal(const CService& addr) return mapLocalHost.count(addr) > 0; } -CNode* CConnman::FindNode(const CNetAddr& ip) +std::shared_ptr CConnman::FindNode(const CNetAddr& ip) { LOCK(m_nodes_mutex); - for (CNode* pnode : m_nodes) { + for (auto& pnode : m_nodes) { if (static_cast(pnode->addr) == ip) { return pnode; } @@ -343,10 +343,10 @@ CNode* CConnman::FindNode(const CNetAddr& ip) return nullptr; } -CNode* CConnman::FindNode(const std::string& addrName) +std::shared_ptr CConnman::FindNode(const std::string& addrName) { LOCK(m_nodes_mutex); - for (CNode* pnode : m_nodes) { + for (auto& pnode : m_nodes) { if (pnode->m_addr_name == addrName) { return pnode; } @@ -354,10 +354,10 @@ CNode* CConnman::FindNode(const std::string& addrName) return nullptr; } -CNode* CConnman::FindNode(const CService& addr) +std::shared_ptr CConnman::FindNode(const CService& addr) { LOCK(m_nodes_mutex); - for (CNode* pnode : m_nodes) { + for (auto& pnode : m_nodes) { if (static_cast(pnode->addr) == addr) { return pnode; } @@ -373,7 +373,7 @@ bool CConnman::AlreadyConnectedToAddress(const CAddress& addr) bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) return false; } @@ -394,7 +394,7 @@ static CService GetBindAddress(const Sock& sock) return addr_bind; } -CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) +std::shared_ptr CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); @@ -404,9 +404,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return nullptr; // Look for an existing connection - CNode* pnode = FindNode(static_cast(addrConnect)); - if (pnode) - { + if (FindNode(static_cast(addrConnect))) { LogPrintf("Failed to open new connection, already connected\n"); return nullptr; } @@ -438,8 +436,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo // It is possible that we already have a connection to the IP/port pszDest resolved to. // In that case, drop the connection that was just created. LOCK(m_nodes_mutex); - CNode* pnode = FindNode(static_cast(addrConnect)); - if (pnode) { + if (FindNode(static_cast(addrConnect))) { LogPrintf("Not opening a connection to %s, already connected to %s\n", pszDest, addrConnect.ToStringAddrPort()); return nullptr; } @@ -531,22 +528,23 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo if (!addr_bind.IsValid()) { addr_bind = GetBindAddress(*sock); } - CNode* pnode = new CNode(id, - std::move(sock), - target_addr, - CalculateKeyedNetGroup(target_addr), - nonce, - addr_bind, - pszDest ? pszDest : "", - conn_type, - /*inbound_onion=*/false, - CNodeOptions{ - .permission_flags = permission_flags, - .i2p_sam_session = std::move(i2p_transient_session), - .recv_flood_size = nReceiveFloodSize, - .use_v2transport = use_v2transport, - }); - pnode->AddRef(); + auto pnode = std::make_shared( + id, + std::move(sock), + target_addr, + CalculateKeyedNetGroup(target_addr), + nonce, + addr_bind, + pszDest ? pszDest : "", + conn_type, + /*inbound_onion=*/false, + [this](CNode& node) { m_msgproc->FinalizeNode(node); }, + CNodeOptions{ + .permission_flags = permission_flags, + .i2p_sam_session = std::move(i2p_transient_session), + .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, + }); // We're making a new connection, harvest entropy from the time (and our peer count) RandAddEvent((uint32_t)id); @@ -1689,7 +1687,7 @@ bool CConnman::AttemptToEvictConnection() { LOCK(m_nodes_mutex); - for (const CNode* node : m_nodes) { + for (const auto& node : m_nodes) { if (node->fDisconnect) continue; NodeEvictionCandidate candidate{ @@ -1716,7 +1714,7 @@ bool CConnman::AttemptToEvictConnection() return false; } LOCK(m_nodes_mutex); - for (CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->GetId() == *node_id_to_evict) { LogDebug(BCLog::NET, "selected %s connection for eviction, %s", pnode->ConnectionTypeAsString(), pnode->DisconnectMsg(fLogIPs)); TRACEPOINT(net, evicted_inbound_connection, @@ -1771,7 +1769,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->IsInboundConn()) nInbound++; } } @@ -1828,22 +1826,24 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, ServiceFlags local_services = GetLocalServices(); const bool use_v2transport(local_services & NODE_P2P_V2); - CNode* pnode = new CNode(id, - std::move(sock), - CAddress{addr, NODE_NONE}, - CalculateKeyedNetGroup(addr), - nonce, - addr_bind, - /*addrNameIn=*/"", - ConnectionType::INBOUND, - inbound_onion, - CNodeOptions{ - .permission_flags = permission_flags, - .prefer_evict = discouraged, - .recv_flood_size = nReceiveFloodSize, - .use_v2transport = use_v2transport, - }); - pnode->AddRef(); + auto pnode = std::make_shared( + id, + std::move(sock), + CAddress{addr, NODE_NONE}, + CalculateKeyedNetGroup(addr), + nonce, + addr_bind, + /*addrNameIn=*/"", + ConnectionType::INBOUND, + inbound_onion, + [this](CNode& node) { m_msgproc->FinalizeNode(node); }, + CNodeOptions{ + .permission_flags = permission_flags, + .prefer_evict = discouraged, + .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, + }); + m_msgproc->InitializeNode(*pnode, local_services); { LOCK(m_nodes_mutex); @@ -1884,8 +1884,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ } // no default case, so the compiler can warn about missing cases // Count existing connections - int existing_connections = WITH_LOCK(m_nodes_mutex, - return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); + const int existing_connections = WITH_LOCK(m_nodes_mutex, + return std::ranges::count_if(m_nodes, [conn_type](const auto& node) { return node->m_conn_type == conn_type; }); + ); // Max connections of specified type already exist if (max_connections != std::nullopt && existing_connections >= max_connections) return false; @@ -1907,12 +1908,14 @@ void CConnman::DisconnectNodes() // m_reconnections_mutex while holding m_nodes_mutex. decltype(m_reconnections) reconnections_to_add; + std::vector> disconnected_nodes; + { LOCK(m_nodes_mutex); if (!fNetworkActive) { // Disconnect any connected nodes - for (CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (!pnode->fDisconnect) { LogDebug(BCLog::NET, "Network not active, %s\n", pnode->DisconnectMsg(fLogIPs)); pnode->fDisconnect = true; @@ -1921,51 +1924,40 @@ void CConnman::DisconnectNodes() } // Disconnect unused nodes - std::vector nodes_copy = m_nodes; - for (CNode* pnode : nodes_copy) - { - if (pnode->fDisconnect) - { - // remove from m_nodes - m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end()); + for (auto it = m_nodes.begin(); it != m_nodes.end();) { + auto node = *it; + if (node->fDisconnect) { + it = m_nodes.erase(it); + + // Keep a reference to this CNode object to delay its destruction until + // after m_nodes_mutex has been released. Destructing a node involves + // calling m_msgproc->FinalizeNode() which acquires cs_main. Lock order + // should be cs_main, m_nodes_mutex. + disconnected_nodes.push_back(node); // Add to reconnection list if appropriate. We don't reconnect right here, because // the creation of a connection is a blocking operation (up to several seconds), // and we don't want to hold up the socket handler thread for that long. - if (pnode->m_transport->ShouldReconnectV1()) { + if (node->m_transport->ShouldReconnectV1()) { reconnections_to_add.push_back({ - .addr_connect = pnode->addr, - .grant = std::move(pnode->grantOutbound), - .destination = pnode->m_dest, - .conn_type = pnode->m_conn_type, + .addr_connect = node->addr, + .grant = std::move(node->grantOutbound), + .destination = node->m_dest, + .conn_type = node->m_conn_type, .use_v2transport = false}); - LogDebug(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId()); + LogDebug(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", node->GetId()); } // release outbound grant (if any) - pnode->grantOutbound.Release(); + node->grantOutbound.Release(); // close socket and cleanup - pnode->CloseSocketDisconnect(); + node->CloseSocketDisconnect(); // update connection count by network - if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()]; - - // hold in disconnected pool until all refs are released - pnode->Release(); - m_nodes_disconnected.push_back(pnode); - } - } - } - { - // Delete disconnected nodes - std::list nodes_disconnected_copy = m_nodes_disconnected; - for (CNode* pnode : nodes_disconnected_copy) - { - // Destroy the object only after other threads have stopped using it. - if (pnode->GetRefCount() <= 0) { - m_nodes_disconnected.remove(pnode); - DeleteNode(pnode); + if (node->IsManualOrFullOutboundConn()) --m_network_conn_counts[node->addr.GetNetwork()]; + } else { + ++it; } } } @@ -2050,7 +2042,7 @@ bool CConnman::InactivityCheck(const CNode& node) const return false; } -Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) +Sock::EventsPerSock CConnman::GenerateWaitSockets(const std::vector>& nodes) { Sock::EventsPerSock events_per_sock; @@ -2058,7 +2050,7 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) events_per_sock.emplace(hListenSocket.sock, Sock::Events{Sock::RECV}); } - for (CNode* pnode : nodes) { + for (auto& pnode : nodes) { bool select_recv = !pnode->fPauseRecv; bool select_send; { @@ -2087,34 +2079,34 @@ void CConnman::SocketHandler() Sock::EventsPerSock events_per_sock; - { - const NodesSnapshot snap{*this, /*shuffle=*/false}; + auto nodes = WITH_LOCK(m_nodes_mutex, return m_nodes); - const auto timeout = std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS); + const auto timeout = std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS); - // Check for the readiness of the already connected sockets and the - // listening sockets in one call ("readiness" as in poll(2) or - // select(2)). If none are ready, wait for a short while and return - // empty sets. - events_per_sock = GenerateWaitSockets(snap.Nodes()); - if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) { - interruptNet.sleep_for(timeout); - } - - // Service (send/receive) each of the already connected nodes. - SocketHandlerConnected(snap.Nodes(), events_per_sock); + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + events_per_sock = GenerateWaitSockets(nodes); + if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) { + interruptNet.sleep_for(timeout); } + // Service (send/receive) each of the already connected nodes. + SocketHandlerConnected(nodes, events_per_sock); + + nodes.clear(); + // Accept new connections from listening sockets. SocketHandlerListening(events_per_sock); } -void CConnman::SocketHandlerConnected(const std::vector& nodes, +void CConnman::SocketHandlerConnected(const std::vector>& nodes, const Sock::EventsPerSock& events_per_sock) { AssertLockNotHeld(m_total_bytes_sent_mutex); - for (CNode* pnode : nodes) { + for (auto& pnode : nodes) { if (interruptNet) return; @@ -2432,14 +2424,10 @@ void CConnman::StartExtraBlockRelayPeers() // Return the number of outbound connections that are full relay (not blocks only) int CConnman::GetFullOutboundConnCount() const { - int nRelevant = 0; - { - LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { - if (pnode->fSuccessfullyConnected && pnode->IsFullOutboundConn()) ++nRelevant; - } - } - return nRelevant; + LOCK(m_nodes_mutex); + return std::ranges::count_if(m_nodes, [](const auto& node) { + return node->fSuccessfullyConnected && node->IsFullOutboundConn(); + }); } // Return the number of peers we have over our outbound connection limit @@ -2453,7 +2441,7 @@ int CConnman::GetExtraFullOutboundCount() const int full_outbound_peers = 0; { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsFullOutboundConn()) { ++full_outbound_peers; } @@ -2467,7 +2455,7 @@ int CConnman::GetExtraBlockRelayCount() const int block_relay_peers = 0; { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) { ++block_relay_peers; } @@ -2638,7 +2626,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, Spa { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->IsFullOutboundConn()) nOutboundFullRelay++; if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++; @@ -2883,7 +2871,7 @@ std::vector CConnman::GetCurrentBlockRelayOnlyConns() const { std::vector ret; LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->IsBlockOnlyConn()) { ret.push_back(pnode->addr); } @@ -2909,7 +2897,7 @@ std::vector CConnman::GetAddedNodeInfo(bool include_connected) co std::map> mapConnectedByName; { LOCK(m_nodes_mutex); - for (const CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { if (pnode->addr.IsValid()) { mapConnected[pnode->addr] = pnode->IsInboundConn(); } @@ -3004,7 +2992,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai } else if (FindNode(std::string(pszDest))) return; - CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport); + auto pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport); if (!pnode) return; @@ -3037,29 +3025,30 @@ void CConnman::ThreadMessageHandler() { bool fMoreWork = false; - { - // Randomize the order in which we process messages from/to our peers. - // This prevents attacks in which an attacker exploits having multiple - // consecutive connections in the m_nodes list. - const NodesSnapshot snap{*this, /*shuffle=*/true}; + auto nodes = WITH_LOCK(m_nodes_mutex, return m_nodes); + // Randomize the order in which we process messages from/to our peers. + // This prevents attacks in which an attacker exploits having multiple + // consecutive connections in the m_nodes list. + std::shuffle(nodes.begin(), nodes.end(), FastRandomContext{}); - for (CNode* pnode : snap.Nodes()) { - if (pnode->fDisconnect) - continue; + for (auto& pnode : nodes) { + if (pnode->fDisconnect) + continue; - // Receive messages - bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); - if (flagInterruptMsgProc) - return; - // Send messages - m_msgproc->SendMessages(pnode); + // Receive messages + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode.get(), flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); + if (flagInterruptMsgProc) + return; + // Send messages + m_msgproc->SendMessages(pnode.get()); - if (flagInterruptMsgProc) - return; - } + if (flagInterruptMsgProc) + return; } + nodes.clear(); + WAIT_LOCK(mutexMsgProc, lock); if (!fMoreWork) { condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this]() EXCLUSIVE_LOCKS_REQUIRED(mutexMsgProc) { return fMsgProcWake; }); @@ -3470,30 +3459,19 @@ void CConnman::StopNodes() } // Delete peer connections. - std::vector nodes; + std::vector> nodes; WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes)); - for (CNode* pnode : nodes) { + for (auto& pnode : nodes) { LogDebug(BCLog::NET, "Stopping node, %s", pnode->DisconnectMsg(fLogIPs)); pnode->CloseSocketDisconnect(); - DeleteNode(pnode); } + nodes.clear(); - for (CNode* pnode : m_nodes_disconnected) { - DeleteNode(pnode); - } - m_nodes_disconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); } -void CConnman::DeleteNode(CNode* pnode) -{ - assert(pnode); - m_msgproc->FinalizeNode(*pnode); - delete pnode; -} - CConnman::~CConnman() { Interrupt(); @@ -3626,7 +3604,7 @@ void CConnman::GetNodeStats(std::vector& vstats) const vstats.clear(); LOCK(m_nodes_mutex); vstats.reserve(m_nodes.size()); - for (CNode* pnode : m_nodes) { + for (const auto& pnode : m_nodes) { vstats.emplace_back(); pnode->CopyStats(vstats.back()); vstats.back().m_mapped_as = GetMappedAS(pnode->addr); @@ -3636,7 +3614,7 @@ void CConnman::GetNodeStats(std::vector& vstats) const bool CConnman::DisconnectNode(const std::string& strNode) { LOCK(m_nodes_mutex); - if (CNode* pnode = FindNode(strNode)) { + if (auto pnode = FindNode(strNode)) { LogDebug(BCLog::NET, "disconnect by address%s match, %s", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->DisconnectMsg(fLogIPs)); pnode->fDisconnect = true; return true; @@ -3648,7 +3626,7 @@ bool CConnman::DisconnectNode(const CSubNet& subnet) { bool disconnected = false; LOCK(m_nodes_mutex); - for (CNode* pnode : m_nodes) { + for (auto& pnode : m_nodes) { if (subnet.Match(pnode->addr)) { LogDebug(BCLog::NET, "disconnect by subnet%s match, %s", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->DisconnectMsg(fLogIPs)); pnode->fDisconnect = true; @@ -3666,7 +3644,7 @@ bool CConnman::DisconnectNode(const CNetAddr& addr) bool CConnman::DisconnectNode(NodeId id) { LOCK(m_nodes_mutex); - for(CNode* pnode : m_nodes) { + for(auto& pnode : m_nodes) { if (id == pnode->GetId()) { LogDebug(BCLog::NET, "disconnect by id, %s", pnode->DisconnectMsg(fLogIPs)); pnode->fDisconnect = true; @@ -3799,6 +3777,7 @@ CNode::CNode(NodeId idIn, const std::string& addrNameIn, ConnectionType conn_type_in, bool inbound_onion, + std::function destruct_cb, CNodeOptions&& node_opts) : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)}, m_permission_flags{node_opts.permission_flags}, @@ -3815,7 +3794,8 @@ CNode::CNode(NodeId idIn, id{idIn}, nLocalHostNonce{nLocalHostNonceIn}, m_recv_flood_size{node_opts.recv_flood_size}, - m_i2p_sam_session{std::move(node_opts.i2p_sam_session)} + m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}, + m_destruct_cb{destruct_cb} { if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); @@ -3831,6 +3811,13 @@ CNode::CNode(NodeId idIn, } } +CNode::~CNode() +{ + if (m_destruct_cb) { + m_destruct_cb(*this); + } +} + void CNode::MarkReceivedMsgsForProcessing() { AssertLockNotHeld(m_msg_process_queue_mutex); @@ -3862,9 +3849,9 @@ std::optional> CNode::PollMessage() return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); } -bool CConnman::NodeFullyConnected(const CNode* pnode) +bool CConnman::NodeFullyConnected(const CNode& node) { - return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; + return node.fSuccessfullyConnected && !node.fDisconnect; } void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) @@ -3916,15 +3903,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) bool CConnman::ForNode(NodeId id, std::function func) { - CNode* found = nullptr; LOCK(m_nodes_mutex); for (auto&& pnode : m_nodes) { if(pnode->GetId() == id) { - found = pnode; - break; + return NodeFullyConnected(*pnode) && func(pnode.get()); } } - return found != nullptr && NodeFullyConnected(found) && func(found); + return false; } CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const diff --git a/src/net.h b/src/net.h index e64d9a67f46..5297f7f1792 100644 --- a/src/net.h +++ b/src/net.h @@ -730,7 +730,6 @@ public: // next time DisconnectNodes() runs std::atomic_bool fDisconnect{false}; CSemaphoreGrant grantOutbound; - std::atomic nRefCount{0}; const uint64_t nKeyedNetGroup; std::atomic_bool fPauseRecv{false}; @@ -887,10 +886,13 @@ public: const std::string& addrNameIn, ConnectionType conn_type_in, bool inbound_onion, + std::function destruct_cb = {}, CNodeOptions&& node_opts = {}); CNode(const CNode&) = delete; CNode& operator=(const CNode&) = delete; + ~CNode(); + NodeId GetId() const { return id; } @@ -899,12 +901,6 @@ public: return nLocalHostNonce; } - int GetRefCount() const - { - assert(nRefCount >= 0); - return nRefCount; - } - /** * Receive bytes from the buffer and deserialize them into messages. * @@ -930,17 +926,6 @@ public: //! May not be called more than once void SetAddrLocal(const CService& addrLocalIn) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex); - CNode* AddRef() - { - nRefCount++; - return this; - } - - void Release() - { - nRefCount--; - } - void CloseSocketDisconnect() EXCLUSIVE_LOCKS_REQUIRED(!m_sock_mutex); void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv); @@ -999,6 +984,11 @@ private: * Otherwise this unique_ptr is empty. */ std::unique_ptr m_i2p_sam_session GUARDED_BY(m_sock_mutex); + + /** + * A function to be called just before this object is destroyed. + */ + std::function m_destruct_cb; }; /** @@ -1152,8 +1142,9 @@ public: { LOCK(m_nodes_mutex); for (auto&& node : m_nodes) { - if (NodeFullyConnected(node)) - func(node); + if (NodeFullyConnected(*node)) { + func(node.get()); + } } }; @@ -1161,8 +1152,9 @@ public: { LOCK(m_nodes_mutex); for (auto&& node : m_nodes) { - if (NodeFullyConnected(node)) - func(node); + if (NodeFullyConnected(*node)) { + func(node.get()); + } } }; @@ -1325,7 +1317,7 @@ private: * @param[in] nodes Select from these nodes' sockets. * @return sockets to check for readiness */ - Sock::EventsPerSock GenerateWaitSockets(Span nodes); + Sock::EventsPerSock GenerateWaitSockets(const std::vector>& nodes); /** * Check connected and listening sockets for IO readiness and process them accordingly. @@ -1337,7 +1329,7 @@ private: * @param[in] nodes Nodes to process. The socket of each node is checked against `what`. * @param[in] events_per_sock Sockets that are ready for IO. */ - void SocketHandlerConnected(const std::vector& nodes, + void SocketHandlerConnected(const std::vector>& nodes, const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); @@ -1352,9 +1344,9 @@ private: uint64_t CalculateKeyedNetGroup(const CNetAddr& ad) const; - CNode* FindNode(const CNetAddr& ip); - CNode* FindNode(const std::string& addrName); - CNode* FindNode(const CService& addr); + std::shared_ptr FindNode(const CNetAddr& ip); + std::shared_ptr FindNode(const std::string& addrName); + std::shared_ptr FindNode(const CService& addr); /** * Determine whether we're already connected to a given address, in order to @@ -1363,7 +1355,7 @@ private: bool AlreadyConnectedToAddress(const CAddress& addr); bool AttemptToEvictConnection(); - CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + std::shared_ptr ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector& ranges) const; void DeleteNode(CNode* pnode); @@ -1403,7 +1395,7 @@ private: bool MaybePickPreferredNetwork(std::optional& network); // Whether the node should be passed out in ForEach* callbacks - static bool NodeFullyConnected(const CNode* pnode); + static bool NodeFullyConnected(const CNode& node); uint16_t GetDefaultPort(Network net) const; uint16_t GetDefaultPort(const std::string& addr) const; @@ -1442,8 +1434,7 @@ private: std::vector m_added_node_params GUARDED_BY(m_added_nodes_mutex); mutable Mutex m_added_nodes_mutex; - std::vector m_nodes GUARDED_BY(m_nodes_mutex); - std::list m_nodes_disconnected; + std::vector> m_nodes GUARDED_BY(m_nodes_mutex); mutable RecursiveMutex m_nodes_mutex; std::atomic nLastNodeId{0}; unsigned int nPrevNodeCount{0}; @@ -1634,43 +1625,6 @@ private: */ static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10}; - /** - * RAII helper to atomically create a copy of `m_nodes` and add a reference - * to each of the nodes. The nodes are released when this object is destroyed. - */ - class NodesSnapshot - { - public: - explicit NodesSnapshot(const CConnman& connman, bool shuffle) - { - { - LOCK(connman.m_nodes_mutex); - m_nodes_copy = connman.m_nodes; - for (auto& node : m_nodes_copy) { - node->AddRef(); - } - } - if (shuffle) { - std::shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{}); - } - } - - ~NodesSnapshot() - { - for (auto& node : m_nodes_copy) { - node->Release(); - } - } - - const std::vector& Nodes() const - { - return m_nodes_copy; - } - - private: - std::vector m_nodes_copy; - }; - const CChainParams& m_params; friend struct ConnmanTestMsg; diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index 1a0de7aa363..3525f1e4170 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -49,15 +49,6 @@ FUZZ_TARGET(net, .init = initialize_net) CNodeStats stats; node.CopyStats(stats); }, - [&] { - const CNode* add_ref_node = node.AddRef(); - assert(add_ref_node == &node); - }, - [&] { - if (node.GetRefCount() > 0) { - node.Release(); - } - }, [&] { const std::vector b = ConsumeRandomLengthByteVector(fuzzed_data_provider); bool complete; @@ -68,8 +59,6 @@ FUZZ_TARGET(net, .init = initialize_net) (void)node.GetAddrLocal(); (void)node.GetId(); (void)node.GetLocalNonce(); - const int ref_count = node.GetRefCount(); - assert(ref_count >= 0); (void)node.GetCommonVersion(); const NetPermissionFlags net_permission_flags = ConsumeWeakEnum(fuzzed_data_provider, ALL_NET_PERMISSION_FLAGS); diff --git a/src/test/fuzz/p2p_handshake.cpp b/src/test/fuzz/p2p_handshake.cpp index d608efd87ac..d881cb35baf 100644 --- a/src/test/fuzz/p2p_handshake.cpp +++ b/src/test/fuzz/p2p_handshake.cpp @@ -64,7 +64,9 @@ FUZZ_TARGET(p2p_handshake, .init = ::initialize) std::vector peers; const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3); for (int i = 0; i < num_peers_to_add; ++i) { - peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release()); + peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i, [&peerman](CNode& node) { + peerman->FinalizeNode(node); + }).release()); connman.AddTestNode(*peers.back()); peerman->InitializeNode( *peers.back(), diff --git a/src/test/fuzz/p2p_headers_presync.cpp b/src/test/fuzz/p2p_headers_presync.cpp index ed7041ad1f1..4c470dab1cf 100644 --- a/src/test/fuzz/p2p_headers_presync.cpp +++ b/src/test/fuzz/p2p_headers_presync.cpp @@ -60,7 +60,8 @@ void HeadersSyncSetup::ResetAndInitialize() for (auto conn_type : conn_types) { CAddress addr{}; - m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false)); + m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false, + [this](CNode& node) { m_node.peerman->FinalizeNode(node); })); CNode& p2p_node = *m_connections.back(); connman.Handshake( diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 4bd38a1ac68..b2031a60cc7 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -66,7 +66,9 @@ FUZZ_TARGET(process_message, .init = initialize_process_message) if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) { return; } - CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider).release(); + CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider, std::nullopt, [](CNode& node) { + g_setup->m_node.peerman->FinalizeNode(node); + }).release(); connman.AddTestNode(p2p_node); FillNode(fuzzed_data_provider, connman, p2p_node); diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 0688868c02b..89584e3d59e 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -55,7 +55,9 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) std::vector peers; const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3); for (int i = 0; i < num_peers_to_add; ++i) { - peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release()); + peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i, [&connman](CNode& node) { + connman.MsgProc()->FinalizeNode(node); + }).release()); CNode& p2p_node = *peers.back(); FillNode(fuzzed_data_provider, connman, p2p_node); diff --git a/src/test/fuzz/util/net.h b/src/test/fuzz/util/net.h index 698001a7f15..29dcfbe28c9 100644 --- a/src/test/fuzz/util/net.h +++ b/src/test/fuzz/util/net.h @@ -228,7 +228,7 @@ inline CService ConsumeService(FuzzedDataProvider& fuzzed_data_provider) noexcep CAddress ConsumeAddress(FuzzedDataProvider& fuzzed_data_provider) noexcept; template -auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional& node_id_in = std::nullopt) noexcept +auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional& node_id_in = std::nullopt, std::function destruct_cb = {}) noexcept { const NodeId node_id = node_id_in.value_or(fuzzed_data_provider.ConsumeIntegralInRange(0, std::numeric_limits::max())); const auto sock = std::make_shared(fuzzed_data_provider); @@ -250,6 +250,7 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional& node_id_in = std::nullopt) { return ConsumeNode(fdp, node_id_in); } +inline std::unique_ptr ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional& node_id_in = std::nullopt, std::function destruct_cb = {}) { return ConsumeNode(fdp, node_id_in, destruct_cb); } void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex); diff --git a/src/test/net_peer_connection_tests.cpp b/src/test/net_peer_connection_tests.cpp index e60ce8b99d3..0a94456bec7 100644 --- a/src/test/net_peer_connection_tests.cpp +++ b/src/test/net_peer_connection_tests.cpp @@ -117,7 +117,7 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection, BOOST_CHECK_EQUAL(nodes.back()->ConnectedThroughNetwork(), Network::NET_CJDNS); BOOST_TEST_MESSAGE("Call AddNode() for all the peers"); - for (auto node : connman->TestNodes()) { + for (const auto& node : connman->TestNodes()) { BOOST_CHECK(connman->AddNode({/*m_added_node=*/node->addr.ToStringAddrPort(), /*m_use_v2transport=*/true})); BOOST_TEST_MESSAGE(strprintf("peer id=%s addr=%s", node->GetId(), node->addr.ToStringAddrPort())); } @@ -134,7 +134,7 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection, BOOST_CHECK(connman->GetAddedNodeInfo(/*include_connected=*/false).empty()); // Test AddedNodesContain() - for (auto node : connman->TestNodes()) { + for (const auto& node : connman->TestNodes()) { BOOST_CHECK(connman->AddedNodesContain(node->addr)); } AddPeer(id, nodes, *peerman, *connman, ConnectionType::OUTBOUND_FULL_RELAY); @@ -151,12 +151,12 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection, } BOOST_TEST_MESSAGE("\nCheck that all connected peers are correctly detected as connected"); - for (auto node : connman->TestNodes()) { + for (const auto& node : connman->TestNodes()) { BOOST_CHECK(connman->AlreadyConnectedPublic(node->addr)); } // Clean up - for (auto node : connman->TestNodes()) { + for (const auto& node : connman->TestNodes()) { peerman->FinalizeNode(*node); } connman->ClearTestNodes(); diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index ddd96a50640..0dadabed2e3 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -105,9 +105,9 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co return complete; } -CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) +std::shared_ptr ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) { - CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true); + auto node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true); if (!node) return nullptr; node->SetCommonVersion(PROTOCOL_VERSION); peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS)); diff --git a/src/test/util/net.h b/src/test/util/net.h index 3e717341d87..152f106f01c 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -45,7 +45,7 @@ struct ConnmanTestMsg : public CConnman { m_peer_connect_timeout = timeout; } - std::vector TestNodes() + std::vector> TestNodes() { LOCK(m_nodes_mutex); return m_nodes; @@ -54,7 +54,7 @@ struct ConnmanTestMsg : public CConnman { void AddTestNode(CNode& node) { LOCK(m_nodes_mutex); - m_nodes.push_back(&node); + m_nodes.push_back(std::shared_ptr(&node)); if (node.IsManualOrFullOutboundConn()) ++m_network_conn_counts[node.addr.GetNetwork()]; } @@ -62,12 +62,11 @@ struct ConnmanTestMsg : public CConnman { void ClearTestNodes() { LOCK(m_nodes_mutex); - for (CNode* node : m_nodes) { - delete node; - } m_nodes.clear(); } + NetEventsInterface* MsgProc() const { return m_msgproc; }; + void Handshake(CNode& node, bool successfully_connected, ServiceFlags remote_services, @@ -88,7 +87,7 @@ struct ConnmanTestMsg : public CConnman { bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); }; - CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) + std::shared_ptr ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); };