p2p: Functions to add/remove wtxids to tx reconciliation sets

They will be used later on.
This commit is contained in:
Gleb Naumenko 2022-10-08 09:25:01 +03:00 committed by Sergi Delgado Segura
parent 7e42a50631
commit b84348a8fc
3 changed files with 158 additions and 1 deletions

View file

@ -7,6 +7,7 @@
#include <common/system.h>
#include <logging.h>
#include <util/check.h>
#include <util/hasher.h>
#include <unordered_map>
#include <variant>
@ -48,6 +49,14 @@ public:
*/
uint64_t m_k0, m_k1;
/**
* Store all wtxids which we would announce to the peer (policy checks passed, etc.)
* in this set instead of announcing them right away. When reconciliation time comes, we will
* compute a compressed representation of this set ("sketch") and use it to efficiently
* reconcile this set with a set on the peer's side.
*/
std::unordered_set<Wtxid, SaltedTxidHasher> m_local_set;
TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {}
};
@ -70,6 +79,15 @@ private:
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);
TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
AssertLockHeld(m_txreconciliation_mutex);
auto salt_or_state = m_states.find(peer_id);
if (salt_or_state == m_states.end()) return nullptr;
return std::get_if<TxReconciliationState>(&salt_or_state->second);
}
public:
explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {}
@ -115,10 +133,60 @@ public:
peer_id, is_peer_inbound);
const uint256 full_salt{ComputeSalt(local_salt, remote_salt)};
recon_state->second = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));
auto new_state = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));;
m_states.erase(recon_state);
bool emplaced = m_states.emplace(peer_id, std::move(new_state)).second;
Assume(emplaced);
return ReconciliationRegisterResult::SUCCESS;
}
bool AddToSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
auto peer_state = GetRegisteredPeerState(peer_id);
if (!peer_state) return false;
// Transactions which don't make it to the set due to the limit are announced via fan-out.
if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id);
return false;
}
// The caller currently keeps track of the per-peer transaction announcements, so it
// should not attempt to add same tx to the set twice. However, if that happens, we will
// simply ignore it.
if (peer_state->m_local_set.insert(wtxid).second) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. "
"Now the set contains %i transactions.\n",
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
}
return true;
}
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
auto peer_state = GetRegisteredPeerState(peer_id);
if (!peer_state) return false;
auto removed = peer_state->m_local_set.erase(wtxid) > 0;
if (removed) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. "
"Now the set contains %i transactions.\n",
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
} else {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. "
"Transaction not found\n",
wtxid.ToString(), peer_id);
}
return removed;
}
void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
@ -128,6 +196,9 @@ public:
}
}
/**
* For calls within this class use GetRegisteredPeerState instead.
*/
bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
@ -153,6 +224,16 @@ ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_i
return m_impl->RegisterPeer(peer_id, is_peer_inbound, peer_recon_version, remote_salt);
}
bool TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wtxid)
{
return m_impl->AddToSet(peer_id, wtxid);
}
bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid)
{
return m_impl->TryRemovingFromSet(peer_id, wtxid);
}
void TxReconciliationTracker::ForgetPeer(NodeId peer_id)
{
m_impl->ForgetPeer(peer_id);

View file

@ -14,6 +14,12 @@
/** Supported transaction reconciliation protocol version */
static constexpr uint32_t TXRECONCILIATION_VERSION{1};
/**
* Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of
* reconciliation sets and short ids mappings, and CPU used for sketch computation.
*/
constexpr size_t MAX_RECONSET_SIZE = 3000;
enum class ReconciliationRegisterResult {
NOT_FOUND,
SUCCESS,
@ -74,6 +80,20 @@ public:
ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound,
uint32_t peer_recon_version, uint64_t remote_salt);
/**
* Step 1. Add a new transaction we want to announce to the peer to the local reconciliation set
* of the peer, so that it will be reconciled later, unless the set limit is reached.
* Returns whether the transaction appears in the set.
*/
bool AddToSet(NodeId peer_id, const Wtxid& wtxid);
/**
* Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if
* the peer just announced the transaction to us.
* Returns whether the wtxid was removed.
*/
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid);
/**
* Attempts to forget txreconciliation-related state of the peer (if we previously stored any).
* After this, we won't be able to reconcile transactions with the peer.

View file

@ -81,4 +81,60 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
}
BOOST_AUTO_TEST_CASE(AddToSetTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid));
tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
tracker.ForgetPeer(peer_id0);
Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())};
BOOST_REQUIRE(!tracker.AddToSet(peer_id0, wtxid2));
NodeId peer_id1 = 1;
tracker.PreRegisterPeer(peer_id1);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id1));
for (size_t i = 0; i < MAX_RECONSET_SIZE; ++i)
BOOST_REQUIRE(tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256())));
BOOST_REQUIRE(!tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256())));
}
BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid));
tracker.ForgetPeer(peer_id0);
BOOST_REQUIRE(!tracker.TryRemovingFromSet(peer_id0, wtxid));
}
BOOST_AUTO_TEST_SUITE_END()