Merge bitcoin/bitcoin#27460: rpc: Add importmempool RPC

fa776e61cd Add importmempool RPC (MarcoFalke)
fa20d734a2 refactor: Add and use kernel::ImportMempoolOptions (MarcoFalke)
fa8866990d doc: Clarify the getmempoolinfo.loaded RPC field documentation (MarcoFalke)
6888886cec Remove Chainstate::LoadMempool (MarcoFalke)

Pull request description:

  Currently it is possible to import a mempool by placing it in the datadir and starting the node. However this has many issues:

  * Users aren't expected to fiddle with the datadir, possibly corrupting it
  * An existing mempool file in the datadir may be overwritten
  * The node needs to be restarted
  * Importing an untrusted file this way is dangerous, because it can corrupt the mempool

  Fix all issues by adding a new RPC.

ACKs for top commit:
  ajtowns:
    utACK fa776e61cd
  achow101:
    ACK fa776e61cd
  glozow:
    reACK fa776e61cd

Tree-SHA512: fcb1a92d6460839283c546c47a2d930c363ac1013c4c50dc5215ddf9fe5e51921d23fe0abfae0a5a7631983cfc7e2fff3788b70f95937d0a989a203be4d67546
This commit is contained in:
Andrew Chow 2023-08-15 10:03:51 -04:00
commit cd43a8444b
No known key found for this signature in database
GPG Key ID: 17565732E08E5E41
11 changed files with 160 additions and 32 deletions

View File

@ -116,6 +116,7 @@
#endif
using kernel::DumpMempool;
using kernel::LoadMempool;
using kernel::ValidationCacheSizes;
using node::ApplyArgsManOptions;
@ -1676,7 +1677,10 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
return;
}
// Load mempool from disk
chainman.ActiveChainstate().LoadMempool(ShouldPersistMempool(args) ? MempoolPath(args) : fs::path{});
if (auto* pool{chainman.ActiveChainstate().GetMempool()}) {
LoadMempool(*pool, ShouldPersistMempool(args) ? MempoolPath(args) : fs::path{}, chainman.ActiveChainstate(), {});
pool->SetLoadTried(!chainman.m_interrupt);
}
});
// Wait for genesis block to be processed

View File

