Merge bitcoin/bitcoin#31666: multi-peer orphan resolution followups

7426afbe62 [p2p] assign just 1 random announcer in AddChildrenToWorkSet (glozow)
4c1fa6b28c test fix: make peer who sends MSG_TX announcement non-wtxidrelay (glozow)
2da46b88f0 pass P2PTxInvStore init args to P2PInterface init (glozow)
e3bd51e4b5 [doc] how unique_parents can be empty (glozow)
32eb6dc758 [refactor] assign local variable for wtxid (glozow)
18820ccf6b multi-announcer orphan handling test fixups (glozow)
c4cc61db98 [fuzz] GetCandidatePeers (glozow)
7704139cf0 [refactor] make GetCandidatePeers take uint256 and in-out vector (glozow)
6e4d392a75 [refactor] rename to OrphanResolutionCandidate to MaybeAdd* (glozow)
57221ad979 [refactor] move parent inv-adding to OrphanResolutionCandidate (glozow)

Pull request description:

  Followup to #31397.

  Addressing (in order):
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1906077380
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1881060842
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1905994963
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1905999581
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1906001592
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1905989913
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1905920861
  https://github.com/bitcoin/bitcoin/pull/31658#pullrequestreview-2551617694
  https://github.com/bitcoin/bitcoin/pull/31397#discussion_r1917559601

ACKs for top commit:
  instagibbs:
    reACK 7426afbe62
  marcofleon:
    reACK 7426afbe62
  mzumsande:
    Code Review ACK 7426afbe62
  dergoegge:
    Code review ACK 7426afbe62

Tree-SHA512: bca8f576873fdaa20b758e1ee9708ce94e618ff14726864b29b50f0f9a4db58136a286d2b654af569b09433a028901fe6bcdda68dcbfea71e2d1271934725503
This commit is contained in:
merge-script 2025-02-04 10:10:29 +00:00
commit 94ca99ac51
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
11 changed files with 105 additions and 85 deletions

View file

