[net] Deduplicate marking received message for processing

This commit is contained in:
dergoegge 2023-03-14 17:38:46 +01:00
parent ad44aa5c64
commit cc5cdf8776
3 changed files with 23 additions and 24 deletions

View file

@ -1328,18 +1328,7 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
for (const auto& msg : pnode->vRecvMsg) {
// vRecvMsg contains only completed CNetMessage
// the single possible partially deserialized message are held by TransportDeserializer
nSizeAdded += msg.m_raw_message_size;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
pnode->MarkReceivedMsgsForProcessing(nReceiveFloodSize);
WakeMessageHandler();
}
}
@ -2806,6 +2795,23 @@ CNode::CNode(NodeId idIn,
}
}
void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
{
AssertLockNotHeld(cs_vProcessMsg);
size_t nSizeAdded = 0;
for (const auto& msg : vRecvMsg) {
// vRecvMsg contains only completed CNetMessage
// the single possible partially deserialized message are held by TransportDeserializer
nSizeAdded += msg.m_raw_message_size;
}
LOCK(cs_vProcessMsg);
vProcessMsg.splice(vProcessMsg.end(), vRecvMsg);
nProcessQueueSize += nSizeAdded;
fPauseRecv = nProcessQueueSize > recv_flood_size;
}
bool CConnman::NodeFullyConnected(const CNode* pnode)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;

View file

@ -422,6 +422,10 @@ public:
return m_conn_type;
}
/** Move all messages from the received queue to the processing queue. */
void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg);
bool IsOutboundOrBlockRelayConn() const {
switch (m_conn_type) {
case ConnectionType::OUTBOUND_FULL_RELAY:

View file

@ -66,18 +66,7 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
{
assert(node.ReceiveMsgBytes(msg_bytes, complete));
if (complete) {
size_t nSizeAdded = 0;
for (const auto& msg : node.vRecvMsg) {
// vRecvMsg contains only completed CNetMessage
// the single possible partially deserialized message are held by TransportDeserializer
nSizeAdded += msg.m_raw_message_size;
}
{
LOCK(node.cs_vProcessMsg);
node.vProcessMsg.splice(node.vProcessMsg.end(), node.vRecvMsg);
node.nProcessQueueSize += nSizeAdded;
node.fPauseRecv = node.nProcessQueueSize > nReceiveFloodSize;
}
node.MarkReceivedMsgsForProcessing(nReceiveFloodSize);
}
}