p2p: Makes transactions available for reconciliation on trickle

Splits the reconciliation set in two, a delayed set and an available set.
Transactions are added to the delayed set and made available when on the next
trickle interval for the peer. This prevents adversarial nodes from proving
our reconciliation set by spamming reconciliation requests, in an equivalent
manner to how fanout delays work
This commit is contained in:
Sergi Delgado Segura 2024-10-01 15:32:33 -04:00
parent bbdc84ca0f
commit b1b85105be
4 changed files with 181 additions and 10 deletions

View file

@ -5842,6 +5842,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Determine transactions to relay
if (fSendTrickle) {
if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId())) {
// Make transactions added to the reconciliation set during the last interval available
m_txreconciliation->ReadyDelayedTransactions(pto->GetId());
}
// Produce a vector with all candidates for sending
std::vector<std::set<uint256>::iterator> vInvTx;
vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());

View file

@ -56,6 +56,11 @@ public:
*/
uint64_t m_k0, m_k1;
/**
* Set of transactions to be added to the reconciliation set (moved to m_local_set) upon the trickle.
*/
std::unordered_set<Wtxid, SaltedTxidHasher> m_delayed_local_set;
/**
* Store all wtxids which we would announce to the peer (policy checks passed, etc.)
* in this set instead of announcing them right away. When reconciliation time comes, we will
@ -74,7 +79,39 @@ public:
*/
std::map<uint32_t, Wtxid> m_short_id_mapping;
TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {}
TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1), m_delayed_local_set(), m_local_set(0, m_delayed_local_set.hash_function()) {}
/**
* Checks whether a transaction is already in the set. If `include_delayed` is set, the delayed set is also
* checked. Otherwise, transactions are only looked up in the regular set.
*/
bool ContainsTx(const Wtxid& wtxid, bool include_delayed) const
{
bool found = m_local_set.find(wtxid) != m_local_set.end();
if (include_delayed) {
found |= m_delayed_local_set.find(wtxid) != m_delayed_local_set.end();
}
return found;
}
/**
* Returns a pair of sizes, corresponding to the reconciliation set and the delayed transactions set
*/
std::pair<size_t, size_t> ReconSetSize()
{
return std::make_pair(m_local_set.size(), m_delayed_local_set.size());
}
bool RemoveFromSet(const Wtxid& wtxid)
{
if (m_local_set.contains(wtxid)) {
Assume(!m_delayed_local_set.contains(wtxid));
return m_local_set.erase(wtxid);
} else {
return m_delayed_local_set.erase(wtxid);
}
}
/**
* Reconciliation sketches are computed over short transaction IDs.
@ -213,7 +250,7 @@ public:
if (!peer_state) return AddToSetResult::Failed();
// Bypass if the wtxid is already in the set
if (peer_state->m_local_set.find(wtxid) != peer_state->m_local_set.end()) {
if (peer_state->ContainsTx(wtxid, /*include_delayed=*/true)) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "%s already in reconciliation set for peer=%d. Bypassing.\n",
wtxid.ToString(), peer_id);
return AddToSetResult::Succeeded();
@ -228,7 +265,8 @@ public:
}
// Transactions which don't make it to the set due to the limit are announced via fan-out.
if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) {
auto [recon_set_size, delayed_set_size] = peer_state->ReconSetSize();
if (recon_set_size + delayed_set_size >= MAX_RECONSET_SIZE) {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id);
return AddToSetResult::Failed();
}
@ -236,15 +274,39 @@ public:
// The caller currently keeps track of the per-peer transaction announcements, so it
// should not attempt to add same tx to the set twice. However, if that happens, we will
// simply ignore it.
if (peer_state->m_local_set.insert(wtxid).second) {
if (peer_state->m_delayed_local_set.insert(wtxid).second) {
peer_state->m_short_id_mapping.emplace(short_id, wtxid);
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. "
"Now the set contains %i transactions.\n",
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
"Now the set contains %i reconcilable transactions"
"(plus %i delayed transactions).\n",
wtxid.ToString(), peer_id, recon_set_size, delayed_set_size + 1);
}
return AddToSetResult::Succeeded();
}
bool ReadyDelayedTransactions(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
auto peer_state = GetRegisteredPeerState(peer_id);
if (!peer_state) return false;
peer_state->m_local_set.merge(peer_state->m_delayed_local_set);
// There should be no duplicates, so m_delayed_local_set should be emptied
Assert(peer_state->m_delayed_local_set.empty());
return true;
}
bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
auto peer_state = GetRegisteredPeerState(peer_id);
if (!peer_state) return false;
return peer_state->ContainsTx(wtxid, include_delayed);
}
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
@ -252,12 +314,14 @@ public:
auto peer_state = GetRegisteredPeerState(peer_id);
if (!peer_state) return false;
auto removed = peer_state->m_local_set.erase(wtxid) > 0;
auto removed = peer_state->RemoveFromSet(wtxid);
if (removed) {
auto [recon_set_size, delayed_set_size] = peer_state->ReconSetSize();
peer_state->m_short_id_mapping.erase(peer_state->ComputeShortID(wtxid));
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. "
"Now the set contains %i transactions.\n",
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
"Now the set contains %i reconcilable transactions"
"(plus %i delayed transactions).\n",
wtxid.ToString(), peer_id, recon_set_size, delayed_set_size);
} else {
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. "
"Transaction not found\n",
@ -378,7 +442,7 @@ public:
for (const auto &[peer_id, state_or_salt]: m_states) {
if (const auto state = std::get_if<TxReconciliationState>(&state_or_salt)) {
const size_t parent_count = std::count_if(parents.begin(), parents.end(),
[state](const auto& wtxid){return state->m_local_set.find(wtxid) != state->m_local_set.end();});
[state](const auto& wtxid){return state->ContainsTx(wtxid, /*include_delayed=*/true);});
parents_by_peer.emplace(parent_count, peer_id);
}
}
@ -440,6 +504,16 @@ AddToSetResult TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wt
return m_impl->AddToSet(peer_id, wtxid);
}
bool TxReconciliationTracker::ReadyDelayedTransactions(NodeId peer_id)
{
return m_impl->ReadyDelayedTransactions(peer_id);
}
bool TxReconciliationTracker::IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed)
{
return m_impl->IsTransactionInSet(peer_id, wtxid, include_delayed);
}
bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid)
{
return m_impl->TryRemovingFromSet(peer_id, wtxid);

View file

@ -111,6 +111,18 @@ public:
*/
AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid);
/**
* Moves delayed transactions to available (m_delayed_local_set -> m_local_set).
* To be called on trickle intervals so transactions are not requestable straightaway.
*/
bool ReadyDelayedTransactions(NodeId peer_id);
/**
* Checks whether a transaction is part of the peer's reconciliation set
* If include_delayed is set, look also in the delayed set.
*/
bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid, bool include_delayed);
/**
* Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if
* the peer just announced the transaction to us.

View file

@ -173,6 +173,39 @@ BOOST_AUTO_TEST_CASE(AddToSetCollisionTest)
BOOST_REQUIRE_EQUAL(r.m_collision.value(), wtxid);
}
BOOST_AUTO_TEST_CASE(IsTransactionInSetTest)
{
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
// If the peer is not registered, no transaction can be found
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// Same happens if the peer is only pre-registered
tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// Or registered but the transaction hasn't been added
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// Adding the transaction will make it queryable, but only if we set include_delayed,
// given transactions are placed into the delayed set first
auto r = tracker.AddToSet(peer_id0, wtxid);
BOOST_REQUIRE(r.m_succeeded);
BOOST_REQUIRE(!r.m_collision.has_value());
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// After a trickle interval, the transaction will be queryable from the regular set
BOOST_REQUIRE(tracker.ReadyDelayedTransactions(peer_id0));
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
}
BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
{
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
@ -250,6 +283,54 @@ BOOST_AUTO_TEST_CASE(SortPeersByFewestParentsTest)
BOOST_REQUIRE(std::equal(sorted_peers.begin() + 3, sorted_peers.end(), peers.rbegin(), peers.rend() - 3));
}
BOOST_AUTO_TEST_CASE(ReadyAndDelayedTransactionsTest)
{
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
// If the peer is not registered, there are no transactions to ready
BOOST_CHECK(!tracker.ReadyDelayedTransactions(peer_id0));
tracker.PreRegisterPeer(peer_id0);
// Same happens if the peer is only pre-registered
BOOST_CHECK(!tracker.ReadyDelayedTransactions(peer_id0));
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
// Adding a transaction places it in the delayed set until ReadyDelayedTransactions is called
auto r = tracker.AddToSet(peer_id0, wtxid);
BOOST_REQUIRE(r.m_succeeded);
BOOST_REQUIRE(!r.m_collision.has_value());
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// Reading the transaction will move it to the regular set
BOOST_REQUIRE(tracker.ReadyDelayedTransactions(peer_id0));
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
// Trying to add the same transaction twice will be bypassed, given both sets are checked
r = tracker.AddToSet(peer_id0, wtxid);
BOOST_REQUIRE(r.m_succeeded);
BOOST_REQUIRE(!r.m_collision.has_value());
// The transaction is still in the available set
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
// Removing a transaction does so indistinguishably of what internal set they are in
BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
// Add again to check removing from delayed
r = tracker.AddToSet(peer_id0, wtxid);
BOOST_REQUIRE(r.m_succeeded);
BOOST_REQUIRE(!r.m_collision.has_value());
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/false));
BOOST_REQUIRE(tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
BOOST_REQUIRE(tracker.TryRemovingFromSet(peer_id0, wtxid));
BOOST_REQUIRE(!tracker.IsTransactionInSet(peer_id0, wtxid, /*include_delayed*/true));
}
BOOST_AUTO_TEST_CASE(GetFanoutTargetsTest)
{
auto should_fanout_to = [](NodeId peer_id, std::vector<NodeId> fanout_targets) {