@ -179,23 +179,21 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid,
// - exists in orphanage
// - peer can be an orphan resolution candidate
if (gtxid.IsWtxid()) {
if (auto orphan_tx{m_orphanage.GetTx(Wtxid::FromUint256(gtxid.GetHash()))}) {
const auto wtxid{Wtxid::FromUint256(gtxid.GetHash())};
if (auto orphan_tx{m_orphanage.GetTx(wtxid)}) {
auto unique_parents{GetUniqueParents(*orphan_tx)};
std::erase_if(unique_parents, [&](const auto& txid){
return AlreadyHaveTx(GenTxid::Txid(txid), /*include_reconsiderable=*/false);
});
if (unique_parents.empty()) return true;
// The missing parents may have all been rejected or accepted since the orphan was added to the orphanage.
// Do not delete from the orphanage, as it may be queued for processing.
if (unique_parents.empty()) {
return true;
}
if (auto delay{OrphanResolutionCandidate(peer, Wtxid::FromUint256(gtxid.GetHash()), unique_parents.size())}) {
m_orphanage.AddAnnouncer(Wtxid::FromUint256(gtxid.GetHash()), peer);
const auto& info = m_peer_info.at(peer).m_connection_info;
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(peer, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay);
}
LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", peer, gtxid.GetHash().ToString());
if (MaybeAddOrphanResolutionCandidate(unique_parents, wtxid, peer, now)) {
m_orphanage.AddAnnouncer(orphan_tx->GetWitnessHash(), peer);
}
// Return even if the peer isn't an orphan resolution candidate. This would be caught by AlreadyHaveTx.
@ -231,13 +229,15 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid,
return false;
}
std::optional<std::chrono::seconds> TxDownloadManagerImpl::OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents)
bool TxDownloadManagerImpl::MaybeAddOrphanResolutionCandidate(const std::vector<Txid>& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now)
{
if (m_peer_info.count(nodeid) == 0) return std::nullopt;
if (m_orphanage.HaveTxFromPeer(orphan_wtxid, nodeid)) return std::nullopt;
auto it_peer = m_peer_info.find(nodeid);
if (it_peer == m_peer_info.end()) return false;
if (m_orphanage.HaveTxFromPeer(wtxid, nodeid)) return false;
const auto& peer_entry = m_peer_info.at(nodeid);
const auto& info = peer_entry.m_connection_info;
// TODO: add delays and limits based on the amount of orphan resolution we are already doing
// with this peer, how much they are using the orphanage, etc.
if (!info.m_relay_permissions) {
@ -245,7 +245,7 @@ std::optional<std::chrono::seconds> TxDownloadManagerImpl::OrphanResolutionCandi
// existing behavior: drop if we are tracking too many invs for this peer already. Each
// orphan resolution involves at least 1 transaction request which may or may not be
// currently tracked in m_txrequest, so we include that in the count.
if (m_txrequest.Count(nodeid) + num_parents > MAX_PEER_TX_ANNOUNCEMENTS) return std::nullopt;
if (m_txrequest.Count(nodeid) + unique_parents.size() > MAX_PEER_TX_ANNOUNCEMENTS) return false;
}
std::chrono::seconds delay{0s};
@ -258,7 +258,13 @@ std::optional<std::chrono::seconds> TxDownloadManagerImpl::OrphanResolutionCandi
const bool overloaded = !info.m_relay_permissions && m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT;
if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;
return delay;
// Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents.
// In the future, orphan resolution may include more explicit steps
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + delay);
}
LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString());
return true;
}
std::vector<GenTxid> TxDownloadManagerImpl::GetRequestsToSend(NodeId nodeid, std::chrono::microseconds current_time)
@ -327,7 +333,7 @@ void TxDownloadManagerImpl::MempoolAcceptedTx(const CTransactionRef& tx)
m_txrequest.ForgetTxHash(tx->GetHash());
m_txrequest.ForgetTxHash(tx->GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(*tx);
m_orphanage.AddChildrenToWorkSet(*tx, m_opts.m_rng);
// If it came from the orphanage, remove it. No-op if the tx is not in txorphanage.
m_orphanage.EraseTx(tx->GetWitnessHash());
}
@ -400,27 +406,19 @@ node::RejectedTxTodo TxDownloadManagerImpl::MempoolRejectedTx(const CTransaction
// means it was already added to vExtraTxnForCompact.
add_extra_compact_tx &= !m_orphanage.HaveTx(wtxid);
auto add_orphan_reso_candidate = [&](const CTransactionRef& orphan_tx, const std::vector<Txid>& unique_parents, NodeId nodeid, std::chrono::microseconds now) {
const auto& wtxid = orphan_tx->GetWitnessHash();
if (auto delay{OrphanResolutionCandidate(nodeid, wtxid, unique_parents.size())}) {
const auto& info = m_peer_info.at(nodeid).m_connection_info;
m_orphanage.AddTx(orphan_tx, nodeid);
// Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents
// In the future, orphan resolution may include more explicit steps
for (const auto& parent_txid : unique_parents) {
m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay);
}
LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString());
}
};
// If there is no candidate for orphan resolution, AddTx will not be called. This means
// that if a peer is overloading us with invs and orphans, they will eventually not be
// able to add any more transactions to the orphanage.
add_orphan_reso_candidate(ptx, unique_parents, nodeid, now);
for (const auto& candidate : m_txrequest.GetCandidatePeers(ptx)) {
add_orphan_reso_candidate(ptx, unique_parents, candidate, now);
//
// Search by txid and, if the tx has a witness, wtxid
std::vector<NodeId> orphan_resolution_candidates{nodeid};
m_txrequest.GetCandidatePeers(ptx->GetHash().ToUint256(), orphan_resolution_candidates);
if (ptx->HasWitness()) m_txrequest.GetCandidatePeers(ptx->GetWitnessHash().ToUint256(), orphan_resolution_candidates);
for (const auto& nodeid : orphan_resolution_candidates) {
if (MaybeAddOrphanResolutionCandidate(unique_parents, ptx->GetWitnessHash(), nodeid, now)) {
m_orphanage.AddTx(ptx, nodeid);
}
}
// Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore.

View file

@ -194,11 +194,11 @@ protected:
/** Helper for getting deduplicated vector of Txids in vin. */
std::vector<Txid> GetUniqueParents(const CTransaction& tx);
/** Determine candidacy (and delay) for potential orphan resolution candidate.
* @returns delay for orphan resolution if this peer is a good candidate for orphan resolution,
* std::nullopt if this peer cannot be added because it has reached download/orphanage limits.
/** If this peer is an orphan resolution candidate for this transaction, treat the unique_parents as announced by
* this peer; add them as new invs to m_txrequest.
* @returns whether this transaction was a valid orphan resolution candidate.
* */
std::optional<std::chrono::seconds> OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents);
bool MaybeAddOrphanResolutionCandidate(const std::vector<Txid>& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now);
};
} // namespace node
#endif // BITCOIN_NODE_TXDOWNLOADMAN_IMPL_H

