net: replace manual reference counting of CNode with shared_ptr

Before this change the code used to count references to `CNode` objects
manually via `CNode::nRefCount`. Unneeded `CNode`s were scheduled for
deletion by putting them in `CConnman::m_nodes_disconnected` and were
deleted after their reference count reached zero. Deleting consists of
calling `PeerManager::FinalizeNode()` and destroying the `CNode` object.

Replace this scheme with `std::shared_ptr`. This simplifies the code and
removes:
`CNode::nRefCount`
`CNode::GetRefCount()`
`CNode::AddRef()`
`CNode::Release()`
`CConnman::m_nodes_disconnected`
`CConnman::NodesSnapshot`

Now creating a snapshot of `CConnman::m_nodes` is done by simply copying
it (under the mutex).

Call `PeerManager::FinalizeNode()` from the destructor of `CNode`, which
is called when the reference count reaches 0.
This commit is contained in:
Vasil Dimov 2023-08-14 19:08:48 +02:00
parent a9a2b669f3
commit af622d00ba
No known key found for this signature in database
GPG key ID: 54DF06F64B55CBBF
11 changed files with 191 additions and 255 deletions

View file

@ -332,10 +332,10 @@ bool IsLocal(const CService& addr)
return mapLocalHost.count(addr) > 0;
}
CNode* CConnman::FindNode(const CNetAddr& ip)
std::shared_ptr<CNode> CConnman::FindNode(const CNetAddr& ip)
{
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
for (auto& pnode : m_nodes) {
if (static_cast<CNetAddr>(pnode->addr) == ip) {
return pnode;
}
@ -343,10 +343,10 @@ CNode* CConnman::FindNode(const CNetAddr& ip)
return nullptr;
}
CNode* CConnman::FindNode(const std::string& addrName)
std::shared_ptr<CNode> CConnman::FindNode(const std::string& addrName)
{
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
for (auto& pnode : m_nodes) {
if (pnode->m_addr_name == addrName) {
return pnode;
}
@ -354,10 +354,10 @@ CNode* CConnman::FindNode(const std::string& addrName)
return nullptr;
}
CNode* CConnman::FindNode(const CService& addr)
std::shared_ptr<CNode> CConnman::FindNode(const CService& addr)
{
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
for (auto& pnode : m_nodes) {
if (static_cast<CService>(pnode->addr) == addr) {
return pnode;
}
@ -373,7 +373,7 @@ bool CConnman::AlreadyConnectedToAddress(const CAddress& addr)
bool CConnman::CheckIncomingNonce(uint64_t nonce)
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce)
return false;
}
@ -394,7 +394,7 @@ static CService GetBindAddress(const Sock& sock)
return addr_bind;
}
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
std::shared_ptr<CNode> CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@ -404,9 +404,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
return nullptr;
// Look for an existing connection
CNode* pnode = FindNode(static_cast<CService>(addrConnect));
if (pnode)
{
if (FindNode(static_cast<CService>(addrConnect))) {
LogPrintf("Failed to open new connection, already connected\n");
return nullptr;
}
@ -438,8 +436,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
// It is possible that we already have a connection to the IP/port pszDest resolved to.
// In that case, drop the connection that was just created.
LOCK(m_nodes_mutex);
CNode* pnode = FindNode(static_cast<CService>(addrConnect));
if (pnode) {
if (FindNode(static_cast<CService>(addrConnect))) {
LogPrintf("Not opening a connection to %s, already connected to %s\n", pszDest, addrConnect.ToStringAddrPort());
return nullptr;
}
@ -531,7 +528,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
if (!addr_bind.IsValid()) {
addr_bind = GetBindAddress(*sock);
}
CNode* pnode = new CNode(id,
auto pnode = std::make_shared<CNode>(
id,
std::move(sock),
target_addr,
CalculateKeyedNetGroup(target_addr),
@ -540,13 +538,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
pszDest ? pszDest : "",
conn_type,
/*inbound_onion=*/false,
[this](CNode& node) { m_msgproc->FinalizeNode(node); },
CNodeOptions{
.permission_flags = permission_flags,
.i2p_sam_session = std::move(i2p_transient_session),
.recv_flood_size = nReceiveFloodSize,
.use_v2transport = use_v2transport,
});
pnode->AddRef();
// We're making a new connection, harvest entropy from the time (and our peer count)
RandAddEvent((uint32_t)id);
@ -1689,7 +1687,7 @@ bool CConnman::AttemptToEvictConnection()
{
LOCK(m_nodes_mutex);
for (const CNode* node : m_nodes) {
for (const auto& node : m_nodes) {
if (node->fDisconnect)
continue;
NodeEvictionCandidate candidate{
@ -1716,7 +1714,7 @@ bool CConnman::AttemptToEvictConnection()
return false;
}
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->GetId() == *node_id_to_evict) {
LogDebug(BCLog::NET, "selected %s connection for eviction, %s", pnode->ConnectionTypeAsString(), pnode->DisconnectMsg(fLogIPs));
TRACEPOINT(net, evicted_inbound_connection,
@ -1771,7 +1769,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->IsInboundConn()) nInbound++;
}
}
@ -1828,7 +1826,8 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
ServiceFlags local_services = GetLocalServices();
const bool use_v2transport(local_services & NODE_P2P_V2);
CNode* pnode = new CNode(id,
auto pnode = std::make_shared<CNode>(
id,
std::move(sock),
CAddress{addr, NODE_NONE},
CalculateKeyedNetGroup(addr),
@ -1837,13 +1836,14 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
/*addrNameIn=*/"",
ConnectionType::INBOUND,
inbound_onion,
[this](CNode& node) { m_msgproc->FinalizeNode(node); },
CNodeOptions{
.permission_flags = permission_flags,
.prefer_evict = discouraged,
.recv_flood_size = nReceiveFloodSize,
.use_v2transport = use_v2transport,
});
pnode->AddRef();
m_msgproc->InitializeNode(*pnode, local_services);
{
LOCK(m_nodes_mutex);
@ -1884,8 +1884,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
} // no default case, so the compiler can warn about missing cases
// Count existing connections
int existing_connections = WITH_LOCK(m_nodes_mutex,
return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; }););
const int existing_connections = WITH_LOCK(m_nodes_mutex,
return std::ranges::count_if(m_nodes, [conn_type](const auto& node) { return node->m_conn_type == conn_type; });
);
// Max connections of specified type already exist
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
@ -1907,12 +1908,14 @@ void CConnman::DisconnectNodes()
// m_reconnections_mutex while holding m_nodes_mutex.
decltype(m_reconnections) reconnections_to_add;
std::vector<std::shared_ptr<CNode>> disconnected_nodes;
{
LOCK(m_nodes_mutex);
if (!fNetworkActive) {
// Disconnect any connected nodes
for (CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (!pnode->fDisconnect) {
LogDebug(BCLog::NET, "Network not active, %s\n", pnode->DisconnectMsg(fLogIPs));
pnode->fDisconnect = true;
@ -1921,51 +1924,40 @@ void CConnman::DisconnectNodes()
}
// Disconnect unused nodes
std::vector<CNode*> nodes_copy = m_nodes;
for (CNode* pnode : nodes_copy)
{
if (pnode->fDisconnect)
{
// remove from m_nodes
m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
for (auto it = m_nodes.begin(); it != m_nodes.end();) {
auto node = *it;
if (node->fDisconnect) {
it = m_nodes.erase(it);
// Keep a reference to this CNode object to delay its destruction until
// after m_nodes_mutex has been released. Destructing a node involves
// calling m_msgproc->FinalizeNode() which acquires cs_main. Lock order
// should be cs_main, m_nodes_mutex.
disconnected_nodes.push_back(node);
// Add to reconnection list if appropriate. We don't reconnect right here, because
// the creation of a connection is a blocking operation (up to several seconds),
// and we don't want to hold up the socket handler thread for that long.
if (pnode->m_transport->ShouldReconnectV1()) {
if (node->m_transport->ShouldReconnectV1()) {
reconnections_to_add.push_back({
.addr_connect = pnode->addr,
.grant = std::move(pnode->grantOutbound),
.destination = pnode->m_dest,
.conn_type = pnode->m_conn_type,
.addr_connect = node->addr,
.grant = std::move(node->grantOutbound),
.destination = node->m_dest,
.conn_type = node->m_conn_type,
.use_v2transport = false});
LogDebug(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
LogDebug(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", node->GetId());
}
// release outbound grant (if any)
pnode->grantOutbound.Release();
node->grantOutbound.Release();
// close socket and cleanup
pnode->CloseSocketDisconnect();
node->CloseSocketDisconnect();
// update connection count by network
if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()];
// hold in disconnected pool until all refs are released
pnode->Release();
m_nodes_disconnected.push_back(pnode);
}
}
}
{
// Delete disconnected nodes
std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
for (CNode* pnode : nodes_disconnected_copy)
{
// Destroy the object only after other threads have stopped using it.
if (pnode->GetRefCount() <= 0) {
m_nodes_disconnected.remove(pnode);
DeleteNode(pnode);
if (node->IsManualOrFullOutboundConn()) --m_network_conn_counts[node->addr.GetNetwork()];
} else {
++it;
}
}
}
@ -2050,7 +2042,7 @@ bool CConnman::InactivityCheck(const CNode& node) const
return false;
}
Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
Sock::EventsPerSock CConnman::GenerateWaitSockets(const std::vector<std::shared_ptr<CNode>>& nodes)
{
Sock::EventsPerSock events_per_sock;
@ -2058,7 +2050,7 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
events_per_sock.emplace(hListenSocket.sock, Sock::Events{Sock::RECV});
}
for (CNode* pnode : nodes) {
for (auto& pnode : nodes) {
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
@ -2087,8 +2079,7 @@ void CConnman::SocketHandler()
Sock::EventsPerSock events_per_sock;
{
const NodesSnapshot snap{*this, /*shuffle=*/false};
auto nodes = WITH_LOCK(m_nodes_mutex, return m_nodes);
const auto timeout = std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS);
@ -2096,25 +2087,26 @@ void CConnman::SocketHandler()
// listening sockets in one call ("readiness" as in poll(2) or
// select(2)). If none are ready, wait for a short while and return
// empty sets.
events_per_sock = GenerateWaitSockets(snap.Nodes());
events_per_sock = GenerateWaitSockets(nodes);
if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) {
interruptNet.sleep_for(timeout);
}
// Service (send/receive) each of the already connected nodes.
SocketHandlerConnected(snap.Nodes(), events_per_sock);
}
SocketHandlerConnected(nodes, events_per_sock);
nodes.clear();
// Accept new connections from listening sockets.
SocketHandlerListening(events_per_sock);
}
void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
void CConnman::SocketHandlerConnected(const std::vector<std::shared_ptr<CNode>>& nodes,
const Sock::EventsPerSock& events_per_sock)
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
for (CNode* pnode : nodes) {
for (auto& pnode : nodes) {
if (interruptNet)
return;
@ -2432,14 +2424,10 @@ void CConnman::StartExtraBlockRelayPeers()
// Return the number of outbound connections that are full relay (not blocks only)
int CConnman::GetFullOutboundConnCount() const
{
int nRelevant = 0;
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && pnode->IsFullOutboundConn()) ++nRelevant;
}
}
return nRelevant;
return std::ranges::count_if(m_nodes, [](const auto& node) {
return node->fSuccessfullyConnected && node->IsFullOutboundConn();
});
}
// Return the number of peers we have over our outbound connection limit
@ -2453,7 +2441,7 @@ int CConnman::GetExtraFullOutboundCount() const
int full_outbound_peers = 0;
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsFullOutboundConn()) {
++full_outbound_peers;
}
@ -2467,7 +2455,7 @@ int CConnman::GetExtraBlockRelayCount() const
int block_relay_peers = 0;
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) {
++block_relay_peers;
}
@ -2638,7 +2626,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, Spa
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->IsFullOutboundConn()) nOutboundFullRelay++;
if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++;
@ -2883,7 +2871,7 @@ std::vector<CAddress> CConnman::GetCurrentBlockRelayOnlyConns() const
{
std::vector<CAddress> ret;
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->IsBlockOnlyConn()) {
ret.push_back(pnode->addr);
}
@ -2909,7 +2897,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo(bool include_connected) co
std::map<std::string, std::pair<bool, CService>> mapConnectedByName;
{
LOCK(m_nodes_mutex);
for (const CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
if (pnode->addr.IsValid()) {
mapConnected[pnode->addr] = pnode->IsInboundConn();
}
@ -3004,7 +2992,7 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
} else if (FindNode(std::string(pszDest)))
return;
CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
auto pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
if (!pnode)
return;
@ -3037,28 +3025,29 @@ void CConnman::ThreadMessageHandler()
{
bool fMoreWork = false;
{
auto nodes = WITH_LOCK(m_nodes_mutex, return m_nodes);
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the m_nodes list.
const NodesSnapshot snap{*this, /*shuffle=*/true};
std::shuffle(nodes.begin(), nodes.end(), FastRandomContext{});
for (CNode* pnode : snap.Nodes()) {
for (auto& pnode : nodes) {
if (pnode->fDisconnect)
continue;
// Receive messages
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode.get(), flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
m_msgproc->SendMessages(pnode);
m_msgproc->SendMessages(pnode.get());
if (flagInterruptMsgProc)
return;
}
}
nodes.clear();
WAIT_LOCK(mutexMsgProc, lock);
if (!fMoreWork) {
@ -3470,30 +3459,19 @@ void CConnman::StopNodes()
}
// Delete peer connections.
std::vector<CNode*> nodes;
std::vector<std::shared_ptr<CNode>> nodes;
WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes));
for (CNode* pnode : nodes) {
for (auto& pnode : nodes) {
LogDebug(BCLog::NET, "Stopping node, %s", pnode->DisconnectMsg(fLogIPs));
pnode->CloseSocketDisconnect();
DeleteNode(pnode);
}
nodes.clear();
for (CNode* pnode : m_nodes_disconnected) {
DeleteNode(pnode);
}
m_nodes_disconnected.clear();
vhListenSocket.clear();
semOutbound.reset();
semAddnode.reset();
}
void CConnman::DeleteNode(CNode* pnode)
{
assert(pnode);
m_msgproc->FinalizeNode(*pnode);
delete pnode;
}
CConnman::~CConnman()
{
Interrupt();
@ -3626,7 +3604,7 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
vstats.clear();
LOCK(m_nodes_mutex);
vstats.reserve(m_nodes.size());
for (CNode* pnode : m_nodes) {
for (const auto& pnode : m_nodes) {
vstats.emplace_back();
pnode->CopyStats(vstats.back());
vstats.back().m_mapped_as = GetMappedAS(pnode->addr);
@ -3636,7 +3614,7 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) const
bool CConnman::DisconnectNode(const std::string& strNode)
{
LOCK(m_nodes_mutex);
if (CNode* pnode = FindNode(strNode)) {
if (auto pnode = FindNode(strNode)) {
LogDebug(BCLog::NET, "disconnect by address%s match, %s", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->DisconnectMsg(fLogIPs));
pnode->fDisconnect = true;
return true;
@ -3648,7 +3626,7 @@ bool CConnman::DisconnectNode(const CSubNet& subnet)
{
bool disconnected = false;
LOCK(m_nodes_mutex);
for (CNode* pnode : m_nodes) {
for (auto& pnode : m_nodes) {
if (subnet.Match(pnode->addr)) {
LogDebug(BCLog::NET, "disconnect by subnet%s match, %s", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->DisconnectMsg(fLogIPs));
pnode->fDisconnect = true;
@ -3666,7 +3644,7 @@ bool CConnman::DisconnectNode(const CNetAddr& addr)
bool CConnman::DisconnectNode(NodeId id)
{
LOCK(m_nodes_mutex);
for(CNode* pnode : m_nodes) {
for(auto& pnode : m_nodes) {
if (id == pnode->GetId()) {
LogDebug(BCLog::NET, "disconnect by id, %s", pnode->DisconnectMsg(fLogIPs));
pnode->fDisconnect = true;
@ -3799,6 +3777,7 @@ CNode::CNode(NodeId idIn,
const std::string& addrNameIn,
ConnectionType conn_type_in,
bool inbound_onion,
std::function<void(CNode&)> destruct_cb,
CNodeOptions&& node_opts)
: m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)},
m_permission_flags{node_opts.permission_flags},
@ -3815,7 +3794,8 @@ CNode::CNode(NodeId idIn,
id{idIn},
nLocalHostNonce{nLocalHostNonceIn},
m_recv_flood_size{node_opts.recv_flood_size},
m_i2p_sam_session{std::move(node_opts.i2p_sam_session)}
m_i2p_sam_session{std::move(node_opts.i2p_sam_session)},
m_destruct_cb{destruct_cb}
{
if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND);
@ -3831,6 +3811,13 @@ CNode::CNode(NodeId idIn,
}
}
CNode::~CNode()
{
if (m_destruct_cb) {
m_destruct_cb(*this);
}
}
void CNode::MarkReceivedMsgsForProcessing()
{
AssertLockNotHeld(m_msg_process_queue_mutex);
@ -3862,9 +3849,9 @@ std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
}
bool CConnman::NodeFullyConnected(const CNode* pnode)
bool CConnman::NodeFullyConnected(const CNode& node)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
return node.fSuccessfullyConnected && !node.fDisconnect;
}
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
@ -3916,15 +3903,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(m_nodes_mutex);
for (auto&& pnode : m_nodes) {
if(pnode->GetId() == id) {
found = pnode;
break;
return NodeFullyConnected(*pnode) && func(pnode.get());
}
}
return found != nullptr && NodeFullyConnected(found) && func(found);
return false;
}
CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const

View file

@ -730,7 +730,6 @@ public:
// next time DisconnectNodes() runs
std::atomic_bool fDisconnect{false};
CSemaphoreGrant grantOutbound;
std::atomic<int> nRefCount{0};
const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv{false};
@ -887,10 +886,13 @@ public:
const std::string& addrNameIn,
ConnectionType conn_type_in,
bool inbound_onion,
std::function<void(CNode&)> destruct_cb = {},
CNodeOptions&& node_opts = {});
CNode(const CNode&) = delete;
CNode& operator=(const CNode&) = delete;
~CNode();
NodeId GetId() const {
return id;
}
@ -899,12 +901,6 @@ public:
return nLocalHostNonce;
}
int GetRefCount() const
{
assert(nRefCount >= 0);
return nRefCount;
}
/**
* Receive bytes from the buffer and deserialize them into messages.
*
@ -930,17 +926,6 @@ public:
//! May not be called more than once
void SetAddrLocal(const CService& addrLocalIn) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_local_mutex);
CNode* AddRef()
{
nRefCount++;
return this;
}
void Release()
{
nRefCount--;
}
void CloseSocketDisconnect() EXCLUSIVE_LOCKS_REQUIRED(!m_sock_mutex);
void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv);
@ -999,6 +984,11 @@ private:
* Otherwise this unique_ptr is empty.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
/**
* A function to be called just before this object is destroyed.
*/
std::function<void(CNode&)> m_destruct_cb;
};
/**
@ -1152,8 +1142,9 @@ public:
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (NodeFullyConnected(node))
func(node);
if (NodeFullyConnected(*node)) {
func(node.get());
}
}
};
@ -1161,8 +1152,9 @@ public:
{
LOCK(m_nodes_mutex);
for (auto&& node : m_nodes) {
if (NodeFullyConnected(node))
func(node);
if (NodeFullyConnected(*node)) {
func(node.get());
}
}
};
@ -1325,7 +1317,7 @@ private:
* @param[in] nodes Select from these nodes' sockets.
* @return sockets to check for readiness
*/
Sock::EventsPerSock GenerateWaitSockets(Span<CNode* const> nodes);
Sock::EventsPerSock GenerateWaitSockets(const std::vector<std::shared_ptr<CNode>>& nodes);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
@ -1337,7 +1329,7 @@ private:
* @param[in] nodes Nodes to process. The socket of each node is checked against `what`.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
void SocketHandlerConnected(const std::vector<CNode*>& nodes,
void SocketHandlerConnected(const std::vector<std::shared_ptr<CNode>>& nodes,
const Sock::EventsPerSock& events_per_sock)
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
@ -1352,9 +1344,9 @@ private:
uint64_t CalculateKeyedNetGroup(const CNetAddr& ad) const;
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const std::string& addrName);
CNode* FindNode(const CService& addr);
std::shared_ptr<CNode> FindNode(const CNetAddr& ip);
std::shared_ptr<CNode> FindNode(const std::string& addrName);
std::shared_ptr<CNode> FindNode(const CService& addr);
/**
* Determine whether we're already connected to a given address, in order to
@ -1363,7 +1355,7 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
std::shared_ptr<CNode> ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector<NetWhitelistPermissions>& ranges) const;
void DeleteNode(CNode* pnode);
@ -1403,7 +1395,7 @@ private:
bool MaybePickPreferredNetwork(std::optional<Network>& network);
// Whether the node should be passed out in ForEach* callbacks
static bool NodeFullyConnected(const CNode* pnode);
static bool NodeFullyConnected(const CNode& node);
uint16_t GetDefaultPort(Network net) const;
uint16_t GetDefaultPort(const std::string& addr) const;
@ -1442,8 +1434,7 @@ private:
std::vector<AddedNodeParams> m_added_node_params GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
std::list<CNode*> m_nodes_disconnected;
std::vector<std::shared_ptr<CNode>> m_nodes GUARDED_BY(m_nodes_mutex);
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
@ -1634,43 +1625,6 @@ private:
*/
static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
/**
* RAII helper to atomically create a copy of `m_nodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/
class NodesSnapshot
{
public:
explicit NodesSnapshot(const CConnman& connman, bool shuffle)
{
{
LOCK(connman.m_nodes_mutex);
m_nodes_copy = connman.m_nodes;
for (auto& node : m_nodes_copy) {
node->AddRef();
}
}
if (shuffle) {
std::shuffle(m_nodes_copy.begin(), m_nodes_copy.end(), FastRandomContext{});
}
}
~NodesSnapshot()
{
for (auto& node : m_nodes_copy) {
node->Release();
}
}
const std::vector<CNode*>& Nodes() const
{
return m_nodes_copy;
}
private:
std::vector<CNode*> m_nodes_copy;
};
const CChainParams& m_params;
friend struct ConnmanTestMsg;

View file

@ -49,15 +49,6 @@ FUZZ_TARGET(net, .init = initialize_net)
CNodeStats stats;
node.CopyStats(stats);
},
[&] {
const CNode* add_ref_node = node.AddRef();
assert(add_ref_node == &node);
},
[&] {
if (node.GetRefCount() > 0) {
node.Release();
}
},
[&] {
const std::vector<uint8_t> b = ConsumeRandomLengthByteVector(fuzzed_data_provider);
bool complete;
@ -68,8 +59,6 @@ FUZZ_TARGET(net, .init = initialize_net)
(void)node.GetAddrLocal();
(void)node.GetId();
(void)node.GetLocalNonce();
const int ref_count = node.GetRefCount();
assert(ref_count >= 0);
(void)node.GetCommonVersion();
const NetPermissionFlags net_permission_flags = ConsumeWeakEnum(fuzzed_data_provider, ALL_NET_PERMISSION_FLAGS);

View file

@ -64,7 +64,9 @@ FUZZ_TARGET(p2p_handshake, .init = ::initialize)
std::vector<CNode*> peers;
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
for (int i = 0; i < num_peers_to_add; ++i) {
peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release());
peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i, [&peerman](CNode& node) {
peerman->FinalizeNode(node);
}).release());
connman.AddTestNode(*peers.back());
peerman->InitializeNode(
*peers.back(),

View file

@ -60,7 +60,8 @@ void HeadersSyncSetup::ResetAndInitialize()
for (auto conn_type : conn_types) {
CAddress addr{};
m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false));
m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false,
[this](CNode& node) { m_node.peerman->FinalizeNode(node); }));
CNode& p2p_node = *m_connections.back();
connman.Handshake(

View file

@ -66,7 +66,9 @@ FUZZ_TARGET(process_message, .init = initialize_process_message)
if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) {
return;
}
CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider).release();
CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider, std::nullopt, [](CNode& node) {
g_setup->m_node.peerman->FinalizeNode(node);
}).release();
connman.AddTestNode(p2p_node);
FillNode(fuzzed_data_provider, connman, p2p_node);

View file

@ -55,7 +55,9 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
std::vector<CNode*> peers;
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
for (int i = 0; i < num_peers_to_add; ++i) {
peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release());
peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i, [&connman](CNode& node) {
connman.MsgProc()->FinalizeNode(node);
}).release());
CNode& p2p_node = *peers.back();
FillNode(fuzzed_data_provider, connman, p2p_node);

View file

@ -228,7 +228,7 @@ inline CService ConsumeService(FuzzedDataProvider& fuzzed_data_provider) noexcep
CAddress ConsumeAddress(FuzzedDataProvider& fuzzed_data_provider) noexcept;
template <bool ReturnUniquePtr = false>
auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<NodeId>& node_id_in = std::nullopt) noexcept
auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<NodeId>& node_id_in = std::nullopt, std::function<void(CNode&)> destruct_cb = {}) noexcept
{
const NodeId node_id = node_id_in.value_or(fuzzed_data_provider.ConsumeIntegralInRange<NodeId>(0, std::numeric_limits<NodeId>::max()));
const auto sock = std::make_shared<FuzzedSock>(fuzzed_data_provider);
@ -250,6 +250,7 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
addr_name,
conn_type,
inbound_onion,
destruct_cb,
CNodeOptions{ .permission_flags = permission_flags });
} else {
return CNode{node_id,
@ -261,10 +262,11 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
addr_name,
conn_type,
inbound_onion,
destruct_cb,
CNodeOptions{ .permission_flags = permission_flags }};
}
}
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt) { return ConsumeNode<true>(fdp, node_id_in); }
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt, std::function<void(CNode&)> destruct_cb = {}) { return ConsumeNode<true>(fdp, node_id_in, destruct_cb); }
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);

View file

@ -117,7 +117,7 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
BOOST_CHECK_EQUAL(nodes.back()->ConnectedThroughNetwork(), Network::NET_CJDNS);
BOOST_TEST_MESSAGE("Call AddNode() for all the peers");
for (auto node : connman->TestNodes()) {
for (const auto& node : connman->TestNodes()) {
BOOST_CHECK(connman->AddNode({/*m_added_node=*/node->addr.ToStringAddrPort(), /*m_use_v2transport=*/true}));
BOOST_TEST_MESSAGE(strprintf("peer id=%s addr=%s", node->GetId(), node->addr.ToStringAddrPort()));
}
@ -134,7 +134,7 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
BOOST_CHECK(connman->GetAddedNodeInfo(/*include_connected=*/false).empty());
// Test AddedNodesContain()
for (auto node : connman->TestNodes()) {
for (const auto& node : connman->TestNodes()) {
BOOST_CHECK(connman->AddedNodesContain(node->addr));
}
AddPeer(id, nodes, *peerman, *connman, ConnectionType::OUTBOUND_FULL_RELAY);
@ -151,12 +151,12 @@ BOOST_FIXTURE_TEST_CASE(test_addnode_getaddednodeinfo_and_connection_detection,
}
BOOST_TEST_MESSAGE("\nCheck that all connected peers are correctly detected as connected");
for (auto node : connman->TestNodes()) {
for (const auto& node : connman->TestNodes()) {
BOOST_CHECK(connman->AlreadyConnectedPublic(node->addr));
}
// Clean up
for (auto node : connman->TestNodes()) {
for (const auto& node : connman->TestNodes()) {
peerman->FinalizeNode(*node);
}
connman->ClearTestNodes();

View file

@ -105,9 +105,9 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co
return complete;
}
CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
std::shared_ptr<CNode> ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
{
CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
auto node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
if (!node) return nullptr;
node->SetCommonVersion(PROTOCOL_VERSION);
peerman.InitializeNode(*node, ServiceFlags(NODE_NETWORK | NODE_WITNESS));

View file

@ -45,7 +45,7 @@ struct ConnmanTestMsg : public CConnman {
m_peer_connect_timeout = timeout;
}
std::vector<CNode*> TestNodes()
std::vector<std::shared_ptr<CNode>> TestNodes()
{
LOCK(m_nodes_mutex);
return m_nodes;
@ -54,7 +54,7 @@ struct ConnmanTestMsg : public CConnman {
void AddTestNode(CNode& node)
{
LOCK(m_nodes_mutex);
m_nodes.push_back(&node);
m_nodes.push_back(std::shared_ptr<CNode>(&node));
if (node.IsManualOrFullOutboundConn()) ++m_network_conn_counts[node.addr.GetNetwork()];
}
@ -62,12 +62,11 @@ struct ConnmanTestMsg : public CConnman {
void ClearTestNodes()
{
LOCK(m_nodes_mutex);
for (CNode* node : m_nodes) {
delete node;
}
m_nodes.clear();
}
NetEventsInterface* MsgProc() const { return m_msgproc; };
void Handshake(CNode& node,
bool successfully_connected,
ServiceFlags remote_services,
@ -88,7 +87,7 @@ struct ConnmanTestMsg : public CConnman {
bool AlreadyConnectedPublic(const CAddress& addr) { return AlreadyConnectedToAddress(addr); };
CNode* ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
std::shared_ptr<CNode> ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
};