Merge #20027: Use mockable time everywhere in net_processing

b6834e312a Avoid 'timing mishap' warnings when mocking (Pieter Wuille)
ec3916f40a Use mockable time everywhere in net_processing (Pieter Wuille)

Pull request description:

  The fact that net_processing uses a mix of mockable tand non-mockable time functions made it hard to write functional tests for #19988.

  I'm opening this as a separate PR as I believe it's independently useful. In some ways this doesn't go quite as far as it could, as there are now several data structures that could be converted to `std::chrono` types as well now. I haven't done that here, but I'm happy to reconsider that.

ACKs for top commit:
  MarcoFalke:
    ACK b6834e312a 🌶
  jnewbery:
    utACK b6834e312a
  naumenkogs:
    utACK b6834e3

Tree-SHA512: 6528a167c57926ca12894e0c476826411baf5de2f7b01c2125b97e5f710e620f427bbb13f72bdfc3de59072e56a9c1447bce832f41c725e00e81fea019518f0e
This commit is contained in:
MarcoFalke 2020-10-08 11:38:36 +02:00
commit 9dd4de2832
No known key found for this signature in database
GPG key ID: D2EA4850E7528B25
3 changed files with 34 additions and 39 deletions

View file

@ -582,7 +582,7 @@ static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs
} }
if (state->vBlocksInFlight.begin() == itInFlight->second.second) { if (state->vBlocksInFlight.begin() == itInFlight->second.second) {
// First block on the queue was received, update the start download time for the next one // First block on the queue was received, update the start download time for the next one
state->nDownloadingSince = std::max(state->nDownloadingSince, GetTimeMicros()); state->nDownloadingSince = std::max(state->nDownloadingSince, count_microseconds(GetTime<std::chrono::microseconds>()));
} }
state->vBlocksInFlight.erase(itInFlight->second.second); state->vBlocksInFlight.erase(itInFlight->second.second);
state->nBlocksInFlight--; state->nBlocksInFlight--;
@ -617,7 +617,7 @@ static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint25
state->nBlocksInFlightValidHeaders += it->fValidatedHeaders; state->nBlocksInFlightValidHeaders += it->fValidatedHeaders;
if (state->nBlocksInFlight == 1) { if (state->nBlocksInFlight == 1) {
// We're starting a block download (batch) from this peer. // We're starting a block download (batch) from this peer.
state->nDownloadingSince = GetTimeMicros(); state->nDownloadingSince = GetTime<std::chrono::microseconds>().count();
} }
if (state->nBlocksInFlightValidHeaders == 1 && pindex != nullptr) { if (state->nBlocksInFlightValidHeaders == 1 && pindex != nullptr) {
nPeersWithValidatedDownloads++; nPeersWithValidatedDownloads++;
@ -3637,7 +3637,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// Matching pong received, this ping is no longer outstanding // Matching pong received, this ping is no longer outstanding
bPingFinished = true; bPingFinished = true;
const auto ping_time = ping_end - pfrom.m_ping_start.load(); const auto ping_time = ping_end - pfrom.m_ping_start.load();
if (ping_time.count() > 0) { if (ping_time.count() >= 0) {
// Successful ping time measurement, replace previous // Successful ping time measurement, replace previous
pfrom.nPingUsecTime = count_microseconds(ping_time); pfrom.nPingUsecTime = count_microseconds(ping_time);
pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), count_microseconds(ping_time)); pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), count_microseconds(ping_time));
@ -4102,7 +4102,6 @@ bool PeerManager::SendMessages(CNode* pto)
CNodeState &state = *State(pto->GetId()); CNodeState &state = *State(pto->GetId());
// Address refresh broadcast // Address refresh broadcast
int64_t nNow = GetTimeMicros();
auto current_time = GetTime<std::chrono::microseconds>(); auto current_time = GetTime<std::chrono::microseconds>();
if (pto->RelayAddrsWithConn() && !::ChainstateActive().IsInitialBlockDownload() && pto->m_next_local_addr_send < current_time) { if (pto->RelayAddrsWithConn() && !::ChainstateActive().IsInitialBlockDownload() && pto->m_next_local_addr_send < current_time) {
@ -4148,7 +4147,7 @@ bool PeerManager::SendMessages(CNode* pto)
// Only actively request headers from a single peer, unless we're close to today. // Only actively request headers from a single peer, unless we're close to today.
if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 * 60 * 60) { if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 * 60 * 60) {
state.fSyncStarted = true; state.fSyncStarted = true;
state.nHeadersSyncTimeout = GetTimeMicros() + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); state.nHeadersSyncTimeout = count_microseconds(current_time) + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing);
nSyncStarted++; nSyncStarted++;
const CBlockIndex *pindexStart = pindexBestHeader; const CBlockIndex *pindexStart = pindexBestHeader;
/* If possible, start at the block preceding the currently /* If possible, start at the block preceding the currently
@ -4329,7 +4328,7 @@ bool PeerManager::SendMessages(CNode* pto)
if (pto->m_tx_relay->nNextInvSend < current_time) { if (pto->m_tx_relay->nNextInvSend < current_time) {
fSendTrickle = true; fSendTrickle = true;
if (pto->IsInboundConn()) { if (pto->IsInboundConn()) {
pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{m_connman.PoissonNextSendInbound(nNow, INVENTORY_BROADCAST_INTERVAL)}; pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{m_connman.PoissonNextSendInbound(count_microseconds(current_time), INVENTORY_BROADCAST_INTERVAL)};
} else { } else {
// Use half the delay for outbound peers, as there is less privacy concern for them. // Use half the delay for outbound peers, as there is less privacy concern for them.
pto->m_tx_relay->nNextInvSend = PoissonNextSend(current_time, std::chrono::seconds{INVENTORY_BROADCAST_INTERVAL >> 1}); pto->m_tx_relay->nNextInvSend = PoissonNextSend(current_time, std::chrono::seconds{INVENTORY_BROADCAST_INTERVAL >> 1});
@ -4428,7 +4427,7 @@ bool PeerManager::SendMessages(CNode* pto)
nRelayedTransactions++; nRelayedTransactions++;
{ {
// Expire old relay messages // Expire old relay messages
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow) while (!vRelayExpiration.empty() && vRelayExpiration.front().first < count_microseconds(current_time))
{ {
mapRelay.erase(vRelayExpiration.front().second); mapRelay.erase(vRelayExpiration.front().second);
vRelayExpiration.pop_front(); vRelayExpiration.pop_front();
@ -4436,12 +4435,12 @@ bool PeerManager::SendMessages(CNode* pto)
auto ret = mapRelay.emplace(txid, std::move(txinfo.tx)); auto ret = mapRelay.emplace(txid, std::move(txinfo.tx));
if (ret.second) { if (ret.second) {
vRelayExpiration.emplace_back(nNow + std::chrono::microseconds{RELAY_TX_CACHE_TIME}.count(), ret.first); vRelayExpiration.emplace_back(count_microseconds(current_time + std::chrono::microseconds{RELAY_TX_CACHE_TIME}), ret.first);
} }
// Add wtxid-based lookup into mapRelay as well, so that peers can request by wtxid // Add wtxid-based lookup into mapRelay as well, so that peers can request by wtxid
auto ret2 = mapRelay.emplace(wtxid, ret.first->second); auto ret2 = mapRelay.emplace(wtxid, ret.first->second);
if (ret2.second) { if (ret2.second) {
vRelayExpiration.emplace_back(nNow + std::chrono::microseconds{RELAY_TX_CACHE_TIME}.count(), ret2.first); vRelayExpiration.emplace_back(count_microseconds(current_time + std::chrono::microseconds{RELAY_TX_CACHE_TIME}), ret2.first);
} }
} }
if (vInv.size() == MAX_INV_SZ) { if (vInv.size() == MAX_INV_SZ) {
@ -4466,10 +4465,7 @@ bool PeerManager::SendMessages(CNode* pto)
// Detect whether we're stalling // Detect whether we're stalling
current_time = GetTime<std::chrono::microseconds>(); current_time = GetTime<std::chrono::microseconds>();
// nNow is the current system time (GetTimeMicros is not mockable) and if (state.nStallingSince && state.nStallingSince < count_microseconds(current_time) - 1000000 * BLOCK_STALLING_TIMEOUT) {
// should be replaced by the mockable current_time eventually
nNow = GetTimeMicros();
if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
// Stalling only triggers when the block download window cannot move. During normal steady state, // Stalling only triggers when the block download window cannot move. During normal steady state,
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection
// should only happen during initial block download. // should only happen during initial block download.
@ -4485,7 +4481,7 @@ bool PeerManager::SendMessages(CNode* pto)
if (state.vBlocksInFlight.size() > 0) { if (state.vBlocksInFlight.size() > 0) {
QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); QueuedBlock &queuedBlock = state.vBlocksInFlight.front();
int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0); int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0);
if (nNow > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { if (count_microseconds(current_time) > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) {
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->GetId()); LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->GetId());
pto->fDisconnect = true; pto->fDisconnect = true;
return true; return true;
@ -4495,7 +4491,7 @@ bool PeerManager::SendMessages(CNode* pto)
if (state.fSyncStarted && state.nHeadersSyncTimeout < std::numeric_limits<int64_t>::max()) { if (state.fSyncStarted && state.nHeadersSyncTimeout < std::numeric_limits<int64_t>::max()) {
// Detect whether this is a stalling initial-headers-sync peer // Detect whether this is a stalling initial-headers-sync peer
if (pindexBestHeader->GetBlockTime() <= GetAdjustedTime() - 24 * 60 * 60) { if (pindexBestHeader->GetBlockTime() <= GetAdjustedTime() - 24 * 60 * 60) {
if (nNow > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { if (count_microseconds(current_time) > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) {
// Disconnect a peer (without the noban permission) if it is our only sync peer, // Disconnect a peer (without the noban permission) if it is our only sync peer,
// and we have others we could be using instead. // and we have others we could be using instead.
// Note: If all our peers are inbound, then we won't // Note: If all our peers are inbound, then we won't
@ -4545,7 +4541,7 @@ bool PeerManager::SendMessages(CNode* pto)
} }
if (state.nBlocksInFlight == 0 && staller != -1) { if (state.nBlocksInFlight == 0 && staller != -1) {
if (State(staller)->nStallingSince == 0) { if (State(staller)->nStallingSince == 0) {
State(staller)->nStallingSince = nNow; State(staller)->nStallingSince = count_microseconds(current_time);
LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); LogPrint(BCLog::NET, "Stall started peer=%d\n", staller);
} }
} }
@ -4629,7 +4625,6 @@ bool PeerManager::SendMessages(CNode* pto)
!pto->HasPermission(PF_FORCERELAY) // peers with the forcerelay permission should not filter txs to us !pto->HasPermission(PF_FORCERELAY) // peers with the forcerelay permission should not filter txs to us
) { ) {
CAmount currentFilter = m_mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); CAmount currentFilter = m_mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK();
int64_t timeNow = GetTimeMicros();
static FeeFilterRounder g_filter_rounder{CFeeRate{DEFAULT_MIN_RELAY_TX_FEE}}; static FeeFilterRounder g_filter_rounder{CFeeRate{DEFAULT_MIN_RELAY_TX_FEE}};
if (m_chainman.ActiveChainstate().IsInitialBlockDownload()) { if (m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
// Received tx-inv messages are discarded when the active // Received tx-inv messages are discarded when the active
@ -4640,10 +4635,10 @@ bool PeerManager::SendMessages(CNode* pto)
if (pto->m_tx_relay->lastSentFeeFilter == MAX_FILTER) { if (pto->m_tx_relay->lastSentFeeFilter == MAX_FILTER) {
// Send the current filter if we sent MAX_FILTER previously // Send the current filter if we sent MAX_FILTER previously
// and made it out of IBD. // and made it out of IBD.
pto->m_tx_relay->nextSendTimeFeeFilter = timeNow - 1; pto->m_tx_relay->nextSendTimeFeeFilter = count_microseconds(current_time) - 1;
} }
} }
if (timeNow > pto->m_tx_relay->nextSendTimeFeeFilter) { if (count_microseconds(current_time) > pto->m_tx_relay->nextSendTimeFeeFilter) {
CAmount filterToSend = g_filter_rounder.round(currentFilter); CAmount filterToSend = g_filter_rounder.round(currentFilter);
// We always have a fee filter of at least minRelayTxFee // We always have a fee filter of at least minRelayTxFee
filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK()); filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK());
@ -4651,13 +4646,13 @@ bool PeerManager::SendMessages(CNode* pto)
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend)); m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend));
pto->m_tx_relay->lastSentFeeFilter = filterToSend; pto->m_tx_relay->lastSentFeeFilter = filterToSend;
} }
pto->m_tx_relay->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL); pto->m_tx_relay->nextSendTimeFeeFilter = PoissonNextSend(count_microseconds(current_time), AVG_FEEFILTER_BROADCAST_INTERVAL);
} }
// If the fee filter has changed substantially and it's still more than MAX_FEEFILTER_CHANGE_DELAY // If the fee filter has changed substantially and it's still more than MAX_FEEFILTER_CHANGE_DELAY
// until scheduled broadcast, then move the broadcast to within MAX_FEEFILTER_CHANGE_DELAY. // until scheduled broadcast, then move the broadcast to within MAX_FEEFILTER_CHANGE_DELAY.
else if (timeNow + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->m_tx_relay->nextSendTimeFeeFilter && else if (count_microseconds(current_time) + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->m_tx_relay->nextSendTimeFeeFilter &&
(currentFilter < 3 * pto->m_tx_relay->lastSentFeeFilter / 4 || currentFilter > 4 * pto->m_tx_relay->lastSentFeeFilter / 3)) { (currentFilter < 3 * pto->m_tx_relay->lastSentFeeFilter / 4 || currentFilter > 4 * pto->m_tx_relay->lastSentFeeFilter / 3)) {
pto->m_tx_relay->nextSendTimeFeeFilter = timeNow + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000; pto->m_tx_relay->nextSendTimeFeeFilter = count_microseconds(current_time) + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000;
} }
} }
} // release cs_main } // release cs_main

View file

@ -158,23 +158,19 @@ class TxDownloadTest(BitcoinTestFramework):
self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)]))
def run_test(self): def run_test(self):
# Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when
# the next trickle relay event happens.
for test in [self.test_spurious_notfound, self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
self.stop_nodes()
self.start_nodes()
self.connect_nodes(1, 0)
# Setup the p2p connections # Setup the p2p connections
self.peers = [] self.peers = []
for node in self.nodes: for node in self.nodes:
for _ in range(NUM_INBOUND): for _ in range(NUM_INBOUND):
self.peers.append(node.add_p2p_connection(TestP2PConn())) self.peers.append(node.add_p2p_connection(TestP2PConn()))
self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND))
test()
self.test_spurious_notfound()
# Test the in-flight max first, because we want no transactions in
# flight ahead of this test.
self.test_in_flight_max()
self.test_inv_block()
self.test_tx_requests()
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -64,6 +64,10 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
# Transaction should be rebroadcast approximately 24 hours in the future, # Transaction should be rebroadcast approximately 24 hours in the future,
# but can range from 12-36. So bump 36 hours to be sure. # but can range from 12-36. So bump 36 hours to be sure.
node.setmocktime(now + 36 * 60 * 60) node.setmocktime(now + 36 * 60 * 60)
# Tell scheduler to call MaybeResendWalletTxn now.
node.mockscheduler(1)
# Give some time for trickle to occur
node.setmocktime(now + 36 * 60 * 60 + 600)
peer_second.wait_for_broadcast([txid]) peer_second.wait_for_broadcast([txid])