View file

@ -33,7 +33,7 @@ void initialize_orphanage()
FUZZ_TARGET(txorphan, .init = initialize_orphanage)
{
FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
FastRandomContext limit_orphans_rng{/*fDeterministic=*/true};
FastRandomContext orphanage_rng{/*fDeterministic=*/true};
SetMockTime(ConsumeTime(fuzzed_data_provider));
TxOrphanage orphanage;
@ -79,7 +79,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage)
// previous loop and potentially the parent of this tx.
if (ptx_potential_parent) {
// Set up future GetTxToReconsider call.
orphanage.AddChildrenToWorkSet(*ptx_potential_parent);
orphanage.AddChildrenToWorkSet(*ptx_potential_parent, orphanage_rng);
// Check that all txns returned from GetChildrenFrom* are indeed a direct child of this tx.
NodeId peer_id = fuzzed_data_provider.ConsumeIntegral<NodeId>();
@ -154,7 +154,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage)
// test mocktime and expiry
SetMockTime(ConsumeTime(fuzzed_data_provider));
auto limit = fuzzed_data_provider.ConsumeIntegral<unsigned int>();
orphanage.LimitOrphans(limit, limit_orphans_rng);
orphanage.LimitOrphans(limit, orphanage_rng);
Assert(orphanage.Size() <= limit);
});

View file

@ -295,6 +295,19 @@ public:
tracked += m_announcements[txhash][peer].m_state != State::NOTHING;
inflight += m_announcements[txhash][peer].m_state == State::REQUESTED;
candidates += m_announcements[txhash][peer].m_state == State::CANDIDATE;
std::bitset<MAX_PEERS> expected_announcers;
for (int peer = 0; peer < MAX_PEERS; ++peer) {
if (m_announcements[txhash][peer].m_state == State::CANDIDATE || m_announcements[txhash][peer].m_state == State::REQUESTED) {
expected_announcers[peer] = true;
}
}
std::vector<NodeId> candidate_peers;
m_tracker.GetCandidatePeers(TXHASHES[txhash], candidate_peers);
assert(expected_announcers.count() == candidate_peers.size());
for (const auto& peer : candidate_peers) {
assert(expected_announcers[peer]);
}
}
assert(m_tracker.Count(peer) == tracked);
assert(m_tracker.CountInFlight(peer) == inflight);

View file

