mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-21 14:34:49 +01:00
net: Use mockable time for tx download
This commit is contained in:
parent
fce4123242
commit
fa883ab35a
3 changed files with 42 additions and 32 deletions
|
@ -68,13 +68,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
|
|||
/** Maximum number of announced transactions from a peer */
|
||||
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
||||
/** How many microseconds to delay requesting transactions from inbound peers */
|
||||
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
|
||||
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
|
||||
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
||||
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute
|
||||
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
|
||||
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
|
||||
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds
|
||||
static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
|
||||
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
|
||||
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
|
||||
static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10};
|
||||
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
|
||||
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
|
||||
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
|
||||
|
@ -340,16 +340,16 @@ struct CNodeState {
|
|||
/* Track when to attempt download of announced transactions (process
|
||||
* time in micros -> txid)
|
||||
*/
|
||||
std::multimap<int64_t, uint256> m_tx_process_time;
|
||||
std::multimap<std::chrono::microseconds, uint256> m_tx_process_time;
|
||||
|
||||
//! Store all the transactions a peer has recently announced
|
||||
std::set<uint256> m_tx_announced;
|
||||
|
||||
//! Store transactions which were requested by us, with timestamp
|
||||
std::map<uint256, int64_t> m_tx_in_flight;
|
||||
std::map<uint256, std::chrono::microseconds> m_tx_in_flight;
|
||||
|
||||
//! Periodically check for stuck getdata requests
|
||||
int64_t m_check_expiry_timer{0};
|
||||
std::chrono::microseconds m_check_expiry_timer{0};
|
||||
};
|
||||
|
||||
TxDownloadState m_tx_download;
|
||||
|
@ -391,7 +391,7 @@ struct CNodeState {
|
|||
};
|
||||
|
||||
// Keeps track of the time (in microseconds) when transactions were requested last time
|
||||
limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
|
||||
limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
|
||||
|
||||
/** Map maintaining per-node state. */
|
||||
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
|
||||
|
@ -688,16 +688,16 @@ void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
|||
g_already_asked_for.erase(txid);
|
||||
}
|
||||
|
||||
int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
std::chrono::microseconds GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
auto it = g_already_asked_for.find(txid);
|
||||
if (it != g_already_asked_for.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return 0;
|
||||
return {};
|
||||
}
|
||||
|
||||
void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
void UpdateTxRequestTime(const uint256& txid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
auto it = g_already_asked_for.find(txid);
|
||||
if (it == g_already_asked_for.end()) {
|
||||
|
@ -707,17 +707,17 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
|
|||
}
|
||||
}
|
||||
|
||||
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
std::chrono::microseconds CalculateTxGetDataTime(const uint256& txid, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
int64_t process_time;
|
||||
int64_t last_request_time = GetTxRequestTime(txid);
|
||||
std::chrono::microseconds process_time;
|
||||
const auto last_request_time = GetTxRequestTime(txid);
|
||||
// First time requesting this tx
|
||||
if (last_request_time == 0) {
|
||||
if (last_request_time.count() == 0) {
|
||||
process_time = current_time;
|
||||
} else {
|
||||
// Randomize the delay to avoid biasing some peers over others (such as due to
|
||||
// fixed ordering of peer processing in ThreadMessageHandler)
|
||||
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
|
||||
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
|
||||
}
|
||||
|
||||
// We delay processing announcements from inbound peers
|
||||
|
@ -726,7 +726,7 @@ int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool u
|
|||
return process_time;
|
||||
}
|
||||
|
||||
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
void RequestTx(CNodeState* state, const uint256& txid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||
{
|
||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||
|
@ -740,7 +740,7 @@ void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_L
|
|||
|
||||
// Calculate the time to try requesting this transaction. Use
|
||||
// fPreferredDownload as a proxy for outbound peers.
|
||||
int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload);
|
||||
const auto process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload);
|
||||
|
||||
peer_download_state.m_tx_process_time.emplace(process_time, txid);
|
||||
}
|
||||
|
@ -2218,7 +2218,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||
LOCK(cs_main);
|
||||
|
||||
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
||||
int64_t nNow = GetTimeMicros();
|
||||
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||
|
||||
for (CInv &inv : vInv)
|
||||
{
|
||||
|
@ -2250,7 +2250,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||
if (fBlocksOnly) {
|
||||
LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId());
|
||||
} else if (!fAlreadyHave && !fImporting && !fReindex && !::ChainstateActive().IsInitialBlockDownload()) {
|
||||
RequestTx(State(pfrom->GetId()), inv.hash, nNow);
|
||||
RequestTx(State(pfrom->GetId()), inv.hash, current_time);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2524,12 +2524,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||
}
|
||||
if (!fRejectedParents) {
|
||||
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
||||
int64_t nNow = GetTimeMicros();
|
||||
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||
|
||||
for (const CTxIn& txin : tx.vin) {
|
||||
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
|
||||
pfrom->AddInventoryKnown(_inv);
|
||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
|
||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time);
|
||||
}
|
||||
AddOrphanTx(ptx, pfrom->GetId());
|
||||
|
||||
|
@ -3900,6 +3900,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
|||
connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
|
||||
// Detect whether we're stalling
|
||||
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||
// nNow is the current system time (GetTimeMicros is not mockable) and
|
||||
// should be replaced by the mockable current_time eventually
|
||||
nNow = GetTimeMicros();
|
||||
if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
|
||||
// Stalling only triggers when the block download window cannot move. During normal steady state,
|
||||
|
@ -3992,9 +3995,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
|||
// were unresponsive in the past.
|
||||
// Eventually we should consider disconnecting peers, but this is
|
||||
// conservative.
|
||||
if (state.m_tx_download.m_check_expiry_timer <= nNow) {
|
||||
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
|
||||
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
|
||||
if (it->second <= nNow - TX_EXPIRY_INTERVAL) {
|
||||
if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
|
||||
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
||||
state.m_tx_download.m_tx_announced.erase(it->first);
|
||||
state.m_tx_download.m_tx_in_flight.erase(it++);
|
||||
|
@ -4004,11 +4007,11 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
|||
}
|
||||
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
|
||||
// so that we're not doing this for all peers at the same time.
|
||||
state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
|
||||
state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL);
|
||||
}
|
||||
|
||||
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
||||
const uint256 txid = tx_process_time.begin()->second;
|
||||
// Erase this entry from tx_process_time (it may be added back for
|
||||
// processing at a later time, see below)
|
||||
|
@ -4017,22 +4020,22 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
|||
if (!AlreadyHave(inv)) {
|
||||
// If this transaction was last requested more than 1 minute ago,
|
||||
// then request.
|
||||
int64_t last_request_time = GetTxRequestTime(inv.hash);
|
||||
if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
|
||||
const auto last_request_time = GetTxRequestTime(inv.hash);
|
||||
if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
|
||||
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
|
||||
vGetData.push_back(inv);
|
||||
if (vGetData.size() >= MAX_GETDATA_SZ) {
|
||||
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||
vGetData.clear();
|
||||
}
|
||||
UpdateTxRequestTime(inv.hash, nNow);
|
||||
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow);
|
||||
UpdateTxRequestTime(inv.hash, current_time);
|
||||
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time);
|
||||
} else {
|
||||
// This transaction is in flight from someone else; queue
|
||||
// up processing to happen after the download times out
|
||||
// (with a slight delay for inbound peers, to prefer
|
||||
// requests to outbound peers).
|
||||
int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload);
|
||||
const auto next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload);
|
||||
tx_process_time.emplace(next_process_time, txid);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -667,6 +667,11 @@ uint64_t GetRand(uint64_t nMax) noexcept
|
|||
return FastRandomContext(g_mock_deterministic_tests).randrange(nMax);
|
||||
}
|
||||
|
||||
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept
|
||||
{
|
||||
return std::chrono::microseconds{GetRand(duration_max.count())};
|
||||
}
|
||||
|
||||
int GetRandInt(int nMax) noexcept
|
||||
{
|
||||
return GetRand(nMax);
|
||||
|
|
|
@ -10,7 +10,8 @@
|
|||
#include <crypto/common.h>
|
||||
#include <uint256.h>
|
||||
|
||||
#include <stdint.h>
|
||||
#include <chrono> // For std::chrono::microseconds
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
/**
|
||||
|
@ -69,6 +70,7 @@
|
|||
*/
|
||||
void GetRandBytes(unsigned char* buf, int num) noexcept;
|
||||
uint64_t GetRand(uint64_t nMax) noexcept;
|
||||
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;
|
||||
int GetRandInt(int nMax) noexcept;
|
||||
uint256 GetRandHash() noexcept;
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue