p2p: Add transactions to reconciliation sets

Transactions eligible for reconciliation are added to the reconciliation sets. For the remaining txs, low-fanout is used.

Co-authored-by:  Gleb Naumenko <naumenko.gs@gmail.com>
This commit is contained in:
Sergi Delgado Segura 2024-05-30 12:03:58 -04:00
parent ea0f6d9d0d
commit bcdccad41b
4 changed files with 312 additions and 21 deletions

View file

@ -162,6 +162,8 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
/** The compactblocks version we support. See BIP 152. */
static constexpr uint64_t CMPCTBLOCKS_VERSION{2};
/** Used to determine whether to use low-fanout flooding (or reconciliation) for a tx relay event. */
static constexpr uint64_t RANDOMIZER_ID_FANOUT_TARGET = 0xbac89af818407b6aULL; // SHA256("fanouttarget")[0:8]
// Internal stuff
namespace {
@ -1897,7 +1899,8 @@ PeerManagerImpl::PeerManagerImpl(CConnman& connman, AddrMan& addrman,
// While Erlay support is incomplete, it must be enabled explicitly via -txreconciliation.
// This argument can go away after Erlay support is complete.
if (opts.reconcile_txs) {
m_txreconciliation = std::make_unique<TxReconciliationTracker>(TXRECONCILIATION_VERSION);
m_txreconciliation = std::make_unique<TxReconciliationTracker>(TXRECONCILIATION_VERSION,
m_connman.GetDeterministicRandomizer(RANDOMIZER_ID_FANOUT_TARGET));
}
}
@ -2137,10 +2140,49 @@ std::pair<size_t, size_t> PeerManagerImpl::GetFanoutPeersCount()
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
{
size_t inbounds_fanout_tx_relay{0}, outbounds_fanout_tx_relay{0};
std::vector<Wtxid> parents;
std::vector<NodeId> fanout_with_ancestors;
std::vector<NodeId> fanout_targets;
{
if (m_txreconciliation) {
std::tie(inbounds_fanout_tx_relay, outbounds_fanout_tx_relay) = GetFanoutPeersCount();
LOCK(m_mempool.cs);
if (auto txiter = m_mempool.GetIter(wtxid)) {
const auto parents_refs = (*txiter)->GetMemPoolParents();
if (!parents_refs.empty()) {
// If the transaction to be relayed has in-mempool parents we want to be consistent with
// the relay method for all the ancestor set (to minimize orphans). This means that we
// either fanout or reconcile all of them. In order to do this, we pick a small subset of
// our reconciling peers, remove all the matches from their reconciliation set and fanout
// the ancestor set. The sorting criteria favors peers with the least amount of ancestors,
// in order to minimize the number of removed transactions.
// Notice this is only done on a best-effort basis and may not cover all edge cases. Those will
// be covered post-reconciliation, during the INV message building.
for (const auto &tx : parents_refs) {
parents.emplace_back(tx.get().GetTx().GetWitnessHash());
}
fanout_with_ancestors = m_txreconciliation->SortPeersByFewestParents(parents);
fanout_with_ancestors.resize(2); // FIXME: Resize to 2 for now
// Account for the peers we fanout with ancestors to so it affects how many extra peers we
// select to fanout using the regular criteria (if any)
LOCK(m_peer_mutex);
for (const auto& peer_id: fanout_with_ancestors) {
if (m_peer_map.find(peer_id)->second->m_is_inbound) {
++inbounds_fanout_tx_relay;
} else {
++outbounds_fanout_tx_relay;
}
}
}
}
fanout_targets = m_txreconciliation->GetFanoutTargets(Wtxid::FromUint256(wtxid), inbounds_fanout_tx_relay, outbounds_fanout_tx_relay);
}
}
LOCK(m_peer_mutex);
for(auto& it : m_peer_map) {
Peer& peer = *it.second;
auto tx_relay = peer.GetTxRelay();
for(const auto& [peer_id, peer] : m_peer_map) {
auto tx_relay = peer->GetTxRelay();
if (!tx_relay) continue;
LOCK(tx_relay->m_tx_inventory_mutex);
@ -2151,9 +2193,35 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid
// in the announcement.
if (tx_relay->m_next_inv_send_time == 0s) continue;
const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
tx_relay->m_tx_inventory_to_send.insert(hash);
bool fanout = true;
std::vector<uint256> invs_to_send;
if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(peer_id)) {
// If this transaction has parents in the mempool and the peer is within the peers with less ancestors
// to reconcile, fanout the transaction an all its ancestors. We just add the parents here and leave fanout as true
auto it = std::find(fanout_with_ancestors.begin(), fanout_with_ancestors.end(), peer_id);
if (it != fanout_with_ancestors.end()) {
for (const auto parent_wtxid: parents) {
if (m_txreconciliation->TryRemovingFromSet(peer_id, parent_wtxid)) {
invs_to_send.push_back(parent_wtxid);
}
}
} else {
// If the peer is registered for set reconciliation, maybe pick it as fanout
fanout = std::find(fanout_targets.begin(), fanout_targets.end(), peer_id) != fanout_targets.end();
}
}
if (fanout || !m_txreconciliation->AddToSet(peer_id, Wtxid::FromUint256(wtxid)).m_succeeded) {
// If the transaction cannot be added to the set, we simply fanout, and do not do any special handling here regarding ancestors.
// This should not happen under normal conditions, given the set size should be well over the number of transactions received
// between reconciling intervals. A peer hitting the limit is likely to be either a broken implementation or an attacker.
invs_to_send.push_back(peer->m_wtxid_relay ? wtxid : txid);
}
for (const auto& hash : invs_to_send) {
if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
tx_relay->m_tx_inventory_to_send.insert(hash);
}
}
};
}
@ -3959,6 +4027,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const bool fAlreadyHave{m_txdownloadman.AddTxAnnouncement(pfrom.GetId(), gtxid, current_time, /*p2p_inv=*/true)};
LogDebug(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
}
if (m_txreconciliation && gtxid.IsWtxid()) {
m_txreconciliation->TryRemovingFromSet(pfrom.GetId(), Wtxid::FromUint256(inv.hash));
}
} else {
LogDebug(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId());
}
@ -4249,6 +4320,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
LOCK2(cs_main, m_tx_download_mutex);
if (m_txreconciliation) m_txreconciliation->TryRemovingFromSet(pfrom.GetId(), Wtxid::FromUint256(wtxid));
const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx);
if (!should_validate) {
if (pfrom.HasPermission(NetPermissionFlags::ForceRelay)) {
@ -5690,6 +5763,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
// Lock way before it's used to maintain lock ordering.
LOCK2(m_mempool.cs, m_peer_mutex);
LOCK(tx_relay->m_tx_inventory_mutex);
// Check whether periodic sends should happen
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
@ -5794,7 +5869,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
// Ensure we'll respond to GETDATA requests for anything we've just announced
LOCK(m_mempool.cs);
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
}
}