@ -532,19 +532,27 @@ BOOST_AUTO_TEST_CASE(peer_worksets)
BOOST_CHECK(orphanage.HaveTxFromPeer(orphan_wtxid, node));
}
// Parent accepted: add child to all 3 worksets.
orphanage.AddChildrenToWorkSet(*tx_missing_parent);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), tx_orphan);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node1), tx_orphan);
// Don't call GetTxToReconsider(node2) yet because it mutates the workset.
// Parent accepted: child is added to 1 of 3 worksets.
orphanage.AddChildrenToWorkSet(*tx_missing_parent, det_rand);
int node0_reconsider = orphanage.HaveTxToReconsider(node0);
int node1_reconsider = orphanage.HaveTxToReconsider(node1);
int node2_reconsider = orphanage.HaveTxToReconsider(node2);
BOOST_CHECK_EQUAL(node0_reconsider + node1_reconsider + node2_reconsider, 1);
NodeId assigned_peer;
if (node0_reconsider) {
assigned_peer = node0;
} else if (node1_reconsider) {
assigned_peer = node1;
} else {
BOOST_CHECK(node2_reconsider);
assigned_peer = node2;
}
// EraseForPeer also removes that tx from the workset.
orphanage.EraseForPeer(node0);
orphanage.EraseForPeer(assigned_peer);
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), nullptr);
// However, the other peers' worksets are not touched.
BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node2), tx_orphan);
// Delete this tx, clearing the orphanage.
BOOST_CHECK_EQUAL(orphanage.EraseTx(orphan_wtxid), 1);
BOOST_CHECK_EQUAL(orphanage.Size(), 0);

View file

@ -152,7 +152,7 @@ void TxOrphanage::LimitOrphans(unsigned int max_orphans, FastRandomContext& rng)
if (nEvicted > 0) LogDebug(BCLog::TXPACKAGES, "orphanage overflow, removed %u tx\n", nEvicted);
}
void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx)
void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng)
{
for (unsigned int i = 0; i < tx.vout.size(); i++) {
const auto it_by_prev = m_outpoint_to_orphan_it.find(COutPoint(tx.GetHash(), i));
@ -160,15 +160,21 @@ void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx)
for (const auto& elem : it_by_prev->second) {
// Belt and suspenders, each orphan should always have at least 1 announcer.
if (!Assume(!elem->second.announcers.empty())) continue;
for (const auto announcer: elem->second.announcers) {
// Get this source peer's work set, emplacing an empty set if it didn't exist
// (note: if this peer wasn't still connected, we would have removed the orphan tx already)
std::set<Wtxid>& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second;
// Add this tx to the work set
orphan_work_set.insert(elem->first);
LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n",
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer);
}
// Select a random peer to assign orphan processing, reducing wasted work if the orphan is still missing
// inputs. However, we don't want to create an issue in which the assigned peer can purposefully stop us
// from processing the orphan by disconnecting.
auto announcer_iter = std::begin(elem->second.announcers);
std::advance(announcer_iter, rng.randrange(elem->second.announcers.size()));
auto announcer = *(announcer_iter);
// Get this source peer's work set, emplacing an empty set if it didn't exist
// (note: if this peer wasn't still connected, we would have removed the orphan tx already)
std::set<Wtxid>& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second;
// Add this tx to the work set
orphan_work_set.insert(elem->first);
LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n",
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer);
}
}
}

View file

@ -62,7 +62,7 @@ public:
void LimitOrphans(unsigned int max_orphans, FastRandomContext& rng);
/** Add any orphans that list a particular tx as a parent into the from peer's work set */
void AddChildrenToWorkSet(const CTransaction& tx);
void AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng);
/** Does this peer have any work to do? */
bool HaveTxToReconsider(NodeId peer);

View file

