mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-21 14:34:49 +01:00
Merge bitcoin/bitcoin#27626: Parallel compact block downloads, take 3
d7f359b35e
Add tests for parallel compact block downloads (Greg Sanders)03423f8bd1
Support up to 3 parallel compact block txn fetchings (Greg Sanders)13f9b20b4c
Only request full blocks from the peer we thought had the block in-flight (Greg Sanders)cce96182ba
Convert mapBlocksInFlight to a multimap (Greg Sanders)a90595478d
Remove nBlocksInFlight (Greg Sanders)86cff8bf18
alias BlockDownloadMap for mapBlocksInFlight (Greg Sanders) Pull request description: This is an attempt at mitigating https://github.com/bitcoin/bitcoin/issues/25258 , which is a revival of https://github.com/bitcoin/bitcoin/pull/10984, which is a revival of https://github.com/bitcoin/bitcoin/pull/9447. This PR attempts to mitigate a single case, where high bandwidth peers can bail us out of a flakey peer not completing blocks for us. We allow up to 2 additional getblocktxns requests per unique block. This would hopefully allow the chance for an honest high bandwidth peer to hand us the transactions even if the first in flight peer stalls out. In contrast to previous effort: 1) it will not help if subsequent peers send block headers only, so only high-bandwidth peers this time. See: https://github.com/bitcoin/bitcoin/pull/10984/files#diff-6875de769e90cec84d2e8a9c1b962cdbcda44d870d42e4215827e599e11e90e3R1411 2) `MAX_GETBLOCKTXN_TXN_AFTER_FIRST_IN_FLIGHT` is removed, in favor of aiding recovery during turbulent mempools 3) We require one of the 3 block fetching slots to be an outbound peer. This can be the original offering peer, or subsequent compact blocks given by high bandwidth peers. ACKs for top commit: sdaftuar: ACKd7f359b35e
mzumsande: Code Review ACKd7f359b35e
Tree-SHA512: 54980eac179e30f12a0bd49df147b2c3d63cd8f9401abb23c7baf02f76eeb59f2cfaaa155227990d0d39384de9fa38663f88774e891600a3837ae927f04f0db3
This commit is contained in:
commit
51c050787f
5 changed files with 245 additions and 79 deletions
|
@ -200,7 +200,9 @@ public:
|
|||
int nVersion;
|
||||
std::string cleanSubVer;
|
||||
bool fInbound;
|
||||
// We requested high bandwidth connection to peer
|
||||
bool m_bip152_highbandwidth_to;
|
||||
// Peer requested high bandwidth connection
|
||||
bool m_bip152_highbandwidth_from;
|
||||
int m_starting_height;
|
||||
uint64_t nSendBytes;
|
||||
|
|
|
@ -431,7 +431,6 @@ struct CNodeState {
|
|||
std::list<QueuedBlock> vBlocksInFlight;
|
||||
//! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty.
|
||||
std::chrono::microseconds m_downloading_since{0us};
|
||||
int nBlocksInFlight{0};
|
||||
//! Whether we consider this a preferred download peer.
|
||||
bool fPreferredDownload{false};
|
||||
/** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */
|
||||
|
@ -877,6 +876,9 @@ private:
|
|||
/** Have we requested this block from a peer */
|
||||
bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||
|
||||
/** Have we requested this block from an outbound peer */
|
||||
bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||
|
||||
/** Remove this block from our tracked requested blocks. Called if:
|
||||
* - the block has been received from a peer
|
||||
* - the request for the block has timed out
|
||||
|
@ -899,7 +901,9 @@ private:
|
|||
*/
|
||||
void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||
|
||||
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> > mapBlocksInFlight GUARDED_BY(cs_main);
|
||||
/* Multimap used to preserve insertion order */
|
||||
typedef std::multimap<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator>> BlockDownloadMap;
|
||||
BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main);
|
||||
|
||||
/** When our tip was last updated. */
|
||||
std::atomic<std::chrono::seconds> m_last_tip_update{0s};
|
||||
|
@ -1117,40 +1121,55 @@ std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::micros
|
|||
|
||||
bool PeerManagerImpl::IsBlockRequested(const uint256& hash)
|
||||
{
|
||||
return mapBlocksInFlight.find(hash) != mapBlocksInFlight.end();
|
||||
return mapBlocksInFlight.count(hash);
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash)
|
||||
{
|
||||
for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
|
||||
auto [nodeid, block_it] = range.first->second;
|
||||
CNodeState& nodestate = *Assert(State(nodeid));
|
||||
if (!nodestate.m_is_inbound) return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional<NodeId> from_peer)
|
||||
{
|
||||
auto it = mapBlocksInFlight.find(hash);
|
||||
if (it == mapBlocksInFlight.end()) {
|
||||
// Block was not requested
|
||||
auto range = mapBlocksInFlight.equal_range(hash);
|
||||
if (range.first == range.second) {
|
||||
// Block was not requested from any peer
|
||||
return;
|
||||
}
|
||||
|
||||
auto [node_id, list_it] = it->second;
|
||||
// We should not have requested too many of this block
|
||||
Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK);
|
||||
|
||||
if (from_peer && node_id != *from_peer) {
|
||||
// Block was requested by another peer
|
||||
return;
|
||||
while (range.first != range.second) {
|
||||
auto [node_id, list_it] = range.first->second;
|
||||
|
||||
if (from_peer && *from_peer != node_id) {
|
||||
range.first++;
|
||||
continue;
|
||||
}
|
||||
|
||||
CNodeState& state = *Assert(State(node_id));
|
||||
|
||||
if (state.vBlocksInFlight.begin() == list_it) {
|
||||
// First block on the queue was received, update the start download time for the next one
|
||||
state.m_downloading_since = std::max(state.m_downloading_since, GetTime<std::chrono::microseconds>());
|
||||
}
|
||||
state.vBlocksInFlight.erase(list_it);
|
||||
|
||||
if (state.vBlocksInFlight.empty()) {
|
||||
// Last validated block on the queue for this peer was received.
|
||||
m_peers_downloading_from--;
|
||||
}
|
||||
state.m_stalling_since = 0us;
|
||||
|
||||
range.first = mapBlocksInFlight.erase(range.first);
|
||||
}
|
||||
|
||||
CNodeState *state = State(node_id);
|
||||
assert(state != nullptr);
|
||||
|
||||
if (state->vBlocksInFlight.begin() == list_it) {
|
||||
// First block on the queue was received, update the start download time for the next one
|
||||
state->m_downloading_since = std::max(state->m_downloading_since, GetTime<std::chrono::microseconds>());
|
||||
}
|
||||
state->vBlocksInFlight.erase(list_it);
|
||||
|
||||
state->nBlocksInFlight--;
|
||||
if (state->nBlocksInFlight == 0) {
|
||||
// Last validated block on the queue was received.
|
||||
m_peers_downloading_from--;
|
||||
}
|
||||
state->m_stalling_since = 0us;
|
||||
mapBlocksInFlight.erase(it);
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list<QueuedBlock>::iterator** pit)
|
||||
|
@ -1160,27 +1179,29 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st
|
|||
CNodeState *state = State(nodeid);
|
||||
assert(state != nullptr);
|
||||
|
||||
Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK);
|
||||
|
||||
// Short-circuit most stuff in case it is from the same node
|
||||
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
|
||||
if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) {
|
||||
if (pit) {
|
||||
*pit = &itInFlight->second.second;
|
||||
for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
|
||||
if (range.first->second.first == nodeid) {
|
||||
if (pit) {
|
||||
*pit = &range.first->second.second;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Make sure it's not listed somewhere already.
|
||||
RemoveBlockRequest(hash, std::nullopt);
|
||||
// Make sure it's not being fetched already from same peer.
|
||||
RemoveBlockRequest(hash, nodeid);
|
||||
|
||||
std::list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(),
|
||||
{&block, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)});
|
||||
state->nBlocksInFlight++;
|
||||
if (state->nBlocksInFlight == 1) {
|
||||
if (state->vBlocksInFlight.size() == 1) {
|
||||
// We're starting a block download (batch) from this peer.
|
||||
state->m_downloading_since = GetTime<std::chrono::microseconds>();
|
||||
m_peers_downloading_from++;
|
||||
}
|
||||
itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first;
|
||||
auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it)));
|
||||
if (pit) {
|
||||
*pit = &itInFlight->second.second;
|
||||
}
|
||||
|
@ -1383,7 +1404,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co
|
|||
}
|
||||
} else if (waitingfor == -1) {
|
||||
// This is the first already-in-flight block.
|
||||
waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first;
|
||||
waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1513,13 +1534,21 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
|
|||
nSyncStarted--;
|
||||
|
||||
for (const QueuedBlock& entry : state->vBlocksInFlight) {
|
||||
mapBlocksInFlight.erase(entry.pindex->GetBlockHash());
|
||||
auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash());
|
||||
while (range.first != range.second) {
|
||||
auto [node_id, list_it] = range.first->second;
|
||||
if (node_id != nodeid) {
|
||||
range.first++;
|
||||
} else {
|
||||
range.first = mapBlocksInFlight.erase(range.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
m_orphanage.EraseForPeer(nodeid);
|
||||
m_txrequest.DisconnectedPeer(nodeid);
|
||||
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
|
||||
m_num_preferred_download_peers -= state->fPreferredDownload;
|
||||
m_peers_downloading_from -= (state->nBlocksInFlight != 0);
|
||||
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
|
||||
assert(m_peers_downloading_from >= 0);
|
||||
m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
|
||||
assert(m_outbound_peers_with_protect_from_disconnect >= 0);
|
||||
|
@ -1760,11 +1789,10 @@ std::optional<std::string> PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl
|
|||
|
||||
LOCK(cs_main);
|
||||
|
||||
// Mark block as in-flight unless it already is (for this peer).
|
||||
// If the peer does not send us a block, vBlocksInFlight remains non-empty,
|
||||
// causing us to timeout and disconnect.
|
||||
// If a block was already in-flight for a different peer, its BLOCKTXN
|
||||
// response will be dropped.
|
||||
// Forget about all prior requests
|
||||
RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt);
|
||||
|
||||
// Mark block as in-flight
|
||||
if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer";
|
||||
|
||||
// Construct message to request the block
|
||||
|
@ -2680,7 +2708,7 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c
|
|||
std::vector<CInv> vGetData;
|
||||
// Download as much as possible, from earliest to latest.
|
||||
for (const CBlockIndex *pindex : reverse_iterate(vToFetch)) {
|
||||
if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
if (nodestate->vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
// Can't download any more from this peer
|
||||
break;
|
||||
}
|
||||
|
@ -4274,15 +4302,27 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
nodestate->m_last_block_announcement = GetTime();
|
||||
}
|
||||
|
||||
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash());
|
||||
bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end();
|
||||
|
||||
if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here
|
||||
return;
|
||||
|
||||
auto range_flight = mapBlocksInFlight.equal_range(pindex->GetBlockHash());
|
||||
size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
|
||||
bool requested_block_from_this_peer{false};
|
||||
|
||||
// Multimap ensures ordering of outstanding requests. It's either empty or first in line.
|
||||
bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId());
|
||||
|
||||
while (range_flight.first != range_flight.second) {
|
||||
if (range_flight.first->second.first == pfrom.GetId()) {
|
||||
requested_block_from_this_peer = true;
|
||||
break;
|
||||
}
|
||||
range_flight.first++;
|
||||
}
|
||||
|
||||
if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better
|
||||
pindex->nTx != 0) { // We had this block at some point, but pruned it
|
||||
if (fAlreadyInFlight) {
|
||||
if (requested_block_from_this_peer) {
|
||||
// We requested this block for some reason, but our mempool will probably be useless
|
||||
// so we just grab the block via normal getdata
|
||||
std::vector<CInv> vInv(1);
|
||||
|
@ -4293,15 +4333,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
}
|
||||
|
||||
// If we're not close to tip yet, give up and let parallel block fetch work its magic
|
||||
if (!fAlreadyInFlight && !CanDirectFetch()) {
|
||||
if (!already_in_flight && !CanDirectFetch()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We want to be a bit conservative just to be extra careful about DoS
|
||||
// possibilities in compact block processing...
|
||||
if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) {
|
||||
if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
|
||||
(fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) {
|
||||
if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
|
||||
requested_block_from_this_peer) {
|
||||
std::list<QueuedBlock>::iterator* queuedBlockIt = nullptr;
|
||||
if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) {
|
||||
if (!(*queuedBlockIt)->partialBlock)
|
||||
|
@ -4320,11 +4360,16 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
Misbehaving(*peer, 100, "invalid compact block");
|
||||
return;
|
||||
} else if (status == READ_STATUS_FAILED) {
|
||||
// Duplicate txindexes, the block is now in-flight, so just request it
|
||||
std::vector<CInv> vInv(1);
|
||||
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash);
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
|
||||
return;
|
||||
if (first_in_flight) {
|
||||
// Duplicate txindexes, the block is now in-flight, so just request it
|
||||
std::vector<CInv> vInv(1);
|
||||
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash);
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
|
||||
return;
|
||||
} else {
|
||||
// Give up for this peer and wait for other peer(s)
|
||||
RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId());
|
||||
}
|
||||
}
|
||||
|
||||
BlockTransactionsRequest req;
|
||||
|
@ -4338,9 +4383,24 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
txn.blockhash = blockhash;
|
||||
blockTxnMsg << txn;
|
||||
fProcessBLOCKTXN = true;
|
||||
} else {
|
||||
} else if (first_in_flight) {
|
||||
// We will try to round-trip any compact blocks we get on failure,
|
||||
// as long as it's first...
|
||||
req.blockhash = pindex->GetBlockHash();
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
|
||||
} else if (pfrom.m_bip152_highbandwidth_to &&
|
||||
(!pfrom.IsInboundConn() ||
|
||||
IsBlockRequestedFromOutbound(blockhash) ||
|
||||
already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK - 1)) {
|
||||
// ... or it's a hb relay peer and:
|
||||
// - peer is outbound, or
|
||||
// - we already have an outbound attempt in flight(so we'll take what we can get), or
|
||||
// - it's not the final parallel download slot (which we may reserve for first outbound)
|
||||
req.blockhash = pindex->GetBlockHash();
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
|
||||
} else {
|
||||
// Give up for this peer and wait for other peer(s)
|
||||
RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId());
|
||||
}
|
||||
} else {
|
||||
// This block is either already in flight from a different
|
||||
|
@ -4361,7 +4421,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (fAlreadyInFlight) {
|
||||
if (requested_block_from_this_peer) {
|
||||
// We requested this block, but its far into the future, so our
|
||||
// mempool will probably be useless - request the block normally
|
||||
std::vector<CInv> vInv(1);
|
||||
|
@ -4433,24 +4493,44 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
{
|
||||
LOCK(cs_main);
|
||||
|
||||
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash);
|
||||
if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock ||
|
||||
it->second.first != pfrom.GetId()) {
|
||||
auto range_flight = mapBlocksInFlight.equal_range(resp.blockhash);
|
||||
size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
|
||||
bool requested_block_from_this_peer{false};
|
||||
|
||||
// Multimap ensures ordering of outstanding requests. It's either empty or first in line.
|
||||
bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId());
|
||||
|
||||
while (range_flight.first != range_flight.second) {
|
||||
auto [node_id, block_it] = range_flight.first->second;
|
||||
if (node_id == pfrom.GetId() && block_it->partialBlock) {
|
||||
requested_block_from_this_peer = true;
|
||||
break;
|
||||
}
|
||||
range_flight.first++;
|
||||
}
|
||||
|
||||
if (!requested_block_from_this_peer) {
|
||||
LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId());
|
||||
return;
|
||||
}
|
||||
|
||||
PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock;
|
||||
PartiallyDownloadedBlock& partialBlock = *range_flight.first->second.second->partialBlock;
|
||||
ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn);
|
||||
if (status == READ_STATUS_INVALID) {
|
||||
RemoveBlockRequest(resp.blockhash, pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect
|
||||
Misbehaving(*peer, 100, "invalid compact block/non-matching block transactions");
|
||||
return;
|
||||
} else if (status == READ_STATUS_FAILED) {
|
||||
// Might have collided, fall back to getdata now :(
|
||||
std::vector<CInv> invs;
|
||||
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash));
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
|
||||
if (first_in_flight) {
|
||||
// Might have collided, fall back to getdata now :(
|
||||
std::vector<CInv> invs;
|
||||
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash));
|
||||
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
|
||||
} else {
|
||||
RemoveBlockRequest(resp.blockhash, pfrom.GetId());
|
||||
LogPrint(BCLog::NET, "Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.GetId());
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// Block is either okay, or possibly we received
|
||||
// READ_STATUS_CHECKBLOCK_FAILED.
|
||||
|
@ -5043,14 +5123,14 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
|
|||
// valid headers chain with at least as much work as our tip.
|
||||
CNodeState *node_state = State(pnode->GetId());
|
||||
if (node_state == nullptr ||
|
||||
(now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->nBlocksInFlight == 0)) {
|
||||
(now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->vBlocksInFlight.empty())) {
|
||||
pnode->fDisconnect = true;
|
||||
LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n",
|
||||
pnode->GetId(), count_seconds(pnode->m_last_block_time));
|
||||
return true;
|
||||
} else {
|
||||
LogPrint(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
|
||||
pnode->GetId(), count_seconds(pnode->m_connected), node_state->nBlocksInFlight);
|
||||
pnode->GetId(), count_seconds(pnode->m_connected), node_state->vBlocksInFlight.size());
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
@ -5090,13 +5170,13 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
|
|||
// Also don't disconnect any peer we're trying to download a
|
||||
// block from.
|
||||
CNodeState &state = *State(pnode->GetId());
|
||||
if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.nBlocksInFlight == 0) {
|
||||
if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.vBlocksInFlight.empty()) {
|
||||
LogPrint(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement);
|
||||
pnode->fDisconnect = true;
|
||||
return true;
|
||||
} else {
|
||||
LogPrint(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
|
||||
pnode->GetId(), count_seconds(pnode->m_connected), state.nBlocksInFlight);
|
||||
pnode->GetId(), count_seconds(pnode->m_connected), state.vBlocksInFlight.size());
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
@ -5816,10 +5896,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
// Message: getdata (blocks)
|
||||
//
|
||||
std::vector<CInv> vGetData;
|
||||
if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
std::vector<const CBlockIndex*> vToDownload;
|
||||
NodeId staller = -1;
|
||||
FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller);
|
||||
FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.vBlocksInFlight.size(), vToDownload, staller);
|
||||
for (const CBlockIndex *pindex : vToDownload) {
|
||||
uint32_t nFetchFlags = GetFetchFlags(*peer);
|
||||
vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()));
|
||||
|
@ -5827,7 +5907,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(),
|
||||
pindex->nHeight, pto->GetId());
|
||||
}
|
||||
if (state.nBlocksInFlight == 0 && staller != -1) {
|
||||
if (state.vBlocksInFlight.empty() && staller != -1) {
|
||||
if (State(staller)->m_stalling_since == 0us) {
|
||||
State(staller)->m_stalling_since = current_time;
|
||||
LogPrint(BCLog::NET, "Stall started peer=%d\n", staller);
|
||||
|
|
|
@ -22,6 +22,8 @@ static const bool DEFAULT_PEERBLOOMFILTERS = false;
|
|||
static const bool DEFAULT_PEERBLOCKFILTERS = false;
|
||||
/** Threshold for marking a node to be discouraged, e.g. disconnected and added to the discouragement filter. */
|
||||
static const int DISCOURAGEMENT_THRESHOLD{100};
|
||||
/** Maximum number of outstanding CMPCTBLOCK requests for the same block. */
|
||||
static const unsigned int MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK = 3;
|
||||
|
||||
struct CNodeStateStats {
|
||||
int nSyncHeight = -1;
|
||||
|
|
|
@ -428,7 +428,7 @@ static RPCHelpMan getblockfrompeer()
|
|||
"getblockfrompeer",
|
||||
"Attempt to fetch block from a given peer.\n\n"
|
||||
"We must have the header for this block, e.g. using submitheader.\n"
|
||||
"Subsequent calls for the same block and a new peer will cause the response from the previous peer to be ignored.\n"
|
||||
"Subsequent calls for the same block may cause the response from the previous peer to be ignored.\n"
|
||||
"Peers generally ignore requests for a stale block that they never fully verified, or one that is more than a month old.\n"
|
||||
"When a peer does not respond with a block, we will disconnect.\n"
|
||||
"Note: The block could be re-pruned as soon as it is received.\n\n"
|
||||
|
|
|
@ -105,6 +105,10 @@ class TestP2PConn(P2PInterface):
|
|||
self.last_message.pop("headers", None)
|
||||
self.last_message.pop("cmpctblock", None)
|
||||
|
||||
def clear_getblocktxn(self):
|
||||
with p2p_lock:
|
||||
self.last_message.pop("getblocktxn", None)
|
||||
|
||||
def get_headers(self, locator, hashstop):
|
||||
msg = msg_getheaders()
|
||||
msg.locator.vHave = locator
|
||||
|
@ -745,7 +749,7 @@ class CompactBlocksTest(BitcoinTestFramework):
|
|||
peer.get_headers(locator=[int(tip, 16)], hashstop=0)
|
||||
peer.send_and_ping(msg_sendcmpct(announce=True, version=2))
|
||||
|
||||
def test_compactblock_reconstruction_multiple_peers(self, stalling_peer, delivery_peer):
|
||||
def test_compactblock_reconstruction_stalling_peer(self, stalling_peer, delivery_peer):
|
||||
node = self.nodes[0]
|
||||
assert len(self.utxos)
|
||||
|
||||
|
@ -823,12 +827,85 @@ class CompactBlocksTest(BitcoinTestFramework):
|
|||
hb_test_node.send_and_ping(msg_sendcmpct(announce=False, version=2))
|
||||
assert_highbandwidth_states(self.nodes[0], hb_to=True, hb_from=False)
|
||||
|
||||
def test_compactblock_reconstruction_parallel_reconstruction(self, stalling_peer, delivery_peer, inbound_peer, outbound_peer):
|
||||
""" All p2p connections are inbound except outbound_peer. We test that ultimate parallel slot
|
||||
can only be taken by an outbound node unless prior attempts were done by an outbound
|
||||
"""
|
||||
node = self.nodes[0]
|
||||
assert len(self.utxos)
|
||||
|
||||
def announce_cmpct_block(node, peer, txn_count):
|
||||
utxo = self.utxos.pop(0)
|
||||
block = self.build_block_with_transactions(node, utxo, txn_count)
|
||||
|
||||
cmpct_block = HeaderAndShortIDs()
|
||||
cmpct_block.initialize_from_block(block)
|
||||
msg = msg_cmpctblock(cmpct_block.to_p2p())
|
||||
peer.send_and_ping(msg)
|
||||
with p2p_lock:
|
||||
assert "getblocktxn" in peer.last_message
|
||||
return block, cmpct_block
|
||||
|
||||
for name, peer in [("delivery", delivery_peer), ("inbound", inbound_peer), ("outbound", outbound_peer)]:
|
||||
self.log.info(f"Setting {name} as high bandwidth peer")
|
||||
block, cmpct_block = announce_cmpct_block(node, peer, 1)
|
||||
msg = msg_blocktxn()
|
||||
msg.block_transactions.blockhash = block.sha256
|
||||
msg.block_transactions.transactions = block.vtx[1:]
|
||||
peer.send_and_ping(msg)
|
||||
assert_equal(int(node.getbestblockhash(), 16), block.sha256)
|
||||
peer.clear_getblocktxn()
|
||||
|
||||
# Test the simple parallel download case...
|
||||
for num_missing in [1, 5, 20]:
|
||||
|
||||
# Remaining low-bandwidth peer is stalling_peer, who announces first
|
||||
assert_equal([peer['bip152_hb_to'] for peer in node.getpeerinfo()], [False, True, True, True])
|
||||
|
||||
block, cmpct_block = announce_cmpct_block(node, stalling_peer, num_missing)
|
||||
|
||||
delivery_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
|
||||
with p2p_lock:
|
||||
# The second peer to announce should still get a getblocktxn
|
||||
assert "getblocktxn" in delivery_peer.last_message
|
||||
assert int(node.getbestblockhash(), 16) != block.sha256
|
||||
|
||||
inbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
|
||||
with p2p_lock:
|
||||
# The third inbound peer to announce should *not* get a getblocktxn
|
||||
assert "getblocktxn" not in inbound_peer.last_message
|
||||
assert int(node.getbestblockhash(), 16) != block.sha256
|
||||
|
||||
outbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
|
||||
with p2p_lock:
|
||||
# The third peer to announce should get a getblocktxn if outbound
|
||||
assert "getblocktxn" in outbound_peer.last_message
|
||||
assert int(node.getbestblockhash(), 16) != block.sha256
|
||||
|
||||
# Second peer completes the compact block first
|
||||
msg = msg_blocktxn()
|
||||
msg.block_transactions.blockhash = block.sha256
|
||||
msg.block_transactions.transactions = block.vtx[1:]
|
||||
delivery_peer.send_and_ping(msg)
|
||||
assert_equal(int(node.getbestblockhash(), 16), block.sha256)
|
||||
|
||||
# Nothing bad should happen if we get a late fill from the first peer...
|
||||
stalling_peer.send_and_ping(msg)
|
||||
self.utxos.append([block.vtx[-1].sha256, 0, block.vtx[-1].vout[0].nValue])
|
||||
|
||||
delivery_peer.clear_getblocktxn()
|
||||
inbound_peer.clear_getblocktxn()
|
||||
outbound_peer.clear_getblocktxn()
|
||||
|
||||
|
||||
def run_test(self):
|
||||
self.wallet = MiniWallet(self.nodes[0])
|
||||
|
||||
# Setup the p2p connections
|
||||
self.segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn())
|
||||
self.additional_segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn())
|
||||
self.onemore_inbound_node = self.nodes[0].add_p2p_connection(TestP2PConn())
|
||||
self.outbound_node = self.nodes[0].add_outbound_p2p_connection(TestP2PConn(), p2p_idx=3, connection_type="outbound-full-relay")
|
||||
|
||||
# We will need UTXOs to construct transactions in later tests.
|
||||
self.make_utxos()
|
||||
|
@ -838,6 +915,8 @@ class CompactBlocksTest(BitcoinTestFramework):
|
|||
self.log.info("Testing SENDCMPCT p2p message... ")
|
||||
self.test_sendcmpct(self.segwit_node)
|
||||
self.test_sendcmpct(self.additional_segwit_node)
|
||||
self.test_sendcmpct(self.onemore_inbound_node)
|
||||
self.test_sendcmpct(self.outbound_node)
|
||||
|
||||
self.log.info("Testing compactblock construction...")
|
||||
self.test_compactblock_construction(self.segwit_node)
|
||||
|
@ -860,8 +939,11 @@ class CompactBlocksTest(BitcoinTestFramework):
|
|||
self.log.info("Testing handling of incorrect blocktxn responses...")
|
||||
self.test_incorrect_blocktxn_response(self.segwit_node)
|
||||
|
||||
self.log.info("Testing reconstructing compact blocks from all peers...")
|
||||
self.test_compactblock_reconstruction_multiple_peers(self.segwit_node, self.additional_segwit_node)
|
||||
self.log.info("Testing reconstructing compact blocks with a stalling peer...")
|
||||
self.test_compactblock_reconstruction_stalling_peer(self.segwit_node, self.additional_segwit_node)
|
||||
|
||||
self.log.info("Testing reconstructing compact blocks from multiple peers...")
|
||||
self.test_compactblock_reconstruction_parallel_reconstruction(stalling_peer=self.segwit_node, inbound_peer=self.onemore_inbound_node, delivery_peer=self.additional_segwit_node, outbound_peer=self.outbound_node)
|
||||
|
||||
# Test that if we submitblock to node1, we'll get a compact block
|
||||
# announcement to all peers.
|
||||
|
|
Loading…
Add table
Reference in a new issue