mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-02-21 14:34:49 +01:00
Merge #18038: P2P: Mempool tracks locally submitted transactions to improve wallet privacy
50fc4df6c4
[mempool] Persist unbroadcast set to mempool.dat (Amiti Uttarwar)297a178536
[test] Integration tests for unbroadcast functionality (Amiti Uttarwar)6851502472
[refactor/test] Extract P2PTxInvStore into test framework (Amiti Uttarwar)dc1da48dc5
[wallet] Update the rebroadcast frequency to be ~1/day. (Amiti Uttarwar)e25e42f20a
[p2p] Reattempt initial send of unbroadcast transactions (Amiti Uttarwar)7e93eecce3
[util] Add method that returns random time in milliseconds (Amiti Uttarwar)89eeb4a333
[mempool] Track "unbroadcast" transactions (Amiti Uttarwar) Pull request description: This PR introduces mempool tracking of unbroadcast transactions and periodic reattempts at initial broadcast. This is a part of the rebroadcast project, and a standalone privacy win. The current rebroadcast logic is terrible for privacy because 1. only the source wallet rebroadcasts transactions and 2. it does so quite frequently. In the current system, if a user submits a transaction that does not immediately get broadcast to the network (eg. they are offline), this "rebroadcast" behavior is the safety net that can actually serve as the initial broadcast. So, keeping the attempts frequent is important for initial delivery within a reasonable timespan. This PR aims to improve # 2 by reducing the wallet rebroadcast frequency to ~1/day from ~1/15 min. It achieves this by separating the notion of initial broadcast from rebroadcasts. With these changes, the mempool tracks locally submitted transactions & periodically reattempts initial broadcast. Transactions submitted via the wallet or RPC are added to an "unbroadcast" set & are removed when a peer sends a `getdata` request, or the transaction is removed from the mempool. Every 10-15 minutes, the node reattempts an initial broadcast. This enables reducing the wallet rebroadcast frequency while ensuring the transactions will be propagated to the network. For privacy improvements around # 1, please see #16698. Thank you to gmaxwell for the idea of how to break out this subset of functionality (https://github.com/bitcoin/bitcoin/pull/16698#issuecomment-571399346) ACKs for top commit: fjahr: Code review ACK50fc4df6c4
MarcoFalke: ACK50fc4df6c4
, I think this is ready for merge now 👻 amitiuttarwar: The current tip `50fc4df` currently has 6 ACKs on it, so I've opened #18807 to address the last bits. jnewbery: utACK50fc4df6c4
. ariard: Code Review ACK50fc4df
(minor points no need to invalid other ACKs) robot-visions: ACK50fc4df6c4
sipa: utACK50fc4df6c4
naumenkogs: utACK50fc4df
Tree-SHA512: 2dd935d645d5e209f8abf87bfaa3ef0e4492705ce7e89ea64279cb27ffd37f4727fa94ad62d41be331177332f8edbebf3c7f4972f8cda10dd951b80a28ab3c0f
This commit is contained in:
commit
0ef0d33f75
14 changed files with 248 additions and 31 deletions
|
@ -810,6 +810,19 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) {
|
|||
PushNodeVersion(pnode, connman, GetTime());
|
||||
}
|
||||
|
||||
void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const
|
||||
{
|
||||
std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs();
|
||||
|
||||
for (const uint256& txid : unbroadcast_txids) {
|
||||
RelayTransaction(txid, *connman);
|
||||
}
|
||||
|
||||
// schedule next run for 10-15 minutes in the future
|
||||
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
|
||||
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
|
||||
}
|
||||
|
||||
void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
|
||||
fUpdateConnectionTime = false;
|
||||
LOCK(cs_main);
|
||||
|
@ -1159,6 +1172,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS
|
|||
// timer.
|
||||
static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer");
|
||||
scheduler.scheduleEvery([this, consensusParams] { this->CheckForStaleTipAndEvictPeers(consensusParams); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL});
|
||||
|
||||
// schedule next run for 10-15 minutes in the future
|
||||
const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5});
|
||||
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1587,7 +1604,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c
|
|||
}
|
||||
}
|
||||
|
||||
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
|
||||
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
|
||||
{
|
||||
AssertLockNotHeld(cs_main);
|
||||
|
||||
|
@ -1636,7 +1653,13 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
|
|||
push = true;
|
||||
}
|
||||
}
|
||||
if (!push) {
|
||||
|
||||
if (push) {
|
||||
// We interpret fulfilling a GETDATA for a transaction as a
|
||||
// successful initial broadcast and remove it from our
|
||||
// unbroadcast set.
|
||||
mempool.RemoveUnbroadcastTx(inv.hash);
|
||||
} else {
|
||||
vNotFound.push_back(inv);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,8 @@ public:
|
|||
void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams);
|
||||
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
|
||||
void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
|
||||
void ReattemptInitialBroadcast(CScheduler& scheduler) const;
|
||||
|
||||
private:
|
||||
int64_t m_stale_tip_check_time; //!< Next time to check for stale tip
|
||||
|
|
|
@ -78,6 +78,10 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
|||
}
|
||||
|
||||
if (relay) {
|
||||
// the mempool tracks locally submitted transactions to make a
|
||||
// best-effort of initial broadcast
|
||||
node.mempool->AddUnbroadcastTx(hashTx);
|
||||
|
||||
RelayTransaction(hashTx, *node.connman);
|
||||
}
|
||||
|
||||
|
|
|
@ -592,6 +592,11 @@ std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max)
|
|||
return std::chrono::microseconds{GetRand(duration_max.count())};
|
||||
}
|
||||
|
||||
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept
|
||||
{
|
||||
return std::chrono::milliseconds{GetRand(duration_max.count())};
|
||||
}
|
||||
|
||||
int GetRandInt(int nMax) noexcept
|
||||
{
|
||||
return GetRand(nMax);
|
||||
|
|
|
@ -69,6 +69,7 @@
|
|||
void GetRandBytes(unsigned char* buf, int num) noexcept;
|
||||
uint64_t GetRand(uint64_t nMax) noexcept;
|
||||
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;
|
||||
std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept;
|
||||
int GetRandInt(int nMax) noexcept;
|
||||
uint256 GetRandHash() noexcept;
|
||||
|
||||
|
|
|
@ -417,6 +417,8 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
|
|||
for (const CTxIn& txin : it->GetTx().vin)
|
||||
mapNextTx.erase(txin.prevout);
|
||||
|
||||
RemoveUnbroadcastTx(hash, true /* add logging because unchecked */ );
|
||||
|
||||
if (vTxHashes.size() > 1) {
|
||||
vTxHashes[it->vTxHashesIdx] = std::move(vTxHashes.back());
|
||||
vTxHashes[it->vTxHashesIdx].second->vTxHashesIdx = it->vTxHashesIdx;
|
||||
|
@ -919,6 +921,15 @@ size_t CTxMemPool::DynamicMemoryUsage() const {
|
|||
return memusage::MallocUsage(sizeof(CTxMemPoolEntry) + 12 * sizeof(void*)) * mapTx.size() + memusage::DynamicUsage(mapNextTx) + memusage::DynamicUsage(mapDeltas) + memusage::DynamicUsage(mapLinks) + memusage::DynamicUsage(vTxHashes) + cachedInnerUsage;
|
||||
}
|
||||
|
||||
void CTxMemPool::RemoveUnbroadcastTx(const uint256& txid, const bool unchecked) {
|
||||
LOCK(cs);
|
||||
|
||||
if (m_unbroadcast_txids.erase(txid))
|
||||
{
|
||||
LogPrint(BCLog::MEMPOOL, "Removed %i from set of unbroadcast txns%s\n", txid.GetHex(), (unchecked ? " before confirmation that txn was sent out" : ""));
|
||||
}
|
||||
}
|
||||
|
||||
void CTxMemPool::RemoveStaged(setEntries &stage, bool updateDescendants, MemPoolRemovalReason reason) {
|
||||
AssertLockHeld(cs);
|
||||
UpdateForRemoveFromMempool(stage, updateDescendants);
|
||||
|
|
|
@ -549,6 +549,9 @@ private:
|
|||
|
||||
std::vector<indexed_transaction_set::const_iterator> GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs);
|
||||
|
||||
/** track locally submitted transactions to periodically retry initial broadcast */
|
||||
std::set<uint256> m_unbroadcast_txids GUARDED_BY(cs);
|
||||
|
||||
public:
|
||||
indirectmap<COutPoint, const CTransaction*> mapNextTx GUARDED_BY(cs);
|
||||
std::map<uint256, CAmount> mapDeltas;
|
||||
|
@ -698,6 +701,21 @@ public:
|
|||
|
||||
size_t DynamicMemoryUsage() const;
|
||||
|
||||
/** Adds a transaction to the unbroadcast set */
|
||||
void AddUnbroadcastTx(const uint256& txid) {
|
||||
LOCK(cs);
|
||||
m_unbroadcast_txids.insert(txid);
|
||||
}
|
||||
|
||||
/** Removes a transaction from the unbroadcast set */
|
||||
void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false);
|
||||
|
||||
/** Returns transactions in unbroadcast set */
|
||||
const std::set<uint256> GetUnbroadcastTxs() const {
|
||||
LOCK(cs);
|
||||
return m_unbroadcast_txids;
|
||||
}
|
||||
|
||||
private:
|
||||
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
|
||||
* the descendants for a single transaction that has been added to the
|
||||
|
|
|
@ -4998,6 +4998,7 @@ bool LoadMempool(CTxMemPool& pool)
|
|||
int64_t expired = 0;
|
||||
int64_t failed = 0;
|
||||
int64_t already_there = 0;
|
||||
int64_t unbroadcast = 0;
|
||||
int64_t nNow = GetTime();
|
||||
|
||||
try {
|
||||
|
@ -5051,12 +5052,21 @@ bool LoadMempool(CTxMemPool& pool)
|
|||
for (const auto& i : mapDeltas) {
|
||||
pool.PrioritiseTransaction(i.first, i.second);
|
||||
}
|
||||
|
||||
std::set<uint256> unbroadcast_txids;
|
||||
file >> unbroadcast_txids;
|
||||
unbroadcast = unbroadcast_txids.size();
|
||||
|
||||
for (const auto& txid : unbroadcast_txids) {
|
||||
pool.AddUnbroadcastTx(txid);
|
||||
}
|
||||
|
||||
} catch (const std::exception& e) {
|
||||
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
|
||||
return false;
|
||||
}
|
||||
|
||||
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there);
|
||||
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -5066,6 +5076,7 @@ bool DumpMempool(const CTxMemPool& pool)
|
|||
|
||||
std::map<uint256, CAmount> mapDeltas;
|
||||
std::vector<TxMempoolInfo> vinfo;
|
||||
std::set<uint256> unbroadcast_txids;
|
||||
|
||||
static Mutex dump_mutex;
|
||||
LOCK(dump_mutex);
|
||||
|
@ -5076,6 +5087,7 @@ bool DumpMempool(const CTxMemPool& pool)
|
|||
mapDeltas[i.first] = i.second;
|
||||
}
|
||||
vinfo = pool.infoAll();
|
||||
unbroadcast_txids = pool.GetUnbroadcastTxs();
|
||||
}
|
||||
|
||||
int64_t mid = GetTimeMicros();
|
||||
|
@ -5100,6 +5112,10 @@ bool DumpMempool(const CTxMemPool& pool)
|
|||
}
|
||||
|
||||
file << mapDeltas;
|
||||
|
||||
LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size());
|
||||
file << unbroadcast_txids;
|
||||
|
||||
if (!FileCommit(file.Get()))
|
||||
throw std::runtime_error("FileCommit failed");
|
||||
file.fclose();
|
||||
|
|
|
@ -1993,7 +1993,8 @@ void CWallet::ResendWalletTransactions()
|
|||
// that these are our transactions.
|
||||
if (GetTime() < nNextResend || !fBroadcastTransactions) return;
|
||||
bool fFirst = (nNextResend == 0);
|
||||
nNextResend = GetTime() + GetRand(30 * 60);
|
||||
// resend 12-36 hours from now, ~1 day on average.
|
||||
nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60);
|
||||
if (fFirst) return;
|
||||
|
||||
// Only do it if there's been a new block since last time
|
||||
|
|
|
@ -40,10 +40,13 @@ import os
|
|||
import time
|
||||
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.mininode import P2PTxInvStore
|
||||
from test_framework.util import (
|
||||
assert_equal,
|
||||
assert_greater_than_or_equal,
|
||||
assert_raises_rpc_error,
|
||||
connect_nodes,
|
||||
disconnect_nodes,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
|
@ -80,6 +83,11 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
assert_greater_than_or_equal(tx_creation_time, tx_creation_time_lower)
|
||||
assert_greater_than_or_equal(tx_creation_time_higher, tx_creation_time)
|
||||
|
||||
# disconnect nodes & make a txn that remains in the unbroadcast set.
|
||||
disconnect_nodes(self.nodes[0], 2)
|
||||
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
|
||||
connect_nodes(self.nodes[0], 2)
|
||||
|
||||
self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
|
||||
self.stop_nodes()
|
||||
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
|
||||
|
@ -89,7 +97,7 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
self.start_node(2)
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"], timeout=1)
|
||||
wait_until(lambda: self.nodes[2].getmempoolinfo()["loaded"], timeout=1)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 6)
|
||||
assert_equal(len(self.nodes[2].getrawmempool()), 5)
|
||||
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 0)
|
||||
|
@ -105,9 +113,10 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
|
||||
assert_equal(node2_balance, self.nodes[2].getbalance())
|
||||
|
||||
# start node0 with wallet disabled so wallet transactions don't get resubmitted
|
||||
self.log.debug("Stop-start node0 with -persistmempool=0. Verify that it doesn't load its mempool.dat file.")
|
||||
self.stop_nodes()
|
||||
self.start_node(0, extra_args=["-persistmempool=0"])
|
||||
self.start_node(0, extra_args=["-persistmempool=0", "-disablewallet"])
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 0)
|
||||
|
||||
|
@ -115,7 +124,7 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
self.stop_nodes()
|
||||
self.start_node(0)
|
||||
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[0].getrawmempool()), 6)
|
||||
|
||||
mempooldat0 = os.path.join(self.nodes[0].datadir, self.chain, 'mempool.dat')
|
||||
mempooldat1 = os.path.join(self.nodes[1].datadir, self.chain, 'mempool.dat')
|
||||
|
@ -124,12 +133,12 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
self.nodes[0].savemempool()
|
||||
assert os.path.isfile(mempooldat0)
|
||||
|
||||
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 5 transactions")
|
||||
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 6 transactions")
|
||||
os.rename(mempooldat0, mempooldat1)
|
||||
self.stop_nodes()
|
||||
self.start_node(1, extra_args=[])
|
||||
wait_until(lambda: self.nodes[1].getmempoolinfo()["loaded"])
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 5)
|
||||
assert_equal(len(self.nodes[1].getrawmempool()), 6)
|
||||
|
||||
self.log.debug("Prevent bitcoind from writing mempool.dat to disk. Verify that `savemempool` fails")
|
||||
# to test the exception we are creating a tmp folder called mempool.dat.new
|
||||
|
@ -139,6 +148,27 @@ class MempoolPersistTest(BitcoinTestFramework):
|
|||
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
|
||||
os.rmdir(mempooldotnew1)
|
||||
|
||||
self.test_persist_unbroadcast()
|
||||
|
||||
def test_persist_unbroadcast(self):
|
||||
node0 = self.nodes[0]
|
||||
self.start_node(0)
|
||||
|
||||
# clear out mempool
|
||||
node0.generate(1)
|
||||
|
||||
# disconnect nodes to make a txn that remains in the unbroadcast set.
|
||||
disconnect_nodes(node0, 1)
|
||||
node0.sendtoaddress(self.nodes[1].getnewaddress(), Decimal("12"))
|
||||
|
||||
# shutdown, then startup with wallet disabled
|
||||
self.stop_nodes()
|
||||
self.start_node(0, extra_args=["-disablewallet"])
|
||||
|
||||
# check that txn gets broadcast due to unbroadcast logic
|
||||
conn = node0.add_p2p_connection(P2PTxInvStore())
|
||||
node0.mockscheduler(16*60) # 15 min + 1 for buffer
|
||||
wait_until(lambda: len(conn.get_invs()) == 1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
MempoolPersistTest().main()
|
||||
|
|
99
test/functional/mempool_unbroadcast.py
Executable file
99
test/functional/mempool_unbroadcast.py
Executable file
|
@ -0,0 +1,99 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2017-2020 The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test that the mempool ensures transaction delivery by periodically sending
|
||||
to peers until a GETDATA is received."""
|
||||
|
||||
import time
|
||||
|
||||
from test_framework.mininode import P2PTxInvStore
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import (
|
||||
assert_equal,
|
||||
connect_nodes,
|
||||
create_confirmed_utxos,
|
||||
disconnect_nodes,
|
||||
)
|
||||
|
||||
|
||||
class MempoolUnbroadcastTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 2
|
||||
|
||||
def skip_test_if_missing_module(self):
|
||||
self.skip_if_no_wallet()
|
||||
|
||||
def run_test(self):
|
||||
self.test_broadcast()
|
||||
self.test_txn_removal()
|
||||
|
||||
def test_broadcast(self):
|
||||
self.log.info("Test that mempool reattempts delivery of locally submitted transaction")
|
||||
node = self.nodes[0]
|
||||
|
||||
min_relay_fee = node.getnetworkinfo()["relayfee"]
|
||||
utxos = create_confirmed_utxos(min_relay_fee, node, 10)
|
||||
|
||||
disconnect_nodes(node, 1)
|
||||
|
||||
self.log.info("Generate transactions that only node 0 knows about")
|
||||
|
||||
# generate a wallet txn
|
||||
addr = node.getnewaddress()
|
||||
wallet_tx_hsh = node.sendtoaddress(addr, 0.0001)
|
||||
|
||||
# generate a txn using sendrawtransaction
|
||||
us0 = utxos.pop()
|
||||
inputs = [{"txid": us0["txid"], "vout": us0["vout"]}]
|
||||
outputs = {addr: 0.0001}
|
||||
tx = node.createrawtransaction(inputs, outputs)
|
||||
node.settxfee(min_relay_fee)
|
||||
txF = node.fundrawtransaction(tx)
|
||||
txFS = node.signrawtransactionwithwallet(txF["hex"])
|
||||
rpc_tx_hsh = node.sendrawtransaction(txFS["hex"])
|
||||
|
||||
# check that second node doesn't have these two txns
|
||||
mempool = self.nodes[1].getrawmempool()
|
||||
assert rpc_tx_hsh not in mempool
|
||||
assert wallet_tx_hsh not in mempool
|
||||
|
||||
# ensure that unbroadcast txs are persisted to mempool.dat
|
||||
self.restart_node(0)
|
||||
|
||||
self.log.info("Reconnect nodes & check if they are sent to node 1")
|
||||
connect_nodes(node, 1)
|
||||
|
||||
# fast forward into the future & ensure that the second node has the txns
|
||||
node.mockscheduler(15 * 60) # 15 min in seconds
|
||||
self.sync_mempools(timeout=30)
|
||||
mempool = self.nodes[1].getrawmempool()
|
||||
assert rpc_tx_hsh in mempool
|
||||
assert wallet_tx_hsh in mempool
|
||||
|
||||
self.log.info("Add another connection & ensure transactions aren't broadcast again")
|
||||
|
||||
conn = node.add_p2p_connection(P2PTxInvStore())
|
||||
node.mockscheduler(15 * 60)
|
||||
time.sleep(5)
|
||||
assert_equal(len(conn.get_invs()), 0)
|
||||
|
||||
def test_txn_removal(self):
|
||||
self.log.info("Test that transactions removed from mempool are removed from unbroadcast set")
|
||||
node = self.nodes[0]
|
||||
disconnect_nodes(node, 1)
|
||||
node.disconnect_p2ps
|
||||
|
||||
# since the node doesn't have any connections, it will not receive
|
||||
# any GETDATAs & thus the transaction will remain in the unbroadcast set.
|
||||
addr = node.getnewaddress()
|
||||
txhsh = node.sendtoaddress(addr, 0.0001)
|
||||
|
||||
# check transaction was removed from unbroadcast set due to presence in
|
||||
# a block
|
||||
removal_reason = "Removed {} from set of unbroadcast txns before confirmation that txn was sent out".format(txhsh)
|
||||
with node.assert_debug_log([removal_reason]):
|
||||
node.generate(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
MempoolUnbroadcastTest().main()
|
|
@ -12,7 +12,10 @@ found in the mini-node branch of http://github.com/jgarzik/pynode.
|
|||
P2PConnection: A low-level connection object to a node's P2P interface
|
||||
P2PInterface: A high-level interface object for communicating to a node over P2P
|
||||
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
|
||||
and can respond correctly to getdata and getheaders messages"""
|
||||
and can respond correctly to getdata and getheaders messages
|
||||
P2PTxInvStore: A p2p interface class that inherits from P2PDataStore, and keeps
|
||||
a count of how many times each txid has been announced."""
|
||||
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from io import BytesIO
|
||||
|
@ -627,3 +630,20 @@ class P2PDataStore(P2PInterface):
|
|||
# Check that none of the txs are now in the mempool
|
||||
for tx in txs:
|
||||
assert tx.hash not in raw_mempool, "{} tx found in mempool".format(tx.hash)
|
||||
|
||||
class P2PTxInvStore(P2PInterface):
|
||||
"""A P2PInterface which stores a count of how many times each txid has been announced."""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.tx_invs_received = defaultdict(int)
|
||||
|
||||
def on_inv(self, message):
|
||||
# Store how many times invs have been received for each tx.
|
||||
for i in message.inv:
|
||||
if i.type == MSG_TX:
|
||||
# save txid
|
||||
self.tx_invs_received[i.hash] += 1
|
||||
|
||||
def get_invs(self):
|
||||
with mininode_lock:
|
||||
return list(self.tx_invs_received.keys())
|
||||
|
|
|
@ -220,6 +220,7 @@ BASE_SCRIPTS = [
|
|||
'p2p_unrequested_blocks.py',
|
||||
'feature_includeconf.py',
|
||||
'feature_asmap.py',
|
||||
'mempool_unbroadcast.py',
|
||||
'rpc_deriveaddresses.py',
|
||||
'rpc_deriveaddresses.py --usecli',
|
||||
'rpc_scantxoutset.py',
|
||||
|
|
|
@ -3,29 +3,14 @@
|
|||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test that the wallet resends transactions periodically."""
|
||||
from collections import defaultdict
|
||||
import time
|
||||
|
||||
from test_framework.blocktools import create_block, create_coinbase
|
||||
from test_framework.messages import ToHex
|
||||
from test_framework.mininode import P2PInterface, mininode_lock
|
||||
from test_framework.mininode import P2PTxInvStore, mininode_lock
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal, wait_until
|
||||
|
||||
|
||||
class P2PStoreTxInvs(P2PInterface):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.tx_invs_received = defaultdict(int)
|
||||
|
||||
def on_inv(self, message):
|
||||
# Store how many times invs have been received for each tx.
|
||||
for i in message.inv:
|
||||
if i.type == 1:
|
||||
# save txid
|
||||
self.tx_invs_received[i.hash] += 1
|
||||
|
||||
|
||||
class ResendWalletTransactionsTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 1
|
||||
|
@ -36,7 +21,7 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
|||
def run_test(self):
|
||||
node = self.nodes[0] # alias
|
||||
|
||||
node.add_p2p_connection(P2PStoreTxInvs())
|
||||
node.add_p2p_connection(P2PTxInvStore())
|
||||
|
||||
self.log.info("Create a new transaction and wait until it's broadcast")
|
||||
txid = int(node.sendtoaddress(node.getnewaddress(), 1), 16)
|
||||
|
@ -51,7 +36,7 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
|||
wait_until(lambda: node.p2p.tx_invs_received[txid] >= 1, lock=mininode_lock)
|
||||
|
||||
# Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown)
|
||||
node.add_p2p_connection(P2PStoreTxInvs())
|
||||
node.add_p2p_connection(P2PTxInvStore())
|
||||
|
||||
self.log.info("Create a block")
|
||||
# Create and submit a block without the transaction.
|
||||
|
@ -69,9 +54,10 @@ class ResendWalletTransactionsTest(BitcoinTestFramework):
|
|||
node.p2ps[1].sync_with_ping()
|
||||
assert_equal(node.p2ps[1].tx_invs_received[txid], 0)
|
||||
|
||||
self.log.info("Transaction should be rebroadcast after 30 minutes")
|
||||
# Use mocktime and give an extra 5 minutes to be sure.
|
||||
rebroadcast_time = int(time.time()) + 41 * 60
|
||||
self.log.info("Bump time & check that transaction is rebroadcast")
|
||||
# Transaction should be rebroadcast approximately 24 hours in the future,
|
||||
# but can range from 12-36. So bump 36 hours to be sure.
|
||||
rebroadcast_time = int(time.time()) + 36 * 60 * 60
|
||||
node.setmocktime(rebroadcast_time)
|
||||
wait_until(lambda: node.p2ps[1].tx_invs_received[txid] >= 1, lock=mininode_lock)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue