Merge #19911: net: guard vRecvGetData with cs_vRecv and orphan_work_set with g_cs_orphans

da0988daf1 scripted-diff: rename vRecvGetData (Neha Narula)
ba951812ec Guard vRecvGetData (now in net processing) with its own mutex (Neha Narula)
2d9f2fca43 Move vRecvGetData to net processing (Neha Narula)
673247b58c Lock before checking if orphan_work_set is empty; indicate it is guarded (Neha Narula)
8803aee668 Move m_orphan_work_set to net_processing (Neha Narula)
9c47cb29f9 [Rename only] Rename orphan_work_set to m_orphan_work_set. (Neha Narula)

Pull request description:

  Add annotations to guard `vRecvGetData` and `orphan_work_set` and fix up places where they were accessed without a lock. There is no current data race because they happen to be accessed by only one thread, but this might not always be the case.

  Original discussion: https://github.com/bitcoin/bitcoin/pull/18861#discussion_r451778445

ACKs for top commit:
  MarcoFalke:
    review ACK da0988daf1 🐬
  jnewbery:
    Code review ACK da0988daf1
  hebasto:
    ACK da0988daf1, I have reviewed the code and it looks correct, I agree it can be merged.

Tree-SHA512: 31cadd319ddc9273a87e77afc4db7339fd636e816b5e742eba5cb32927ac5cc07a672b2268d2d38a75a0f1b17d93836adab9acf7e52f26ea9a43f54efa57257e
This commit is contained in:
fanquake 2020-10-19 09:03:19 +08:00
commit c92aa8357c
No known key found for this signature in database
GPG Key ID: 2EEB9F5CC09526C1
2 changed files with 80 additions and 50 deletions

View File

@ -857,7 +857,6 @@ public:
RecursiveMutex cs_sendProcessing;
std::deque<CInv> vRecvGetData;
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<int64_t> nLastSend{0};
@ -1051,8 +1050,6 @@ public:
// Whether a ping is requested.
std::atomic<bool> fPingQueued{false};
std::set<uint256> orphan_work_set;
CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn, ConnectionType conn_type_in, bool inbound_onion = false);
~CNode();
CNode(const CNode&) = delete;

View File

@ -446,6 +446,14 @@ struct Peer {
/** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false};
/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
/** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
Peer(NodeId id) : m_id(id) {}
};
@ -1654,11 +1662,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
return {};
}
void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex)
{
AssertLockNotHeld(cs_main);
std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin();
std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
@ -1670,7 +1678,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process
// them.
while (it != pfrom.vRecvGetData.end() && it->IsGenTxMsg()) {
while (it != peer.m_getdata_requests.end() && it->IsGenTxMsg()) {
if (interruptMsgProc) return;
// The send buffer provides backpressure. If there's no space in
// the buffer, pause processing until the next call.
@ -1718,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Only process one BLOCK item per call, since they're uncommon and can be
// expensive to process.
if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) {
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman);
@ -1727,7 +1735,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// and continue processing the queue on the next call.
}
pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it);
peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it);
if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it doesn't
@ -2270,6 +2278,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}
PeerRef peer = GetPeerRef(pfrom.GetId());
if (peer == nullptr) return;
if (msg_type == NetMsgType::VERSION) {
// Each connection can only send one version message
@ -2708,8 +2718,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
}
pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
{
LOCK(peer->m_getdata_requests_mutex);
peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}
return;
}
@ -2797,36 +2811,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}
LOCK(cs_main);
{
LOCK(cs_main);
const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
return;
const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
return;
}
if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
return;
}
}
if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
inv.hash = req.blockhash;
pfrom.vRecvGetData.push_back(inv);
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
return;
}
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
inv.hash = req.blockhash;
WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
// The message processing loop will go around again (without pausing) and we'll respond then
return;
}
@ -2961,7 +2977,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom.orphan_work_set.insert(elem->first);
peer->m_orphan_work_set.insert(elem->first);
}
}
}
@ -2978,7 +2994,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
}
// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(pfrom.orphan_work_set);
ProcessOrphanTx(peer->m_orphan_work_set);
}
else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS)
{
@ -3773,21 +3789,37 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
{
bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty())
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false;
if (!pfrom->orphan_work_set.empty()) {
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) {
ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}
}
{
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(pfrom->orphan_work_set);
if (!peer->m_orphan_work_set.empty()) {
ProcessOrphanTx(peer->m_orphan_work_set);
}
}
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses
// and prevents vRecvGetData to grow unbounded
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;
// and prevents m_getdata_requests to grow unbounded
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) return true;
}
{
LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) return true;
}
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
@ -3814,10 +3846,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
try {
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
if (interruptMsgProc)
return false;
if (!pfrom->vRecvGetData.empty())
fMoreWork = true;
if (interruptMsgProc) return false;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) fMoreWork = true;
}
} catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
} catch (...) {