@ -19,7 +19,6 @@
#include <util/time.h>
#include <validation.h>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
@ -37,11 +36,11 @@ namespace kernel {
static const uint64_t MEMPOOL_DUMP_VERSION = 1;
bool LoadMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active_chainstate, FopenFn mockable_fopen_function)
bool LoadMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active_chainstate, ImportMempoolOptions&& opts)
{
if (load_path.empty()) return false;
FILE* filestr{mockable_fopen_function(load_path, "rb")};
FILE* filestr{opts.mockable_fopen_function(load_path, "rb")};
CAutoFile file(filestr, SER_DISK, CLIENT_VERSION);
if (file.IsNull()) {
LogPrintf("Failed to open mempool file from disk. Continuing anyway.\n");
@ -53,7 +52,7 @@ bool LoadMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active
int64_t failed = 0;
int64_t already_there = 0;
int64_t unbroadcast = 0;
auto now = NodeClock::now();
const auto now{NodeClock::now()};
try {
uint64_t version;
@ -72,8 +71,12 @@ bool LoadMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active
file >> nTime;
file >> nFeeDelta;
if (opts.use_current_time) {
nTime = TicksSinceEpoch<std::chrono::seconds>(now);
}
CAmount amountdelta = nFeeDelta;
if (amountdelta) {
if (amountdelta && opts.apply_fee_delta_priority) {
pool.PrioritiseTransaction(tx->GetHash(), amountdelta);
}
if (nTime > TicksSinceEpoch<std::chrono::seconds>(now - pool.m_expiry)) {
@ -101,17 +104,21 @@ bool LoadMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active
std::map<uint256, CAmount> mapDeltas;
file >> mapDeltas;
for (const auto& i : mapDeltas) {
pool.PrioritiseTransaction(i.first, i.second);
if (opts.apply_fee_delta_priority) {
for (const auto& i : mapDeltas) {
pool.PrioritiseTransaction(i.first, i.second);
}
}
std::set<uint256> unbroadcast_txids;
file >> unbroadcast_txids;
unbroadcast = unbroadcast_txids.size();
for (const auto& txid : unbroadcast_txids) {
// Ensure transactions were accepted to mempool then add to
// unbroadcast set.
if (pool.get(txid) != nullptr) pool.AddUnbroadcastTx(txid);
if (opts.apply_unbroadcast_set) {
unbroadcast = unbroadcast_txids.size();
for (const auto& txid : unbroadcast_txids) {
// Ensure transactions were accepted to mempool then add to
// unbroadcast set.
if (pool.get(txid) != nullptr) pool.AddUnbroadcastTx(txid);
}
}
} catch (const std::exception& e) {
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());

View File

@ -12,15 +12,21 @@ class CTxMemPool;
namespace kernel {
/** Dump the mempool to disk. */
/** Dump the mempool to a file. */
bool DumpMempool(const CTxMemPool& pool, const fs::path& dump_path,
fsbridge::FopenFn mockable_fopen_function = fsbridge::fopen,
bool skip_file_commit = false);
/** Load the mempool from disk. */
struct ImportMempoolOptions {
fsbridge::FopenFn mockable_fopen_function{fsbridge::fopen};
bool use_current_time{false};
bool apply_fee_delta_priority{true};
bool apply_unbroadcast_set{true};
};
/** Import the file and attempt to add its contents to the mempool. */
bool LoadMempool(CTxMemPool& pool, const fs::path& load_path,
Chainstate& active_chainstate,
fsbridge::FopenFn mockable_fopen_function = fsbridge::fopen);
ImportMempoolOptions&& opts);
} // namespace kernel

View File

@ -229,6 +229,10 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "importaddress", 2, "rescan" },
{ "importaddress", 3, "p2sh" },
{ "importpubkey", 2, "rescan" },
{ "importmempool", 1, "options" },
{ "importmempool", 1, "apply_fee_delta_priority" },
{ "importmempool", 1, "use_current_time" },
{ "importmempool", 1, "apply_unbroadcast_set" },
{ "importmulti", 0, "requests" },
{ "importmulti", 1, "options" },
{ "importmulti", 1, "rescan" },

View File

@ -696,7 +696,7 @@ static RPCHelpMan getmempoolinfo()
RPCResult{
RPCResult::Type::OBJ, "", "",
{
{RPCResult::Type::BOOL, "loaded", "True if the mempool is fully loaded"},
{RPCResult::Type::BOOL, "loaded", "True if the initial load attempt of the persisted mempool finished"},
{RPCResult::Type::NUM, "size", "Current tx count"},
{RPCResult::Type::NUM, "bytes", "Sum of all virtual transaction sizes as defined in BIP 141. Differs from actual serialized size because witness data is discounted"},
{RPCResult::Type::NUM, "usage", "Total memory usage for the mempool"},
@ -719,6 +719,66 @@ static RPCHelpMan getmempoolinfo()
};
}
static RPCHelpMan importmempool()
{
return RPCHelpMan{
"importmempool",
"Import a mempool.dat file and attempt to add its contents to the mempool.\n"
"Warning: Importing untrusted files is dangerous, especially if metadata from the file is taken over.",
{
{"filepath", RPCArg::Type::STR, RPCArg::Optional::NO, "The mempool file"},
{"options",
RPCArg::Type::OBJ_NAMED_PARAMS,
RPCArg::Optional::OMITTED,
"",
{
{"use_current_time", RPCArg::Type::BOOL, RPCArg::Default{true},
"Whether to use the current system time or use the entry time metadata from the mempool file.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior."},
{"apply_fee_delta_priority", RPCArg::Type::BOOL, RPCArg::Default{false},
"Whether to apply the fee delta metadata from the mempool file.\n"
"It will be added to any existing fee deltas.\n"
"The fee delta can be set by the prioritisetransaction RPC.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior.\n"
"Only set this bool if you understand what it does."},
{"apply_unbroadcast_set", RPCArg::Type::BOOL, RPCArg::Default{false},
"Whether to apply the unbroadcast set metadata from the mempool file.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior."},
},
RPCArgOptions{.oneline_description = "\"options\""}},
},
RPCResult{RPCResult::Type::OBJ, "", "", std::vector<RPCResult>{}},
RPCExamples{HelpExampleCli("importmempool", "/path/to/mempool.dat") + HelpExampleRpc("importmempool", "/path/to/mempool.dat")},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue {
const NodeContext& node{EnsureAnyNodeContext(request.context)};
CTxMemPool& mempool{EnsureMemPool(node)};
Chainstate& chainstate = EnsureChainman(node).ActiveChainstate();
if (chainstate.IsInitialBlockDownload()) {
throw JSONRPCError(RPC_CLIENT_IN_INITIAL_DOWNLOAD, "Can only import the mempool after the block download and sync is done.");
}
const fs::path load_path{fs::u8path(request.params[0].get_str())};
const UniValue& use_current_time{request.params[1]["use_current_time"]};
const UniValue& apply_fee_delta{request.params[1]["apply_fee_delta_priority"]};
const UniValue& apply_unbroadcast{request.params[1]["apply_unbroadcast_set"]};
kernel::ImportMempoolOptions opts{
.use_current_time = use_current_time.isNull() ? true : use_current_time.get_bool(),
.apply_fee_delta_priority = apply_fee_delta.isNull() ? false : apply_fee_delta.get_bool(),
.apply_unbroadcast_set = apply_unbroadcast.isNull() ? false : apply_unbroadcast.get_bool(),
};
if (!kernel::LoadMempool(mempool, load_path, chainstate, std::move(opts))) {
throw JSONRPCError(RPC_MISC_ERROR, "Unable to import mempool file, see debug.log for details.");
}
UniValue ret{UniValue::VOBJ};
return ret;
},
};
}
static RPCHelpMan savemempool()
{
return RPCHelpMan{"savemempool",
@ -921,6 +981,7 @@ void RegisterMempoolRPCCommands(CRPCTable& t)
{"blockchain", &gettxspendingprevout},
{"blockchain", &getmempoolinfo},
{"blockchain", &getrawmempool},
{"blockchain", &importmempool},
{"blockchain", &savemempool},
{"hidden", &submitpackage},
};

View File

@ -78,6 +78,7 @@ const std::vector<std::string> RPC_COMMANDS_NOT_SAFE_FOR_FUZZING{
"generatetoaddress", // avoid prohibitively slow execution (when `num_blocks` is large)
"generatetodescriptor", // avoid prohibitively slow execution (when `nblocks` is large)
"gettxoutproof", // avoid prohibitively slow execution
"importmempool", // avoid reading from disk
"importwallet", // avoid reading from disk
"loadwallet", // avoid reading from disk
"savemempool", // disabled as a precautionary measure: may take a file path argument in the future

View File

@ -20,6 +20,7 @@
#include <vector>
using kernel::DumpMempool;
using kernel::LoadMempool;
using node::MempoolPath;
@ -47,6 +48,10 @@ FUZZ_TARGET(validation_load_mempool, .init = initialize_validation_load_mempool)
auto fuzzed_fopen = [&](const fs::path&, const char*) {
return fuzzed_file_provider.open();
};
(void)chainstate.LoadMempool(MempoolPath(g_setup->m_args), fuzzed_fopen);
(void)LoadMempool(pool, MempoolPath(g_setup->m_args), chainstate,
{
.mockable_fopen_function = fuzzed_fopen,
});
pool.SetLoadTried(true);
(void)DumpMempool(pool, MempoolPath(g_setup->m_args), fuzzed_fopen, true);
}

View File

@ -663,13 +663,13 @@ public:
void GetTransactionAncestry(const uint256& txid, size_t& ancestors, size_t& descendants, size_t* ancestorsize = nullptr, CAmount* ancestorfees = nullptr) const;
/**
* @returns true if we've made an attempt to load the mempool regardless of
* @returns true if an initial attempt to load the persisted mempool was made, regardless of
* whether the attempt was successful or not
*/
bool GetLoadTried() const;
/**
* Set whether or not we've made an attempt to load the mempool (regardless
* Set whether or not an initial attempt to load the persisted mempool was made (regardless
* of whether the attempt was successful or not)
*/
void SetLoadTried(bool load_tried);

View File

@ -69,7 +69,6 @@
using kernel::CCoinsStats;
using kernel::CoinStatsHashType;
using kernel::ComputeUTXOStats;
using kernel::LoadMempool;
using kernel::Notifications;
using fsbridge::FopenFn;
@ -4126,13 +4125,6 @@ void PruneBlockFilesManual(Chainstate& active_chainstate, int nManualPruneHeight
}
}
void Chainstate::LoadMempool(const fs::path& load_path, FopenFn mockable_fopen_function)
{
if (!m_mempool) return;
::LoadMempool(*m_mempool, load_path, *this, mockable_fopen_function);
m_mempool->SetLoadTried(!m_chainman.m_interrupt);
}
bool Chainstate::LoadChainTip()
{
AssertLockHeld(cs_main);

View File

@ -712,9 +712,6 @@ public:
/** Find the last common block of this chain and a locator. */
const CBlockIndex* FindForkInGlobalIndex(const CBlockLocator& locator) const EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Load the persisted mempool from disk */
void LoadMempool(const fs::path& load_path, fsbridge::FopenFn mockable_fopen_function = fsbridge::fopen);
/** Update the chain tip based on database information, i.e. CoinsTip()'s best block. */
bool LoadChainTip() EXCLUSIVE_LOCKS_REQUIRED(cs_main);

View File

@ -46,7 +46,7 @@ from test_framework.util import (
assert_greater_than_or_equal,
assert_raises_rpc_error,
)
from test_framework.wallet import MiniWallet
from test_framework.wallet import MiniWallet, COIN
class MempoolPersistTest(BitcoinTestFramework):
@ -159,6 +159,16 @@ class MempoolPersistTest(BitcoinTestFramework):
assert self.nodes[0].getmempoolinfo()["loaded"]
assert_equal(len(self.nodes[0].getrawmempool()), 0)
self.log.debug("Import mempool at runtime to node0.")
assert_equal({}, self.nodes[0].importmempool(mempooldat0))
assert_equal(len(self.nodes[0].getrawmempool()), 7)
fees = self.nodes[0].getmempoolentry(txid=last_txid)["fees"]
assert_equal(fees["base"], fees["modified"])
assert_equal({}, self.nodes[0].importmempool(mempooldat0, {"apply_fee_delta_priority": True, "apply_unbroadcast_set": True}))
assert_equal(2, self.nodes[0].getmempoolinfo()["unbroadcastcount"])
fees = self.nodes[0].getmempoolentry(txid=last_txid)["fees"]
assert_equal(fees["base"] + Decimal("0.00001000"), fees["modified"])
self.log.debug("Stop-start node0. Verify that it has the transactions in its mempool.")
self.stop_nodes()
self.start_node(0)
@ -186,6 +196,7 @@ class MempoolPersistTest(BitcoinTestFramework):
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
os.rmdir(mempooldotnew1)
self.test_importmempool_union()
self.test_persist_unbroadcast()
def test_persist_unbroadcast(self):
@ -210,6 +221,46 @@ class MempoolPersistTest(BitcoinTestFramework):
node0.mockscheduler(16 * 60) # 15 min + 1 for buffer
self.wait_until(lambda: len(conn.get_invs()) == 1)
def test_importmempool_union(self):
self.log.debug("Submit different transactions to node0 and node1's mempools")
self.start_node(0)
self.start_node(2)
tx_node0 = self.mini_wallet.send_self_transfer(from_node=self.nodes[0])
tx_node1 = self.mini_wallet.send_self_transfer(from_node=self.nodes[1])
tx_node01 = self.mini_wallet.create_self_transfer()
tx_node01_secret = self.mini_wallet.create_self_transfer()
self.nodes[0].prioritisetransaction(tx_node01["txid"], 0, COIN)
self.nodes[0].prioritisetransaction(tx_node01_secret["txid"], 0, 2 * COIN)
self.nodes[1].prioritisetransaction(tx_node01_secret["txid"], 0, 3 * COIN)
self.nodes[0].sendrawtransaction(tx_node01["hex"])
self.nodes[1].sendrawtransaction(tx_node01["hex"])
assert tx_node0["txid"] in self.nodes[0].getrawmempool()
assert not tx_node0["txid"] in self.nodes[1].getrawmempool()
assert not tx_node1["txid"] in self.nodes[0].getrawmempool()
assert tx_node1["txid"] in self.nodes[1].getrawmempool()
assert tx_node01["txid"] in self.nodes[0].getrawmempool()
assert tx_node01["txid"] in self.nodes[1].getrawmempool()
assert not tx_node01_secret["txid"] in self.nodes[0].getrawmempool()
assert not tx_node01_secret["txid"] in self.nodes[1].getrawmempool()
self.log.debug("Check that importmempool can add txns without replacing the entire mempool")
mempooldat0 = str(self.nodes[0].chain_path / "mempool.dat")
result0 = self.nodes[0].savemempool()
assert_equal(mempooldat0, result0["filename"])
assert_equal({}, self.nodes[1].importmempool(mempooldat0, {"apply_fee_delta_priority": True}))
# All transactions should be in node1's mempool now.
assert tx_node0["txid"] in self.nodes[1].getrawmempool()
assert tx_node1["txid"] in self.nodes[1].getrawmempool()
assert not tx_node1["txid"] in self.nodes[0].getrawmempool()
# For transactions that already existed, priority should be changed
entry_node01 = self.nodes[1].getmempoolentry(tx_node01["txid"])
assert_equal(entry_node01["fees"]["base"] + 1, entry_node01["fees"]["modified"])
# Deltas for not-yet-submitted transactions should be applied as well (prioritisation is stackable).
self.nodes[1].sendrawtransaction(tx_node01_secret["hex"])
entry_node01_secret = self.nodes[1].getmempoolentry(tx_node01_secret["txid"])
assert_equal(entry_node01_secret["fees"]["base"] + 5, entry_node01_secret["fees"]["modified"])
self.stop_nodes()
if __name__ == "__main__":
MempoolPersistTest().main()