View file

@ -9,6 +9,7 @@
#include <util/check.h>
#include <util/hasher.h>
#include <cmath>
#include <unordered_map>
#include <variant>
@ -18,6 +19,12 @@ namespace {
/** Static salt component used to compute short txids for sketch construction, see BIP-330. */
const std::string RECON_STATIC_SALT = "Tx Relay Salting";
const HashWriter RECON_SALT_HASHER = TaggedHash(RECON_STATIC_SALT);
/**
* Announce transactions via full wtxid to a limited number of inbound and outbound peers.
* Justification for these values are provided here:
* https://github.com/naumenkogs/txrelaysim/issues/7#issuecomment-902165806 */
constexpr double INBOUND_FANOUT_DESTINATIONS_FRACTION = 0.1;
constexpr size_t OUTBOUND_FANOUT_DESTINATIONS = 1;
/**
* Salt (specified by BIP-330) constructed from contributions from both peers. It is used
@ -85,6 +92,15 @@ private:
*/
size_t m_inbounds_count GUARDED_BY(m_txreconciliation_mutex){0};
/*
* Keeps track of how many of the registered peers are outbound. Updated on registering or
* forgetting peers.
*/
size_t m_outbounds_count GUARDED_BY(m_txreconciliation_mutex){0};
// Used for randomly choosing fanout targets.
CSipHasher m_deterministic_randomizer;
TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
AssertLockHeld(m_txreconciliation_mutex);
@ -95,7 +111,7 @@ private:
}
public:
explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {}
explicit Impl(uint32_t recon_version, CSipHasher hasher) : m_recon_version(recon_version), m_deterministic_randomizer(std::move(hasher)) {}
uint64_t PreRegisterPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
@ -148,6 +164,10 @@ public:
if (is_peer_inbound && m_inbounds_count < std::numeric_limits<size_t>::max()) {
++m_inbounds_count;
}
if (!is_peer_inbound && m_outbounds_count < std::numeric_limits<size_t>::max()) {
++m_outbounds_count;
}
return ReconciliationRegisterResult::SUCCESS;
}
@ -207,9 +227,14 @@ public:
if (peer == m_states.end()) return;
const auto registered = std::get_if<TxReconciliationState>(&peer->second);
if (registered && !registered->m_we_initiate) {
Assert(m_inbounds_count > 0);
--m_inbounds_count;
if (registered) {
if (registered->m_we_initiate) {
Assert(m_outbounds_count > 0);
--m_outbounds_count;
} else {
Assert(m_inbounds_count > 0);
--m_inbounds_count;
}
}
if (m_states.erase(peer_id)) {
@ -229,6 +254,73 @@ public:
std::holds_alternative<TxReconciliationState>(recon_state->second));
}
std::vector<NodeId> GetFanoutTargets(const Wtxid& wtxid, size_t inbounds_fanout_tx_relay, size_t outbounds_fanout_tx_relay) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
// We decide whether a particular peer is a low-fanout flood target differently based on its connection direction:
// - for outbounds we have a fixed number of flood destinations.
// - for inbounds we use a fraction of all inbound peers supporting tx relay.
size_t outbounds_target_size = 0;
if (OUTBOUND_FANOUT_DESTINATIONS > outbounds_fanout_tx_relay) {
// This may only happen in testing, but make sure we don't have a target size bigger than our registered outbounds count
outbounds_target_size = std::min(OUTBOUND_FANOUT_DESTINATIONS - outbounds_fanout_tx_relay, m_outbounds_count);
}
// Since we use the fraction for inbound peers, we first need to compute the total number of inbound targets.
const double inbound_targets = (inbounds_fanout_tx_relay + m_inbounds_count) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
double n = std::max(inbound_targets - inbounds_fanout_tx_relay, 0.0);
// Being this a fraction, we need to round it either up or down. We do this deterministically at random based on the
// transaction we are picking the peers for.
CSipHasher deterministic_randomizer_in{m_deterministic_randomizer};
deterministic_randomizer_in.Write(wtxid.ToUint256());
CSipHasher deterministic_randomizer_out{deterministic_randomizer_in};
const size_t inbounds_target_size = ((deterministic_randomizer_in.Finalize() & 0xFFFFFFFF) + uint64_t(n * 0x100000000)) >> 32;
// Pick all reconciliation registered peers and assign them a deterministically random value based on their peer id
// Also, split peers in inbounds/outbounds
std::vector<std::pair<uint64_t, NodeId>> weighted_inbounds, weighed_outbounds;
weighted_inbounds.reserve(m_inbounds_count);
weighed_outbounds.reserve(m_outbounds_count);
// We may have some pre-registered peers, so the number of registered peers should never be higher than the m_states size
Assume(m_states.size() >= m_inbounds_count + m_outbounds_count);
auto assign_key = [](NodeId node_id, CSipHasher randomizer, std::vector<std::pair<uint64_t, NodeId>>& weighted_peers) {
uint64_t hash_key = randomizer.Write(node_id).Finalize();
weighted_peers.emplace_back(hash_key, node_id);
};
for (const auto& [node_id, op_peer_state]: m_states) {
const auto peer_state = std::get_if<TxReconciliationState>(&op_peer_state);
if (peer_state) {
if (peer_state->m_we_initiate) {
assign_key(node_id, deterministic_randomizer_out, weighed_outbounds);
} else {
assign_key(node_id, deterministic_randomizer_in, weighted_inbounds);
}
}
}
// Sort the peers based on their assigned random value, extract the node_ids and trim the collections to size
std::vector<NodeId> fanout_targets;
fanout_targets.reserve(inbounds_target_size + outbounds_target_size);
auto collect_fanout_targets = [&](std::vector<std::pair<uint64_t, NodeId>> weighted_peers, const size_t target_size) {
// Make sure we never select more targets than we can
Assert(outbounds_target_size <= weighed_outbounds.size());
Assert(inbounds_target_size <= weighted_inbounds.size());
if (target_size == 0) return;
std::nth_element(weighted_peers.begin(), weighted_peers.begin() + target_size, weighted_peers.end());
for_each(weighted_peers.begin(), weighted_peers.begin() + target_size,
[&fanout_targets](auto& keyed_peer) { fanout_targets.push_back(keyed_peer.second); });
};
collect_fanout_targets(weighted_inbounds, inbounds_target_size);
collect_fanout_targets(weighed_outbounds, outbounds_target_size);
return fanout_targets;
}
std::vector<NodeId> SortPeersByFewestParents(std::vector<Wtxid> parents) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
@ -274,7 +366,7 @@ AddToSetResult AddToSetResult::Collision(Wtxid wtxid)
return AddToSetResult(false, std::make_optional(wtxid));
}
TxReconciliationTracker::TxReconciliationTracker(uint32_t recon_version) : m_impl{std::make_unique<TxReconciliationTracker::Impl>(recon_version)} {}
TxReconciliationTracker::TxReconciliationTracker(uint32_t recon_version, CSipHasher hasher) : m_impl{std::make_unique<TxReconciliationTracker::Impl>(recon_version, hasher)} {}
TxReconciliationTracker::~TxReconciliationTracker() = default;
@ -309,6 +401,11 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
return m_impl->IsPeerRegistered(peer_id);
}
std::vector<NodeId> TxReconciliationTracker::GetFanoutTargets(const Wtxid& wtxid, size_t inbounds_fanout_tx_relay, size_t outbounds_fanout_tx_relay)
{
return m_impl->GetFanoutTargets(wtxid, inbounds_fanout_tx_relay, outbounds_fanout_tx_relay);
}
std::vector<NodeId> TxReconciliationTracker::SortPeersByFewestParents(std::vector<Wtxid> parents)
{
return m_impl->SortPeersByFewestParents(parents);

View file

@ -79,7 +79,7 @@ private:
const std::unique_ptr<Impl> m_impl;
public:
explicit TxReconciliationTracker(uint32_t recon_version);
explicit TxReconciliationTracker(uint32_t recon_version, CSipHasher hasher);
~TxReconciliationTracker();
/**
@ -123,6 +123,13 @@ public:
*/
bool IsPeerRegistered(NodeId peer_id) const;
/**
* Returns an ordered collections of peers for fanout to, including both inbounds and outbounds.
* The collection is composed by a deterministically random subsets of peers that support transaction relay,
* sorted based on both the wtxid of the transaction we are making the decision on, and the peer id.
* The collection is sized based on [inbounds_fanout_tx_relay] and [outbounds_fanout_tx_relay]
*/
std::vector<NodeId> GetFanoutTargets(const Wtxid& wtxid, size_t inbounds_fanout_tx_relay, size_t outbounds_fanout_tx_relay);
/**
* Returns a collections of node ids sorted by how many instances of the provided transaction ids

View file

@ -12,7 +12,8 @@ BOOST_FIXTURE_TEST_SUITE(txreconciliation_tests, BasicTestingSetup)
BOOST_AUTO_TEST_CASE(RegisterPeerTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
const uint64_t salt = 0;
// Prepare a peer for reconciliation.
@ -48,9 +49,13 @@ BOOST_AUTO_TEST_CASE(RegisterPeerTest)
BOOST_AUTO_TEST_CASE(ForgetPeerTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
// Removing peer which is not there works.
tracker.ForgetPeer(peer_id0);
// Removing peer after pre-registring works and does not let to register the peer.
tracker.PreRegisterPeer(peer_id0);
tracker.ForgetPeer(peer_id0);
@ -67,7 +72,8 @@ BOOST_AUTO_TEST_CASE(ForgetPeerTest)
BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
@ -83,7 +89,8 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
BOOST_AUTO_TEST_CASE(AddToSetTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
@ -130,7 +137,8 @@ BOOST_AUTO_TEST_CASE(AddToSetTest)
BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
@ -155,7 +163,8 @@ BOOST_AUTO_TEST_CASE(TryRemovingFromSetTest)
BOOST_AUTO_TEST_CASE(SortPeersByFewestParentsTest)
{
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
FastRandomContext frc{/*fDeterministic=*/true};
std::vector<NodeId> peers = {0, 1, 2, 3, 4, 5, 6, 7};
@ -179,7 +188,7 @@ BOOST_AUTO_TEST_CASE(SortPeersByFewestParentsTest)
// Lets check ties now. Leave the tree first peers with no parent (so they tie)
// plus add some to the last two
TxReconciliationTracker tracker2(TXRECONCILIATION_VERSION);
TxReconciliationTracker tracker2(TXRECONCILIATION_VERSION, hasher);
peers = {0, 1, 2, 3, 4};
for (auto &peer_id: peers) {
@ -203,4 +212,108 @@ BOOST_AUTO_TEST_CASE(SortPeersByFewestParentsTest)
BOOST_REQUIRE(std::equal(sorted_peers.begin() + 3, sorted_peers.end(), peers.rbegin(), peers.rend() - 3));
}
BOOST_AUTO_TEST_CASE(GetFanoutTargetsTest)
{
auto should_fanout_to = [](NodeId peer_id, std::vector<NodeId> fanout_targets) {
return std::find(fanout_targets.begin(), fanout_targets.end(), peer_id) != fanout_targets.end();
};
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
FastRandomContext frc{/*fDeterministic=*/true};
std::vector<NodeId> fanout_targets;
// Registered peers should be selected to receive some transaction via flooding.
// Since there is only one reconciling peer, it will be selected for all transactions.
tracker.PreRegisterPeer(peer_id0);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, /*is_peer_inbound=*/false, 1, 1), ReconciliationRegisterResult::SUCCESS);
for (int i = 0; i < 100; ++i) {
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
BOOST_CHECK(should_fanout_to(peer_id0, fanout_targets));
}
// Don't select a fanout target if we already have enough targets
for (int i = 0; i < 100; ++i) {
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/2);
BOOST_CHECK(!should_fanout_to(peer_id0, fanout_targets));
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/100);
BOOST_CHECK(!should_fanout_to(peer_id0, fanout_targets));
}
// Now for inbound connections.
// Initialize a new instance with a new hasher to be used later on.
CSipHasher hasher2(0x0706050403020100ULL, 0x4F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker2(TXRECONCILIATION_VERSION, hasher2);
int inbound_peers = 36;
for (int i = 1; i < inbound_peers; ++i) {
tracker.PreRegisterPeer(i);
tracker2.PreRegisterPeer(i);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(i, /*is_peer_inbound=*/true, 1, 1), ReconciliationRegisterResult::SUCCESS);
BOOST_REQUIRE_EQUAL(tracker2.RegisterPeer(i, /*is_peer_inbound=*/true, 1, 1), ReconciliationRegisterResult::SUCCESS);
}
// Relay to a fraction of registered inbound peers.
// For 35 peers we will choose 3.5 flooding targets, which means that it's either 3 or 4 with
// 50% chance. Make sure the randomness actually works by checking against a different hasher.
size_t total_fanouted1 = 0;
size_t total_fanouted2 = 0;
auto wtxid = Wtxid::FromUint256(uint256(1)); // determinism is required.
std::vector<NodeId> fanout_targets_t2;
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(wtxid), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
fanout_targets_t2 = tracker2.GetFanoutTargets(Wtxid::FromUint256(wtxid), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
for (int i = 1; i < inbound_peers; ++i) {
total_fanouted1 += should_fanout_to(i, fanout_targets);
total_fanouted2 += should_fanout_to(i, fanout_targets_t2);
}
BOOST_CHECK_EQUAL(total_fanouted1, 3);
BOOST_CHECK_EQUAL(total_fanouted2, 4);
// Don't relay if there is sufficient non-reconciling peers
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/4, /*outbounds_fanout_tx_relay=*/0);
for (int j = 0; j < 100; ++j) {
size_t total_fanouted = 0;
for (int i = 1; i < inbound_peers; ++i) {
total_fanouted += should_fanout_to(i, fanout_targets);
}
BOOST_CHECK_EQUAL(total_fanouted, 0);
}
}
BOOST_AUTO_TEST_CASE(GetFanoutTargetsPreRegistered)
{
// Make sure that GetFanoutTargets works even if we have pre-registered peers. We should never return more inbound/outbound targets than
// inbound/outbound peers we have.
CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION, hasher);
NodeId peer_id0 = 0;
NodeId peer_id1 = 1;
FastRandomContext frc{/*fDeterministic=*/true};
std::vector<NodeId> fanout_targets;
// If we only have one pre-registered peer, fanout targets should be empty, since there are not available peers to select from
tracker.PreRegisterPeer(peer_id0);
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
BOOST_REQUIRE(fanout_targets.empty());
// For one outbound and one pre-registered, GetFanoutTargets should return a single (outbound) peer
tracker.PreRegisterPeer(peer_id1);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, /*is_peer_inbound=*/false, 1, 1), ReconciliationRegisterResult::SUCCESS);
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
BOOST_REQUIRE_EQUAL(fanout_targets.size(), 1);
BOOST_REQUIRE(std::find(fanout_targets.begin(), fanout_targets.end(), peer_id1) != fanout_targets.end());
tracker.ForgetPeer(peer_id1);
// For 10 inbounds* and one pre-registered, GetFanoutTargets should return a single (inbound) peer
// * 10 since we are selecting 10% of inbounds
for (int peer_id = 1; peer_id < 10; ++peer_id) {
tracker.PreRegisterPeer(peer_id);
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id, /*is_peer_inbound=*/true, 1, 1), ReconciliationRegisterResult::SUCCESS);
}
fanout_targets = tracker.GetFanoutTargets(Wtxid::FromUint256(frc.rand256()), /*inbounds_fanout_tx_relay=*/0, /*outbounds_fanout_tx_relay=*/0);
BOOST_REQUIRE_EQUAL(fanout_targets.size(), 1);
}
BOOST_AUTO_TEST_SUITE_END()