diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 6e7c45b56a0..ebe49630012 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5842,6 +5842,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Determine transactions to relay if (fSendTrickle) { + if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId())) { + // Make transactions added to the reconciliation set during the last interval available + m_txreconciliation->ReadyDelayedTransactions(pto->GetId()); + } // Produce a vector with all candidates for sending std::vector::iterator> vInvTx; vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size()); diff --git a/src/node/txreconciliation.cpp b/src/node/txreconciliation.cpp index 24485f2d752..1359836ad7e 100644 --- a/src/node/txreconciliation.cpp +++ b/src/node/txreconciliation.cpp @@ -56,6 +56,11 @@ public: */ uint64_t m_k0, m_k1; + /** + * Set of transactions to be added to the reconciliation set (moved to m_local_set) upon the trickle. + */ + std::unordered_set m_delayed_local_set; + /** * Store all wtxids which we would announce to the peer (policy checks passed, etc.) * in this set instead of announcing them right away. When reconciliation time comes, we will @@ -74,7 +79,39 @@ public: */ std::map m_short_id_mapping; - TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {} + TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1), m_delayed_local_set(), m_local_set(0, m_delayed_local_set.hash_function()) {} + + /** + * Checks whether a transaction is already in the set. If `include_delayed` is set, the delayed set is also + * checked. Otherwise, transactions are only looked up in the regular set. + */ + bool ContainsTx(const Wtxid& wtxid, bool include_delayed) const + { + bool found = m_local_set.find(wtxid) != m_local_set.end(); + if (include_delayed) { + found |= m_delayed_local_set.find(wtxid) != m_delayed_local_set.end(); + } + + return found; + } + + /** + * Returns a pair of sizes, corresponding to the reconciliation set and the delayed transactions set + */ + std::pair ReconSetSize() + { + return std::make_pair(m_local_set.size(), m_delayed_local_set.size()); + } + + bool RemoveFromSet(const Wtxid& wtxid) + { + if (m_local_set.contains(wtxid)) { + Assume(!m_delayed_local_set.contains(wtxid)); + return m_local_set.erase(wtxid); + } else { + return m_delayed_local_set.erase(wtxid); + } + } /** * Reconciliation sketches are computed over short transaction IDs. @@ -213,7 +250,7 @@ public: if (!peer_state) return AddToSetResult::Failed(); // Bypass if the wtxid is already in the set - if (peer_state->m_local_set.find(wtxid) != peer_state->m_local_set.end()) { + if (peer_state->ContainsTx(wtxid, /*include_delayed=*/true)) { LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "%s already in reconciliation set for peer=%d. Bypassing.\n", wtxid.ToString(), peer_id); return AddToSetResult::Succeeded(); @@ -228,7 +265,8 @@ public: } // Transactions which don't make it to the set due to the limit are announced via fan-out. - if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) { + auto [recon_set_size, delayed_set_size] = peer_state->ReconSetSize(); + if (recon_set_size + delayed_set_size >= MAX_RECONSET_SIZE) { LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id); return AddToSetResult::Failed(); } @@ -236,15 +274,39 @@ public: // The caller currently keeps track of the per-peer transaction announcements, so it // should not attempt to add same tx to the set twice. However, if that happens, we will // simply ignore it. - if (peer_state->m_local_set.insert(wtxid).second) { + if (peer_state->m_delayed_local_set.insert(wtxid).second) { peer_state->m_short_id_mapping.emplace(short_id, wtxid); LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. " - "Now the set contains %i transactions.\n", - wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + "Now the set contains %i reconcilable transactions" + "(plus %i delayed transactions).\n", + wtxid.ToString(), peer_id, recon_set_size, delayed_set_size + 1); } return AddToSetResult::Succeeded(); } + bool ReadyDelayedTransactions(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + peer_state->m_local_set.merge(peer_state->m_delayed_local_set); + // There should be no duplicates, so m_delayed_local_set should be emptied + Assert(peer_state->m_delayed_local_set.empty()); + return true; + } + + bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + return peer_state->ContainsTx(wtxid, include_delayed); + } + bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); @@ -252,12 +314,14 @@ public: auto peer_state = GetRegisteredPeerState(peer_id); if (!peer_state) return false; - auto removed = peer_state->m_local_set.erase(wtxid) > 0; + auto removed = peer_state->RemoveFromSet(wtxid); if (removed) { + auto [recon_set_size, delayed_set_size] = peer_state->ReconSetSize(); peer_state->m_short_id_mapping.erase(peer_state->ComputeShortID(wtxid)); LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. " - "Now the set contains %i transactions.\n", - wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + "Now the set contains %i reconcilable transactions" + "(plus %i delayed transactions).\n", + wtxid.ToString(), peer_id, recon_set_size, delayed_set_size); } else { LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. " "Transaction not found\n", @@ -378,7 +442,7 @@ public: for (const auto &[peer_id, state_or_salt]: m_states) { if (const auto state = std::get_if(&state_or_salt)) { const size_t parent_count = std::count_if(parents.begin(), parents.end(), - [state](const auto& wtxid){return state->m_local_set.find(wtxid) != state->m_local_set.end();}); + [state](const auto& wtxid){return state->ContainsTx(wtxid, /*include_delayed=*/true);}); parents_by_peer.emplace(parent_count, peer_id); } } @@ -440,6 +504,16 @@ AddToSetResult TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wt return m_impl->AddToSet(peer_id, wtxid); } +bool TxReconciliationTracker::ReadyDelayedTransactions(NodeId peer_id) +{ + return m_impl->ReadyDelayedTransactions(peer_id); +} + +bool TxReconciliationTracker::IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed) +{ + return m_impl->IsTransactionInSet(peer_id, wtxid, include_delayed); +} + bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) { return m_impl->TryRemovingFromSet(peer_id, wtxid); diff --git a/src/node/txreconciliation.h b/src/node/txreconciliation.h index 26de3f7ecd4..b84f0910967 100644 --- a/src/node/txreconciliation.h +++ b/src/node/txreconciliation.h @@ -111,6 +111,18 @@ public: */ AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid); + /** + * Moves delayed transactions to available (m_delayed_local_set -> m_local_set). + * To be called on trickle intervals so transactions are not requestable straightaway. + */ + bool ReadyDelayedTransactions(NodeId peer_id); + + /** + * Checks whether a transaction is part of the peer's reconciliation set + * If include_delayed is set, look also in the delayed set. + */ + bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed); + /** * Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if * the peer just announced the transaction to us. diff --git a/src/test/txreconciliation_tests.cpp b/src/test/txreconciliation_tests.cpp index 6aca3761763..6cff6189ab4 100644 --- a/src/test/txreconciliation_tests.cpp +++ b/src/test/txreconciliation_tests.cpp @@ -173,6 +173,39 @@ BOOST_AUTO_TEST_CASE(AddToSetCollisionTest) BOOST_REQUIRE_EQUAL(r.m_collision.value(), wtxid); } +BOOST_AUTO_TEST_CASE(IsTransactionInSetTest) +{ + CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL); + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + Wtxid wtxid{Wtxid::FromUint256(frc.rand256())}; + + // If the peer is not registered, no transaction can be found + BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0)); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + + // Same happens if the peer is only pre-registered + tracker.PreRegisterPeer(peer_id0); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + // Or registered but the transaction hasn't been added + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + + // Adding the transaction will make it queryable, but only if we set include_delayed, + // given transactions are placed into the delayed set first + auto r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + + // After a trickle interval, the transaction will be queryable from the regular set + BOOST_REQUIRE(tracker.ReadyDelayedTransactions(peer_id0)); + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); +} + BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest) { CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL); @@ -250,6 +283,54 @@ BOOST_AUTO_TEST_CASE(SortPeersByFewestParentsTest) BOOST_REQUIRE(std::equal(sorted_peers.begin() + 3, sorted_peers.end(), peers.rbegin(), peers.rend() - 3)); } +BOOST_AUTO_TEST_CASE(ReadyAndDelayedTransactionsTest) +{ + CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL); + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + // If the peer is not registered, there are no transactions to ready + BOOST_CHECK(!tracker.ReadyDelayedTransactions(peer_id0)); + tracker.PreRegisterPeer(peer_id0); + // Same happens if the peer is only pre-registered + BOOST_CHECK(!tracker.ReadyDelayedTransactions(peer_id0)); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id0)); + + Wtxid wtxid{Wtxid::FromUint256(frc.rand256())}; + + // Adding a transaction places it in the delayed set until ReadyDelayedTransactions is called + auto r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + + // Reading the transaction will move it to the regular set + BOOST_REQUIRE(tracker.ReadyDelayedTransactions(peer_id0)); + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); + + // Trying to add the same transaction twice will be bypassed, given both sets are checked + r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + // The transaction is still in the available set + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); + + // Removing a transaction does so indistinguishably of what internal set they are in + BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid)); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + // Add again to check removing from delayed + r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false)); + BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); + BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid)); + BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true)); +} + BOOST_AUTO_TEST_CASE(GetFanoutTargetsTest) { auto should_fanout_to = [](NodeId peer_id, std::vector fanout_targets) {