@ -574,21 +574,13 @@ public:
}
}
std::vector<NodeId> GetCandidatePeers(const CTransactionRef& tx) const
void GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const
{
// Search by txid and, if the tx has a witness, wtxid
std::vector<uint256> hashes{tx->GetHash().ToUint256()};
if (tx->HasWitness()) hashes.emplace_back(tx->GetWitnessHash().ToUint256());
std::vector<NodeId> result_peers;
for (const uint256& txhash : hashes) {
auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) {
result_peers.push_back(it->m_peer);
++it;
}
auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) {
result_peers.push_back(it->m_peer);
++it;
}
return result_peers;
}
void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred,
@ -738,7 +730,7 @@ size_t TxRequestTracker::CountInFlight(NodeId peer) const { return m_impl->Count
size_t TxRequestTracker::CountCandidates(NodeId peer) const { return m_impl->CountCandidates(peer); }
size_t TxRequestTracker::Count(NodeId peer) const { return m_impl->Count(peer); }
size_t TxRequestTracker::Size() const { return m_impl->Size(); }
std::vector<NodeId> TxRequestTracker::GetCandidatePeers(const CTransactionRef& tx) const { return m_impl->GetCandidatePeers(tx); }
void TxRequestTracker::GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const { return m_impl->GetCandidatePeers(txhash, result_peers); }
void TxRequestTracker::SanityCheck() const { m_impl->SanityCheck(); }
void TxRequestTracker::PostGetRequestableSanityCheck(std::chrono::microseconds now) const

View file

@ -195,8 +195,9 @@ public:
/** Count how many announcements are being tracked in total across all peers and transaction hashes. */
size_t Size() const;
/** For some tx return all peers with non-COMPLETED announcements for its txid or wtxid. The resulting vector may contain duplicate NodeIds. */
std::vector<NodeId> GetCandidatePeers(const CTransactionRef& tx) const;
/** For some txhash (txid or wtxid), finds all peers with non-COMPLETED announcements and appends them to
* result_peers. Does not try to ensure that result_peers contains no duplicates. */
void GetCandidatePeers(const uint256& txhash, std::vector<NodeId>& result_peers) const;
/** Access to the internal priority computation (testing only) */
uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const;

View file

@ -66,8 +66,8 @@ def cleanup(func):
class PeerTxRelayer(P2PTxInvStore):
"""A P2PTxInvStore that also remembers all of the getdata and tx messages it receives."""
def __init__(self):
super().__init__()
def __init__(self, wtxidrelay=True):
super().__init__(wtxidrelay=wtxidrelay)
self._tx_received = []
self._getdata_received = []
@ -402,7 +402,7 @@ class OrphanHandlingTest(BitcoinTestFramework):
node = self.nodes[0]
peer1 = node.add_p2p_connection(PeerTxRelayer())
peer2 = node.add_p2p_connection(PeerTxRelayer())
peer3 = node.add_p2p_connection(PeerTxRelayer())
peer3 = node.add_p2p_connection(PeerTxRelayer(wtxidrelay=False))
self.log.info("Test that an orphan with rejected parents, along with any descendants, cannot be retried with an alternate witness")
parent_low_fee_nonsegwit = self.wallet_nonsegwit.create_self_transfer(fee_rate=0)
@ -776,16 +776,18 @@ class OrphanHandlingTest(BitcoinTestFramework):
assert tx_replacer_BC["txid"] in node.getrawmempool()
node.sendrawtransaction(tx_replacer_C["hex"])
assert tx_replacer_BC["txid"] not in node.getrawmempool()
assert parent_peekaboo_AB["txid"] not in node.getrawmempool()
assert tx_replacer_C["txid"] in node.getrawmempool()
# Second peer is an additional announcer for this orphan
# Second peer is an additional announcer for this orphan, but its missing parents are different from when it was
# previously announced.
peer2 = node.add_p2p_connection(PeerTxRelayer())
peer2.send_and_ping(msg_inv([orphan_inv]))
assert_equal(len(node.getorphantxs(verbosity=2)[0]["from"]), 2)
# Disconnect peer1. peer2 should become the new candidate for orphan resolution.
peer1.peer_disconnect()
node.bumpmocktime(NONPREF_PEER_TX_DELAY + TXID_RELAY_DELAY)
node.bumpmocktime(TXREQUEST_TIME_SKIP)
self.wait_until(lambda: len(node.getorphantxs(verbosity=2)[0]["from"]) == 1)
# Both parents should be requested, now that they are both missing.
peer2.wait_for_parent_requests([int(parent_peekaboo_AB["txid"], 16), int(parent_missing["txid"], 16)])

View file

@ -928,8 +928,8 @@ class P2PDataStore(P2PInterface):
class P2PTxInvStore(P2PInterface):
"""A P2PInterface which stores a count of how many times each txid has been announced."""
def __init__(self):
super().__init__()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.tx_invs_received = defaultdict(int)
def on_inv(self, message):