From b84348a8fc93c86dcefb936f986ff2f1ced320b4 Mon Sep 17 00:00:00 2001 From: Gleb Naumenko Date: Sat, 8 Oct 2022 09:25:01 +0300 Subject: [PATCH] p2p: Functions to add/remove wtxids to tx reconciliation sets They will be used later on. --- src/node/txreconciliation.cpp | 83 ++++++++++++++++++++++++++++- src/node/txreconciliation.h | 20 +++++++ src/test/txreconciliation_tests.cpp | 56 +++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) diff --git a/src/node/txreconciliation.cpp b/src/node/txreconciliation.cpp index dd088fee7de..30db634b77e 100644 --- a/src/node/txreconciliation.cpp +++ b/src/node/txreconciliation.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,14 @@ public: */ uint64_t m_k0, m_k1; + /** + * Store all wtxids which we would announce to the peer (policy checks passed, etc.) + * in this set instead of announcing them right away. When reconciliation time comes, we will + * compute a compressed representation of this set ("sketch") and use it to efficiently + * reconcile this set with a set on the peer's side. + */ + std::unordered_set m_local_set; + TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {} }; @@ -70,6 +79,15 @@ private: */ std::unordered_map> m_states GUARDED_BY(m_txreconciliation_mutex); + TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex) + { + AssertLockHeld(m_txreconciliation_mutex); + auto salt_or_state = m_states.find(peer_id); + if (salt_or_state == m_states.end()) return nullptr; + + return std::get_if(&salt_or_state->second); + } + public: explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {} @@ -115,10 +133,60 @@ public: peer_id, is_peer_inbound); const uint256 full_salt{ComputeSalt(local_salt, remote_salt)}; - recon_state->second = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1)); + + auto new_state = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));; + m_states.erase(recon_state); + bool emplaced = m_states.emplace(peer_id, std::move(new_state)).second; + Assume(emplaced); + return ReconciliationRegisterResult::SUCCESS; } + bool AddToSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + // Transactions which don't make it to the set due to the limit are announced via fan-out. + if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id); + return false; + } + + // The caller currently keeps track of the per-peer transaction announcements, so it + // should not attempt to add same tx to the set twice. However, if that happens, we will + // simply ignore it. + if (peer_state->m_local_set.insert(wtxid).second) { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. " + "Now the set contains %i transactions.\n", + wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + } + return true; + } + + bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + auto removed = peer_state->m_local_set.erase(wtxid) > 0; + if (removed) { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. " + "Now the set contains %i transactions.\n", + wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + } else { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. " + "Transaction not found\n", + wtxid.ToString(), peer_id); + } + + return removed; + } + void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); @@ -128,6 +196,9 @@ public: } } + /** + * For calls within this class use GetRegisteredPeerState instead. + */ bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); @@ -153,6 +224,16 @@ ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_i return m_impl->RegisterPeer(peer_id, is_peer_inbound, peer_recon_version, remote_salt); } +bool TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wtxid) +{ + return m_impl->AddToSet(peer_id, wtxid); +} + +bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) +{ + return m_impl->TryRemovingFromSet(peer_id, wtxid); +} + void TxReconciliationTracker::ForgetPeer(NodeId peer_id) { m_impl->ForgetPeer(peer_id); diff --git a/src/node/txreconciliation.h b/src/node/txreconciliation.h index 3bbb0773664..ee3302d0cff 100644 --- a/src/node/txreconciliation.h +++ b/src/node/txreconciliation.h @@ -14,6 +14,12 @@ /** Supported transaction reconciliation protocol version */ static constexpr uint32_t TXRECONCILIATION_VERSION{1}; +/** + * Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of + * reconciliation sets and short ids mappings, and CPU used for sketch computation. + */ +constexpr size_t MAX_RECONSET_SIZE = 3000; + enum class ReconciliationRegisterResult { NOT_FOUND, SUCCESS, @@ -74,6 +80,20 @@ public: ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound, uint32_t peer_recon_version, uint64_t remote_salt); + /** + * Step 1. Add a new transaction we want to announce to the peer to the local reconciliation set + * of the peer, so that it will be reconciled later, unless the set limit is reached. + * Returns whether the transaction appears in the set. + */ + bool AddToSet(NodeId peer_id, const Wtxid& wtxid); + + /** + * Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if + * the peer just announced the transaction to us. + * Returns whether the wtxid was removed. + */ + bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid); + /** * Attempts to forget txreconciliation-related state of the peer (if we previously stored any). * After this, we won't be able to reconcile transactions with the peer. diff --git a/src/test/txreconciliation_tests.cpp b/src/test/txreconciliation_tests.cpp index e258e3353da..1bb9297f378 100644 --- a/src/test/txreconciliation_tests.cpp +++ b/src/test/txreconciliation_tests.cpp @@ -81,4 +81,60 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest) BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0)); } +BOOST_AUTO_TEST_CASE(AddToSetTest) +{ + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + Wtxid wtxid{Wtxid::FromUint256(frc.rand256())}; + + BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0)); + BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid)); + + tracker.PreRegisterPeer(peer_id0); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id0)); + + BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid)); + + tracker.ForgetPeer(peer_id0); + Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())}; + BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid2)); + + NodeId peer_id1 = 1; + tracker.PreRegisterPeer(peer_id1); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id1)); + + for (size_t i = 0; i < MAX_RECONSET_SIZE; ++i) + BOOST_REQUIRE(tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256()))); + BOOST_REQUIRE(!tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256()))); +} + +BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest) +{ + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + Wtxid wtxid{Wtxid::FromUint256(frc.rand256())}; + + BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0)); + BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid)); + + tracker.PreRegisterPeer(peer_id0); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id0)); + + BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid)); + BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid)); + BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid)); + BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid)); + + BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid)); + tracker.ForgetPeer(peer_id0); + BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid)); +} + BOOST_AUTO_TEST_SUITE_END()