From a7a10294455540b8f68a632feda6de6312aec858 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sat, 2 Apr 2016 16:58:01 -0500 Subject: [PATCH] rpcserver: Decouple from server. This decouples the RPC server from the internal btcd server to move closer to being able to split it out into a separate package. In order to accomplish this, it introduces an rpcserverConfig type and several new interfaces, named rpcserverPeer, rpcserverConnManager, and rpcserverBlockManager, which are necessary to break the direct dependencies on the main server and block manager instances. It also adds concrete implementations of the new interfaces and uses them to configure the RPC server. Ultimately, the RPC server should ideally be decoupled even more such that all of the types in the configuration struct use interfaces instead of the concrete types. Doing this would make the RPC server much easier to internally test since it would allow creating lightweight stubs for the various pieces. --- rpcadaptors.go | 277 +++++++++++++++++++++++++++++ rpcserver.go | 457 +++++++++++++++++++++++++++++++++--------------- rpcwebsocket.go | 17 +- server.go | 131 ++++---------- 4 files changed, 632 insertions(+), 250 deletions(-) create mode 100644 rpcadaptors.go diff --git a/rpcadaptors.go b/rpcadaptors.go new file mode 100644 index 00000000..eebf47d8 --- /dev/null +++ b/rpcadaptors.go @@ -0,0 +1,277 @@ +// Copyright (c) 2017 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "sync/atomic" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/mempool" + "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +// rpcPeer provides a peer for use with the RPC server and implements the +// rpcserverPeer interface. +type rpcPeer serverPeer + +// Ensure rpcPeer implements the rpcserverPeer interface. +var _ rpcserverPeer = (*rpcPeer)(nil) + +// ToPeer returns the underlying peer instance. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) ToPeer() *peer.Peer { + if p == nil { + return nil + } + return (*serverPeer)(p).Peer +} + +// IsTxRelayDisabled returns whether or not the peer has disabled transaction +// relay. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) IsTxRelayDisabled() bool { + return (*serverPeer)(p).disableRelayTx +} + +// BanScore returns the current integer value that represents how close the peer +// is to being banned. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) BanScore() uint32 { + return (*serverPeer)(p).banScore.Int() +} + +// FeeFilter returns the requested current minimum fee rate for which +// transactions should be announced. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) FeeFilter() int64 { + return atomic.LoadInt64(&(*serverPeer)(p).feeFilter) +} + +// rpcConnManager provides a connection manager for use with the RPC server and +// implements the rpcserverConnManager interface. +type rpcConnManager struct { + server *server +} + +// Ensure rpcConnManager implements the rpcserverConnManager interface. +var _ rpcserverConnManager = &rpcConnManager{} + +// Connect adds the provided address as a new outbound peer. The permanent flag +// indicates whether or not to make the peer persistent and reconnect if the +// connection is lost. Attempting to connect to an already existing peer will +// return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) Connect(addr string, permanent bool) error { + replyChan := make(chan error) + cm.server.query <- connectNodeMsg{ + addr: addr, + permanent: permanent, + reply: replyChan, + } + return <-replyChan +} + +// RemoveByID removes the peer associated with the provided id from the list of +// persistent peers. Attempting to remove an id that does not exist will return +// an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) RemoveByID(id int32) error { + replyChan := make(chan error) + cm.server.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + return <-replyChan +} + +// RemoveByAddr removes the peer associated with the provided address from the +// list of persistent peers. Attempting to remove an address that does not +// exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) RemoveByAddr(addr string) error { + replyChan := make(chan error) + cm.server.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + return <-replyChan +} + +// DisconnectByID disconnects the peer associated with the provided id. This +// applies to both inbound and outbound peers. Attempting to remove an id that +// does not exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) DisconnectByID(id int32) error { + replyChan := make(chan error) + cm.server.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + return <-replyChan +} + +// DisconnectByAddr disconnects the peer associated with the provided address. +// This applies to both inbound and outbound peers. Attempting to remove an +// address that does not exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) DisconnectByAddr(addr string) error { + replyChan := make(chan error) + cm.server.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + return <-replyChan +} + +// ConnectedCount returns the number of currently connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) ConnectedCount() int32 { + return cm.server.ConnectedCount() +} + +// NetTotals returns the sum of all bytes received and sent across the network +// for all peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) NetTotals() (uint64, uint64) { + return cm.server.NetTotals() +} + +// ConnectedPeers returns an array consisting of all connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) ConnectedPeers() []rpcserverPeer { + replyChan := make(chan []*serverPeer) + cm.server.query <- getPeersMsg{reply: replyChan} + serverPeers := <-replyChan + + // Convert to RPC server peers. + peers := make([]rpcserverPeer, 0, len(serverPeers)) + for _, sp := range serverPeers { + peers = append(peers, (*rpcPeer)(sp)) + } + return peers +} + +// PersistentPeers returns an array consisting of all the added persistent +// peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) PersistentPeers() []rpcserverPeer { + replyChan := make(chan []*serverPeer) + cm.server.query <- getAddedNodesMsg{reply: replyChan} + serverPeers := <-replyChan + + // Convert to generic peers. + peers := make([]rpcserverPeer, 0, len(serverPeers)) + for _, sp := range serverPeers { + peers = append(peers, (*rpcPeer)(sp)) + } + return peers +} + +// BroadcastMessage sends the provided message to all currently connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) BroadcastMessage(msg wire.Message) { + cm.server.BroadcastMessage(msg) +} + +// AddRebroadcastInventory adds the provided inventory to the list of +// inventories to be rebroadcast at random intervals until they show up in a +// block. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) { + cm.server.AddRebroadcastInventory(iv, data) +} + +// RelayTransactions generates and relays inventory vectors for all of the +// passed transactions to all connected peers. +func (cm *rpcConnManager) RelayTransactions(txns []*mempool.TxDesc) { + cm.server.relayTransactions(txns) +} + +// rpcSyncMgr provides a block manager for use with the RPC server and +// implements the rpcserverSyncManager interface. +type rpcSyncMgr struct { + server *server + blockMgr *blockManager +} + +// Ensure rpcSyncMgr implements the rpcserverSyncManager interface. +var _ rpcserverSyncManager = (*rpcSyncMgr)(nil) + +// IsCurrent returns whether or not the sync manager believes the chain is +// current as compared to the rest of the network. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) IsCurrent() bool { + return b.blockMgr.IsCurrent() +} + +// SubmitBlock submits the provided block to the network after processing it +// locally. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { + return b.blockMgr.ProcessBlock(block, flags) +} + +// Pause pauses the sync manager until the returned channel is closed. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) Pause() chan<- struct{} { + return b.blockMgr.Pause() +} + +// SyncPeer returns the peer that is currently the peer being used to sync from. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) SyncPeer() rpcserverPeer { + return (*rpcPeer)(b.blockMgr.SyncPeer()) +} + +// LocateBlocks returns the hashes of the blocks after the first known block in +// the provided locators until the provided stop hash or the current tip is +// reached, up to a max of wire.MaxBlockHeadersPerMsg hashes. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) LocateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) { + return b.server.locateBlocks(locators, hashStop) +} diff --git a/rpcserver.go b/rpcserver.go index f8b8ecf2..197b6ab0 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -29,6 +29,7 @@ import ( "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -36,6 +37,8 @@ import ( "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/mempool" "github.com/btcsuite/btcd/mining" + "github.com/btcsuite/btcd/mining/cpuminer" + "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -360,11 +363,11 @@ func handleAddNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in var err error switch c.SubCmd { case "add": - err = s.server.ConnectNode(addr, true) + err = s.cfg.ConnMgr.Connect(addr, true) case "remove": - err = s.server.RemoveNodeByAddr(addr) + err = s.cfg.ConnMgr.RemoveByAddr(addr) case "onetry": - err = s.server.ConnectNode(addr, false) + err = s.cfg.ConnMgr.Connect(addr, false) default: return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -396,11 +399,11 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter // attempt to disconnect by address, returning an error if a // valid IP address is not supplied. if nodeID, errN = strconv.ParseUint(c.Target, 10, 32); errN == nil { - err = s.server.DisconnectNodeByID(int32(nodeID)) + err = s.cfg.ConnMgr.DisconnectByID(int32(nodeID)) } else { if _, _, errP := net.SplitHostPort(c.Target); errP == nil || net.ParseIP(c.Target) != nil { addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) - err = s.server.DisconnectNodeByAddr(addr) + err = s.cfg.ConnMgr.DisconnectByAddr(addr) } else { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -408,22 +411,24 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter } } } - if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) { + if err != nil && peerExists(s.cfg.ConnMgr, addr, int32(nodeID)) { + return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "can't disconnect a permanent peer, use remove", } } + case "remove": // If we have a valid uint disconnect by node id. Otherwise, // attempt to disconnect by address, returning an error if a // valid IP address is not supplied. if nodeID, errN = strconv.ParseUint(c.Target, 10, 32); errN == nil { - err = s.server.RemoveNodeByID(int32(nodeID)) + err = s.cfg.ConnMgr.RemoveByID(int32(nodeID)) } else { if _, _, errP := net.SplitHostPort(c.Target); errP == nil || net.ParseIP(c.Target) != nil { addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) - err = s.server.RemoveNodeByAddr(addr) + err = s.cfg.ConnMgr.RemoveByAddr(addr) } else { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -431,12 +436,13 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter } } } - if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) { + if err != nil && peerExists(s.cfg.ConnMgr, addr, int32(nodeID)) { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "can't remove a temporary peer, use disconnect", } } + case "connect": addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) @@ -448,7 +454,7 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter switch subCmd { case "perm", "temp": - err = s.server.ConnectNode(addr, subCmd == "perm") + err = s.cfg.ConnMgr.Connect(addr, subCmd == "perm") default: return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -476,9 +482,9 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter // peerExists determines if a certain peer is currently connected given // information about all currently connected peers. Peer existence is // determined using either a target address or node id. -func peerExists(peers []*serverPeer, addr string, nodeID int32) bool { - for _, p := range peers { - if p.ID() == nodeID || p.Addr() == addr { +func peerExists(connMgr rpcserverConnManager, addr string, nodeID int32) bool { + for _, p := range connMgr.ConnectedPeers() { + if p.ToPeer().ID() == nodeID || p.ToPeer().Addr() == addr { return true } } @@ -560,7 +566,7 @@ func handleCreateRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan Message: "Invalid address or key", } } - if !addr.IsForNet(s.server.chainParams) { + if !addr.IsForNet(s.cfg.ChainParams) { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidAddressOrKey, Message: "Invalid address: " + encodedAddr + @@ -789,7 +795,7 @@ func handleDecodeRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan Version: mtx.Version, Locktime: mtx.LockTime, Vin: createVinList(&mtx), - Vout: createVoutList(&mtx, s.server.chainParams, nil), + Vout: createVoutList(&mtx, s.cfg.ChainParams, nil), } return txReply, nil } @@ -816,14 +822,14 @@ func handleDecodeScript(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // Ignore the error here since an error means the script couldn't parse // and there is no additinal information about it anyways. scriptClass, addrs, reqSigs, _ := txscript.ExtractPkScriptAddrs(script, - s.server.chainParams) + s.cfg.ChainParams) addresses := make([]string, len(addrs)) for i, addr := range addrs { addresses[i] = addr.EncodeAddress() } // Convert the script itself to a pay-to-script-hash address. - p2sh, err := btcutil.NewAddressScriptHash(script, s.server.chainParams) + p2sh, err := btcutil.NewAddressScriptHash(script, s.cfg.ChainParams) if err != nil { context := "Failed to convert script to pay-to-script-hash" return nil, internalRPCError(err.Error(), context) @@ -856,14 +862,13 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Respond with an error if there's virtually 0 chance of mining a block // with the CPU. - params := s.server.chainParams - if !s.server.chainParams.GenerateSupported { + if !s.cfg.ChainParams.GenerateSupported { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCDifficulty, Message: fmt.Sprintf("No support for `generate` on "+ "the current network, %s, as it's unlikely to "+ "be possible to main a block with the CPU.", - params.Net), + s.cfg.ChainParams.Net), } } @@ -880,7 +885,7 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Create a reply reply := make([]string, c.NumBlocks) - blockHashes, err := s.server.cpuMiner.GenerateNBlocks(c.NumBlocks) + blockHashes, err := s.cfg.CPUMiner.GenerateNBlocks(c.NumBlocks) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInternal.Code, @@ -901,14 +906,14 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetAddedNodeInfoCmd) - // Retrieve a list of persistent (added) peers from the bitcoin server - // and filter the list of peers per the specified address (if any). - peers := s.server.AddedNodeInfo() + // Retrieve a list of persistent (added) peers from the server and + // filter the list of peers per the specified address (if any). + peers := s.cfg.ConnMgr.PersistentPeers() if c.Node != nil { node := *c.Node found := false for i, peer := range peers { - if peer.Addr() == node { + if peer.ToPeer().Addr() == node { peers = peers[i : i+1] found = true } @@ -926,7 +931,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru if !c.DNS { results := make([]string, 0, len(peers)) for _, peer := range peers { - results = append(results, peer.Addr()) + results = append(results, peer.ToPeer().Addr()) } return results, nil } @@ -934,9 +939,10 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru // With the dns flag, the result is an array of JSON objects which // include the result of DNS lookups for each peer. results := make([]*btcjson.GetAddedNodeInfoResult, 0, len(peers)) - for _, peer := range peers { + for _, rpcPeer := range peers { // Set the "address" of the peer which could be an ip address // or a domain name. + peer := rpcPeer.ToPeer() var result btcjson.GetAddedNodeInfoResult result.AddedNode = peer.Addr() result.Connected = btcjson.Bool(peer.Connected()) @@ -991,7 +997,7 @@ func handleGetBestBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // All other "get block" commands give either the height, the // hash, or both but require the block SHA. This gets both for // the best block. - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() result := &btcjson.GetBestBlockResult{ Hash: best.Hash.String(), Height: best.Height, @@ -1001,7 +1007,7 @@ func handleGetBestBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // handleGetBestBlockHash implements the getbestblockhash command. func handleGetBestBlockHash(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return best.Hash.String(), nil } @@ -1035,7 +1041,7 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, rpcDecodeHexError(c.Hash) } var blkBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error blkBytes, err = dbTx.FetchBlock(hash) return err @@ -1063,18 +1069,18 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } // Get the block height from chain. - blockHeight, err := s.chain.BlockHeightByHash(hash) + blockHeight, err := s.cfg.Chain.BlockHeightByHash(hash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) } blk.SetHeight(blockHeight) - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() // Get next block hash unless there are none. var nextHashString string if blockHeight < best.Height { - nextHash, err := s.chain.BlockHashByHeight(blockHeight + 1) + nextHash, err := s.cfg.Chain.BlockHashByHeight(blockHeight + 1) if err != nil { context := "No next block" return nil, internalRPCError(err.Error(), context) @@ -1113,7 +1119,7 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i txns := blk.Transactions() rawTxns := make([]btcjson.TxRawResult, len(txns)) for i, tx := range txns { - rawTxn, err := createTxRawResult(s.server.chainParams, + rawTxn, err := createTxRawResult(s.cfg.ChainParams, tx.MsgTx(), tx.Hash().String(), blockHeader, hash.String(), blockHeight, best.Height) if err != nil { @@ -1148,10 +1154,10 @@ func softForkStatus(state blockchain.ThresholdState) (string, error) { // handleGetBlockChainInfo implements the getblockchaininfo command. func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - // Obtain a snapshot of the current best known blockchain state. We'll // populate the response to this call primarily from this snapshot. - chainSnapshot := s.chain.BestSnapshot() + chain := s.cfg.Chain + chainSnapshot := chain.BestSnapshot() chainInfo := &btcjson.GetBlockChainInfoResult{ Chain: activeNetParams.Name, @@ -1224,7 +1230,7 @@ func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan str // Query the chain for the current status of the deployment as // identified by its deployment ID. - deploymentStatus, err := s.chain.ThresholdState(uint32(deployment)) + deploymentStatus, err := chain.ThresholdState(uint32(deployment)) if err != nil { context := "Failed to obtain deployment status" return nil, internalRPCError(err.Error(), context) @@ -1257,14 +1263,14 @@ func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan str // handleGetBlockCount implements the getblockcount command. func handleGetBlockCount(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return int64(best.Height), nil } // handleGetBlockHash implements the getblockhash command. func handleGetBlockHash(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetBlockHashCmd) - hash, err := s.chain.BlockHashByHeight(int32(c.Index)) + hash, err := s.cfg.Chain.BlockHashByHeight(int32(c.Index)) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCOutOfRange, @@ -1284,7 +1290,7 @@ func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct if err != nil { return nil, rpcDecodeHexError(c.Hash) } - blockHeader, err := s.chain.FetchHeader(hash) + blockHeader, err := s.cfg.Chain.FetchHeader(hash) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -1307,17 +1313,17 @@ func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct // The verbose flag is set, so generate the JSON object and return it. // Get the block height from chain. - blockHeight, err := s.chain.BlockHeightByHash(hash) + blockHeight, err := s.cfg.Chain.BlockHeightByHash(hash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() // Get next block hash unless there are none. var nextHashString string if blockHeight < best.Height { - nextHash, err := s.chain.BlockHashByHeight(blockHeight + 1) + nextHash, err := s.cfg.Chain.BlockHashByHeight(blockHeight + 1) if err != nil { context := "No next block" return nil, internalRPCError(err.Error(), context) @@ -1495,7 +1501,8 @@ func (state *gbtWorkState) templateUpdateChan(prevHash *chainhash.Hash, lastGene // // This function MUST be called with the state locked. func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bool) error { - lastTxUpdate := s.server.txMemPool.LastUpdated() + generator := s.cfg.Generator + lastTxUpdate := generator.TxSource().LastUpdated() if lastTxUpdate.IsZero() { lastTxUpdate = time.Now() } @@ -1506,7 +1513,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // generated. var msgBlock *wire.MsgBlock var targetDifficulty string - latestHash := &s.server.blockManager.chain.BestSnapshot().Hash + latestHash := &s.cfg.Chain.BestSnapshot().Hash template := state.template if template == nil || state.prevHash == nil || !state.prevHash.IsEqual(latestHash) || @@ -1532,7 +1539,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // block template doesn't include the coinbase, so the caller // will ultimately create their own coinbase which pays to the // appropriate address(es). - blkTemplate, err := s.generator.NewBlockTemplate(payAddr) + blkTemplate, err := generator.NewBlockTemplate(payAddr) if err != nil { return internalRPCError("Failed to create new block "+ "template: "+err.Error(), "") @@ -1545,7 +1552,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // Get the minimum allowed timestamp for the block based on the // median timestamp of the last several blocks per the chain // consensus rules. - best := s.server.blockManager.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() minTimestamp := mining.MinimumMedianTime(best) // Update work state to ensure another block template isn't @@ -1605,7 +1612,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // Update the time of the block template to the current time // while accounting for the median time of the past several // blocks per the chain consensus rules. - s.generator.UpdateBlockTime(msgBlock) + generator.UpdateBlockTime(msgBlock) msgBlock.Header.Nonce = 0 rpcsLog.Debugf("Updated block template (timestamp %v, "+ @@ -1900,7 +1907,9 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques // way to relay a found block or receive transactions to work on. // However, allow this state when running in the regression test or // simulation test mode. - if !(cfg.RegressionTest || cfg.SimNet) && s.server.ConnectedCount() == 0 { + if !(cfg.RegressionTest || cfg.SimNet) && + s.cfg.ConnMgr.ConnectedCount() == 0 { + return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCClientNotConnected, Message: "Bitcoin is not connected", @@ -1908,8 +1917,8 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques } // No point in generating or accepting work before the chain is synced. - currentHeight := s.server.blockManager.chain.BestSnapshot().Height - if currentHeight != 0 && !s.server.blockManager.IsCurrent() { + currentHeight := s.cfg.Chain.BestSnapshot().Height + if currentHeight != 0 && !s.cfg.SyncMgr.IsCurrent() { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCClientInInitialDownload, Message: "Bitcoin is downloading blocks...", @@ -2069,14 +2078,14 @@ func handleGetBlockTemplateProposal(s *rpcServer, request *btcjson.TemplateReque block := btcutil.NewBlock(&msgBlock) // Ensure the block is building from the expected previous block. - expectedPrevHash := &s.server.blockManager.chain.BestSnapshot().Hash + expectedPrevHash := s.cfg.Chain.BestSnapshot().Hash prevHash := &block.MsgBlock().Header.PrevBlock if !expectedPrevHash.IsEqual(prevHash) { return "bad-prevblk", nil } flags := blockchain.BFDryRun | blockchain.BFNoPoWCheck - isOrphan, err := s.server.blockManager.ProcessBlock(block, flags) + isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block, flags) if err != nil { if _, ok := err.(blockchain.RuleError); !ok { errStr := fmt.Sprintf("Failed to process block proposal: %v", err) @@ -2126,28 +2135,28 @@ func handleGetBlockTemplate(s *rpcServer, cmd interface{}, closeChan <-chan stru // handleGetConnectionCount implements the getconnectioncount command. func handleGetConnectionCount(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.ConnectedCount(), nil + return s.cfg.ConnMgr.ConnectedCount(), nil } // handleGetCurrentNet implements the getcurrentnet command. func handleGetCurrentNet(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.chainParams.Net, nil + return s.cfg.ChainParams.Net, nil } // handleGetDifficulty implements the getdifficulty command. func handleGetDifficulty(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return getDifficultyRatio(best.Bits), nil } // handleGetGenerate implements the getgenerate command. func handleGetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.cpuMiner.IsMining(), nil + return s.cfg.CPUMiner.IsMining(), nil } // handleGetHashesPerSec implements the gethashespersec command. func handleGetHashesPerSec(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return int64(s.server.cpuMiner.HashesPerSecond()), nil + return int64(s.cfg.CPUMiner.HashesPerSecond()), nil } // handleGetHeaders implements the getheaders command. @@ -2174,7 +2183,7 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) return nil, rpcDecodeHexError(c.HashStop) } } - blockHashes, err := s.server.locateBlocks(blockLocators, &hashStop) + blockHashes, err := s.cfg.SyncMgr.LocateBlocks(blockLocators, &hashStop) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCDatabase, @@ -2184,7 +2193,7 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } headers := make([]wire.BlockHeader, 0, len(blockHashes)) for i := range blockHashes { - header, err := s.chain.FetchHeader(&blockHashes[i]) + header, err := s.cfg.Chain.FetchHeader(&blockHashes[i]) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -2213,13 +2222,13 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleGetInfo implements the getinfo command. We only return the fields // that are not related to wallet functionality. func handleGetInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() ret := &btcjson.InfoChainResult{ Version: int32(1000000*appMajor + 10000*appMinor + 100*appPatch), ProtocolVersion: int32(maxProtocolVersion), Blocks: best.Height, - TimeOffset: int64(s.server.timeSource.Offset().Seconds()), - Connections: s.server.ConnectedCount(), + TimeOffset: int64(s.cfg.TimeSource.Offset().Seconds()), + Connections: s.cfg.ConnMgr.ConnectedCount(), Proxy: cfg.Proxy, Difficulty: getDifficultyRatio(best.Bits), TestNet: cfg.TestNet3, @@ -2231,7 +2240,7 @@ func handleGetInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in // handleGetMempoolInfo implements the getmempoolinfo command. func handleGetMempoolInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - mempoolTxns := s.server.txMemPool.TxDescs() + mempoolTxns := s.cfg.TxMemPool.TxDescs() var numBytes int64 for _, txD := range mempoolTxns { @@ -2265,18 +2274,18 @@ func handleGetMiningInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{ } } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() result := btcjson.GetMiningInfoResult{ Blocks: int64(best.Height), CurrentBlockSize: best.BlockSize, CurrentBlockWeight: best.BlockWeight, CurrentBlockTx: best.NumTxns, Difficulty: getDifficultyRatio(best.Bits), - Generate: s.server.cpuMiner.IsMining(), - GenProcLimit: s.server.cpuMiner.NumWorkers(), - HashesPerSec: int64(s.server.cpuMiner.HashesPerSecond()), + Generate: s.cfg.CPUMiner.IsMining(), + GenProcLimit: s.cfg.CPUMiner.NumWorkers(), + HashesPerSec: int64(s.cfg.CPUMiner.HashesPerSecond()), NetworkHashPS: networkHashesPerSec, - PooledTx: uint64(s.server.txMemPool.Count()), + PooledTx: uint64(s.cfg.TxMemPool.Count()), TestNet: cfg.TestNet3, } return &result, nil @@ -2284,7 +2293,7 @@ func handleGetMiningInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{ // handleGetNetTotals implements the getnettotals command. func handleGetNetTotals(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - totalBytesRecv, totalBytesSent := s.server.NetTotals() + totalBytesRecv, totalBytesSent := s.cfg.ConnMgr.NetTotals() reply := &btcjson.GetNetTotalsResult{ TotalBytesRecv: totalBytesRecv, TotalBytesSent: totalBytesSent, @@ -2305,7 +2314,7 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // since we can't reasonably calculate the number of network hashes // per second from invalid values. When it's negative, use the current // best block height. - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() endHeight := int32(-1) if c.Height != nil { endHeight = int32(*c.Height) @@ -2319,8 +2328,8 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // Calculate the number of blocks per retarget interval based on the // chain parameters. - blocksPerRetarget := int32(s.server.chainParams.TargetTimespan / - s.server.chainParams.TargetTimePerBlock) + blocksPerRetarget := int32(s.cfg.ChainParams.TargetTimespan / + s.cfg.ChainParams.TargetTimePerBlock) // Calculate the starting block height based on the passed number of // blocks. When the passed value is negative, use the last block the @@ -2347,14 +2356,14 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru var minTimestamp, maxTimestamp time.Time totalWork := big.NewInt(0) for curHeight := startHeight; curHeight <= endHeight; curHeight++ { - hash, err := s.chain.BlockHashByHeight(curHeight) + hash, err := s.cfg.Chain.BlockHashByHeight(curHeight) if err != nil { context := "Failed to fetch block hash" return nil, internalRPCError(err.Error(), context) } // Fetch the header from chain. - header, err := s.chain.FetchHeader(hash) + header, err := s.cfg.Chain.FetchHeader(hash) if err != nil { context := "Failed to fetch block header" return nil, internalRPCError(err.Error(), context) @@ -2389,17 +2398,17 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // handleGetPeerInfo implements the getpeerinfo command. func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - peers := s.server.Peers() - syncPeer := s.server.blockManager.SyncPeer() + peers := s.cfg.ConnMgr.ConnectedPeers() + syncPeer := s.cfg.SyncMgr.SyncPeer().ToPeer() infos := make([]*btcjson.GetPeerInfoResult, 0, len(peers)) for _, p := range peers { - statsSnap := p.StatsSnapshot() + statsSnap := p.ToPeer().StatsSnapshot() info := &btcjson.GetPeerInfoResult{ ID: statsSnap.ID, Addr: statsSnap.Addr, - AddrLocal: p.LocalAddr().String(), + AddrLocal: p.ToPeer().LocalAddr().String(), Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), - RelayTxes: !p.disableRelayTx, + RelayTxes: !p.IsTxRelayDisabled(), LastSend: statsSnap.LastSend.Unix(), LastRecv: statsSnap.LastRecv.Unix(), BytesSent: statsSnap.BytesSent, @@ -2412,11 +2421,11 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) Inbound: statsSnap.Inbound, StartingHeight: statsSnap.StartingHeight, CurrentHeight: statsSnap.LastBlock, - BanScore: int32(p.banScore.Int()), - FeeFilter: atomic.LoadInt64(&p.feeFilter), - SyncNode: p == syncPeer, + BanScore: int32(p.BanScore()), + FeeFilter: p.FeeFilter(), + SyncNode: p.ToPeer() == syncPeer, } - if p.LastPingNonce() != 0 { + if p.ToPeer().LastPingNonce() != 0 { wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds()) // We actually want microseconds. info.PingWait = wait / 1000 @@ -2429,7 +2438,7 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleGetRawMempool implements the getrawmempool command. func handleGetRawMempool(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetRawMempoolCmd) - mp := s.server.txMemPool + mp := s.cfg.TxMemPool if c.Verbose != nil && *c.Verbose { return mp.RawMempoolVerbose(), nil @@ -2466,10 +2475,9 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str var mtx *wire.MsgTx var blkHash *chainhash.Hash var blkHeight int32 - tx, err := s.server.txMemPool.FetchTransaction(txHash) + tx, err := s.cfg.TxMemPool.FetchTransaction(txHash) if err != nil { - txIndex := s.server.txIndex - if txIndex == nil { + if s.cfg.TxIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCNoTxInfo, Message: "The transaction index must be " + @@ -2479,7 +2487,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str } // Look up the location of the transaction. - blockRegion, err := txIndex.TxBlockRegion(txHash) + blockRegion, err := s.cfg.TxIndex.TxBlockRegion(txHash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context) @@ -2490,7 +2498,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str // Load the raw transaction bytes from the database. var txBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error txBytes, err = dbTx.FetchBlockRegion(blockRegion) return err @@ -2508,7 +2516,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str // Grab the block height. blkHash = blockRegion.Hash - blkHeight, err = s.chain.BlockHeightByHash(blkHash) + blkHeight, err = s.cfg.Chain.BlockHeightByHash(blkHash) if err != nil { context := "Failed to retrieve block height" return nil, internalRPCError(err.Error(), context) @@ -2547,7 +2555,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str var chainHeight int32 if blkHash != nil { // Fetch the header from chain. - header, err := s.chain.FetchHeader(blkHash) + header, err := s.cfg.Chain.FetchHeader(blkHash) if err != nil { context := "Failed to fetch block header" return nil, internalRPCError(err.Error(), context) @@ -2555,11 +2563,11 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str blkHeader = &header blkHashStr = blkHash.String() - chainHeight = s.chain.BestSnapshot().Height + chainHeight = s.cfg.Chain.BestSnapshot().Height } - rawTxn, err := createTxRawResult(s.server.chainParams, mtx, - txHash.String(), blkHeader, blkHashStr, blkHeight, chainHeight) + rawTxn, err := createTxRawResult(s.cfg.ChainParams, mtx, txHash.String(), + blkHeader, blkHashStr, blkHeight, chainHeight) if err != nil { return nil, err } @@ -2590,8 +2598,8 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } // TODO: This is racy. It should attempt to fetch it directly and check // the error. - if includeMempool && s.server.txMemPool.HaveTransaction(txHash) { - tx, err := s.server.txMemPool.FetchTransaction(txHash) + if includeMempool && s.cfg.TxMemPool.HaveTransaction(txHash) { + tx, err := s.cfg.TxMemPool.FetchTransaction(txHash) if err != nil { return nil, rpcNoTxInfoError(txHash) } @@ -2612,7 +2620,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, internalRPCError(errStr, "") } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() bestBlockHash = best.Hash.String() confirmations = 0 txVersion = mtx.Version @@ -2620,7 +2628,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i pkScript = txOut.PkScript isCoinbase = blockchain.IsCoinBaseTx(mtx) } else { - entry, err := s.chain.FetchUtxoEntry(txHash) + entry, err := s.cfg.Chain.FetchUtxoEntry(txHash) if err != nil { return nil, rpcNoTxInfoError(txHash) } @@ -2634,7 +2642,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, nil } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() bestBlockHash = best.Hash.String() confirmations = 1 + best.Height - entry.BlockHeight() txVersion = entry.Version() @@ -2652,7 +2660,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Ignore the error here since an error means the script couldn't parse // and there is no additional information about it anyways. scriptClass, addrs, reqSigs, _ := txscript.ExtractPkScriptAddrs(pkScript, - s.server.chainParams) + s.cfg.ChainParams) addresses := make([]string, len(addrs)) for i, addr := range addrs { addresses[i] = addr.EncodeAddress() @@ -2722,7 +2730,7 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter return nil, internalRPCError("Not sending ping - failed to "+ "generate nonce: "+err.Error(), "") } - s.server.BroadcastMessage(wire.NewMsgPing(nonce)) + s.cfg.ConnMgr.BroadcastMessage(wire.NewMsgPing(nonce)) return nil, nil } @@ -2744,7 +2752,7 @@ type retrievedTx struct { // inputs to the passed transaction by checking the transaction mempool first // then the transaction index for those already mined into blocks. func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, error) { - mp := s.server.txMemPool + mp := s.cfg.TxMemPool originOutputs := make(map[wire.OutPoint]wire.TxOut) for txInIndex, txIn := range tx.TxIn { // Attempt to fetch and use the referenced transaction from the @@ -2765,7 +2773,7 @@ func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, } // Look up the location of the transaction. - blockRegion, err := s.server.txIndex.TxBlockRegion(&origin.Hash) + blockRegion, err := s.cfg.TxIndex.TxBlockRegion(&origin.Hash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context) @@ -2776,7 +2784,7 @@ func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, // Load the raw transaction bytes from the database. var txBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error txBytes, err = dbTx.FetchBlockRegion(blockRegion) return err @@ -2933,7 +2941,7 @@ func createVinListPrevOut(s *rpcServer, mtx *wire.MsgTx, chainParams *chaincfg.P func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, numRequested uint32) ([]*btcutil.Tx, uint32) { // There are no entries to return when there are less available than the // number being skipped. - mpTxns := s.server.addrIndex.UnconfirmedTxnsForAddress(addr) + mpTxns := s.cfg.AddrIndex.UnconfirmedTxnsForAddress(addr) numAvailable := uint32(len(mpTxns)) if numToSkip > numAvailable { return nil, numAvailable @@ -2951,7 +2959,7 @@ func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, n // handleSearchRawTransactions implements the searchrawtransactions command. func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { // Respond with an error if the address index is not enabled. - addrIndex := s.server.addrIndex + addrIndex := s.cfg.AddrIndex if addrIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, @@ -2971,7 +2979,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // transaction index. Currently the address index relies on the // transaction index, so this check is redundant, but it's better to be // safe in case the address index is ever changed to not rely on it. - if vinExtra && s.server.txIndex == nil { + if vinExtra && s.cfg.TxIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "Transaction index must be enabled (--txindex)", @@ -2979,7 +2987,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // Attempt to decode the supplied address. - addr, err := btcutil.DecodeAddress(c.Address, s.server.chainParams) + addr, err := btcutil.DecodeAddress(c.Address, s.cfg.ChainParams) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidAddressOrKey, @@ -3040,7 +3048,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // Fetch transactions from the database in the desired order if more are // needed. if len(addressTxns) < numRequested { - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { regions, dbSkipped, err := addrIndex.TxRegionsForAddress( dbTx, addr, uint32(numToSkip)-numSkipped, uint32(numRequested-len(addressTxns)), reverse) @@ -3134,8 +3142,8 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // The verbose flag is set, so generate the JSON object and return it. - best := s.chain.BestSnapshot() - chainParams := s.server.chainParams + best := s.cfg.Chain.BestSnapshot() + chainParams := s.cfg.ChainParams srtList := make([]btcjson.SearchRawTransactionsResult, len(addressTxns)) for i := range addressTxns { // The deserialized transaction is needed, so deserialize the @@ -3178,7 +3186,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan var blkHeight int32 if blkHash := rtx.blkHash; blkHash != nil { // Fetch the header from chain. - header, err := s.chain.FetchHeader(blkHash) + header, err := s.cfg.Chain.FetchHeader(blkHash) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -3187,7 +3195,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // Get the block height from chain. - height, err := s.chain.BlockHeightByHash(blkHash) + height, err := s.cfg.Chain.BlockHeightByHash(blkHash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) @@ -3235,8 +3243,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // Use 0 for the tag to represent local node. tx := btcutil.NewTx(&msgTx) - acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, - false, 0) + acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, false, false, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -3265,20 +3272,27 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // Also, since an error is being returned to the caller, ensure the // transaction is removed from the memory pool. if len(acceptedTxs) == 0 || !acceptedTxs[0].Tx.Hash().IsEqual(tx.Hash()) { - s.server.txMemPool.RemoveTransaction(tx, true) + s.cfg.TxMemPool.RemoveTransaction(tx, true) errStr := fmt.Sprintf("transaction %v is not in accepted list", tx.Hash()) return nil, internalRPCError(errStr, "") } - s.server.AnnounceNewTransactions(acceptedTxs) + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + s.cfg.ConnMgr.RelayTransactions(acceptedTxs) + + // Notify both websocket and getblocktemplate long poll clients of all + // newly accepted transactions. + s.NotifyNewTransactions(acceptedTxs) // Keep track of all the sendrawtransaction request txns so that they // can be rebroadcast if they don't make their way into a block. txD := acceptedTxs[0] iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) - s.server.AddRebroadcastInventory(iv, txD) + s.cfg.ConnMgr.AddRebroadcastInventory(iv, txD) return tx.Hash().String(), nil } @@ -3300,7 +3314,7 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } if !generate { - s.server.cpuMiner.Stop() + s.cfg.CPUMiner.Stop() } else { // Respond with an error if there are no addresses to pay the // created blocks to. @@ -3313,8 +3327,8 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } // It's safe to call start even if it's already started. - s.server.cpuMiner.SetNumWorkers(int32(genProcLimit)) - s.server.cpuMiner.Start() + s.cfg.CPUMiner.SetNumWorkers(int32(genProcLimit)) + s.cfg.CPUMiner.Start() } return nil, nil } @@ -3350,7 +3364,9 @@ func handleSubmitBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } } - _, err = s.server.blockManager.ProcessBlock(block, blockchain.BFNone) + // Process this block using the same rules as blocks coming from other + // nodes. This will in turn relay it to the network like normal. + _, err = s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) if err != nil { return fmt.Sprintf("rejected: %s", err.Error()), nil } @@ -3361,7 +3377,7 @@ func handleSubmitBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleUptime implements the uptime command. func handleUptime(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return time.Now().Unix() - s.server.startupTime, nil + return time.Now().Unix() - s.cfg.StartupTime, nil } // handleValidateAddress implements the validateaddress command. @@ -3382,7 +3398,7 @@ func handleValidateAddress(s *rpcServer, cmd interface{}, closeChan <-chan struc } func verifyChain(s *rpcServer, level, depth int32) error { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() finishHeight := best.Height - depth if finishHeight < 0 { finishHeight = 0 @@ -3392,7 +3408,7 @@ func verifyChain(s *rpcServer, level, depth int32) error { for height := best.Height; height > finishHeight; height-- { // Level 0 just looks up the block. - block, err := s.chain.BlockByHeight(height) + block, err := s.cfg.Chain.BlockByHeight(height) if err != nil { rpcsLog.Errorf("Verify is unable to fetch block at "+ "height %d: %v", height, err) @@ -3402,7 +3418,7 @@ func verifyChain(s *rpcServer, level, depth int32) error { // Level 1 does basic chain sanity checks. if level > 0 { err := blockchain.CheckBlockSanity(block, - activeNetParams.PowLimit, s.server.timeSource) + s.cfg.ChainParams.PowLimit, s.cfg.TimeSource) if err != nil { rpcsLog.Errorf("Verify is unable to validate "+ "block at hash %v height %d: %v", @@ -3497,8 +3513,7 @@ func handleVerifyMessage(s *rpcServer, cmd interface{}, closeChan <-chan struct{ // handleVersion implements the version command. // -// NOTE: This is a btcsuite extension ported from -// github.com/decred/dcrd. +// NOTE: This is a btcsuite extension ported from github.com/decred/dcrd. func handleVersion(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { result := map[string]btcjson.VersionResult{ "btcdjsonrpcapi": { @@ -3511,14 +3526,11 @@ func handleVersion(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in return result, nil } -// rpcServer holds the items the rpc server may need to access (config, -// shutdown, main server, etc.) +// rpcServer provides a concurrent safe RPC server to a chain server. type rpcServer struct { started int32 shutdown int32 - generator *mining.BlkTmplGenerator - server *server - chain *blockchain.BlockChain + cfg rpcserverConfig authsha [sha256.Size]byte limitauthsha [sha256.Size]byte ntfnMgr *wsNotificationManager @@ -3617,6 +3629,20 @@ func (s *rpcServer) RequestedProcessShutdown() <-chan struct{} { return s.requestProcessShutdown } +// NotifyNewTransactions notifies both websocket and getblocktemplate long +// poll clients of the passed transactions. This function should be called +// whenever new transactions are added to the mempool. +func (s *rpcServer) NotifyNewTransactions(txns []*mempool.TxDesc) { + for _, txD := range txns { + // Notify websocket clients about mempool transactions. + s.ntfnMgr.NotifyMempoolTx(txD.Tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.gbtWorkState.NotifyMempoolTx(s.cfg.TxMemPool.LastUpdated()) + } +} + // limitConnections responds with a 503 service unavailable and returns true if // adding another client would exceed the maximum allow RPC clients. // @@ -4013,14 +4039,163 @@ func genCertPair(certFile, keyFile string) error { return nil } +// rpcserverPeer represents a peer for use with the RPC server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverPeer interface { + // ToPeer returns the underlying peer instance. + ToPeer() *peer.Peer + + // IsTxRelayDisabled returns whether or not the peer has disabled + // transaction relay. + IsTxRelayDisabled() bool + + // BanScore returns the current integer value that represents how close + // the peer is to being banned. + BanScore() uint32 + + // FeeFilter returns the requested current minimum fee rate for which + // transactions should be announced. + FeeFilter() int64 +} + +// rpcserverConnManager represents a connection manager for use with the RPC +// server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverConnManager interface { + // Connect adds the provided address as a new outbound peer. The + // permanent flag indicates whether or not to make the peer persistent + // and reconnect if the connection is lost. Attempting to connect to an + // already existing peer will return an error. + Connect(addr string, permanent bool) error + + // RemoveByID removes the peer associated with the provided id from the + // list of persistent peers. Attempting to remove an id that does not + // exist will return an error. + RemoveByID(id int32) error + + // RemoveByAddr removes the peer associated with the provided address + // from the list of persistent peers. Attempting to remove an address + // that does not exist will return an error. + RemoveByAddr(addr string) error + + // DisconnectByID disconnects the peer associated with the provided id. + // This applies to both inbound and outbound peers. Attempting to + // remove an id that does not exist will return an error. + DisconnectByID(id int32) error + + // DisconnectByAddr disconnects the peer associated with the provided + // address. This applies to both inbound and outbound peers. + // Attempting to remove an address that does not exist will return an + // error. + DisconnectByAddr(addr string) error + + // ConnectedCount returns the number of currently connected peers. + ConnectedCount() int32 + + // NetTotals returns the sum of all bytes received and sent across the + // network for all peers. + NetTotals() (uint64, uint64) + + // ConnectedPeers returns an array consisting of all connected peers. + ConnectedPeers() []rpcserverPeer + + // PersistentPeers returns an array consisting of all the persistent + // peers. + PersistentPeers() []rpcserverPeer + + // BroadcastMessage sends the provided message to all currently + // connected peers. + BroadcastMessage(msg wire.Message) + + // AddRebroadcastInventory adds the provided inventory to the list of + // inventories to be rebroadcast at random intervals until they show up + // in a block. + AddRebroadcastInventory(iv *wire.InvVect, data interface{}) + + // RelayTransactions generates and relays inventory vectors for all of + // the passed transactions to all connected peers. + RelayTransactions(txns []*mempool.TxDesc) +} + +// rpcserverSyncManager represents a sync manager for use with the RPC server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverSyncManager interface { + // IsCurrent returns whether or not the sync manager believes the chain + // is current as compared to the rest of the network. + IsCurrent() bool + + // SubmitBlock submits the provided block to the network after + // processing it locally. + SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) + + // Pause pauses the sync manager until the returned channel is closed. + Pause() chan<- struct{} + + // SyncPeer returns the peer that is currently the peer being used to + // sync from. + SyncPeer() rpcserverPeer + + // LocateBlocks returns the hashes of the blocks after the first known + // block in the provided locators until the provided stop hash or the + // current tip is reached, up to a max of wire.MaxBlockHeadersPerMsg + // hashes. + LocateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) +} + +// rpcserverConfig is a descriptor containing the RPC server configuration. +type rpcserverConfig struct { + // ListenAddrs are the addresses the RPC server should listen on. + ListenAddrs []string + + // StartupTime is the unix timestamp for when the server that is hosting + // the RPC server started. + StartupTime int64 + + // ConnMgr defines the connection manager for the RPC server to use. It + // provides the RPC server with a means to do things such as add, + // remove, connect, disconnect, and query peers as well as other + // connection-related data and tasks. + ConnMgr rpcserverConnManager + + // SyncMgr defines the sync manager for the RPC server to use. + SyncMgr rpcserverSyncManager + + // These fields allow the RPC server to interface with the local block + // chain data and state. + TimeSource blockchain.MedianTimeSource + Chain *blockchain.BlockChain + ChainParams *chaincfg.Params + DB database.DB + + // TxMemPool defines the transaction memory pool to interact with. + TxMemPool *mempool.TxPool + + // These fields allow the RPC server to interface with mining. + // + // Generator produces block templates and the CPUMiner solves them using + // the CPU. CPU mining is typically only useful for test purposes when + // doing regression or simulation testing. + Generator *mining.BlkTmplGenerator + CPUMiner *cpuminer.CPUMiner + + // These fields define any optional indexes the RPC server can make use + // of to provide additional data when queried. + TxIndex *indexers.TxIndex + AddrIndex *indexers.AddrIndex +} + // newRPCServer returns a new instance of the rpcServer struct. -func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *server) (*rpcServer, error) { +func newRPCServer(config *rpcserverConfig) (*rpcServer, error) { rpc := rpcServer{ - server: s, - generator: generator, - chain: s.blockManager.chain, + cfg: *config, statusLines: make(map[int]string), - gbtWorkState: newGbtWorkState(s.timeSource), + gbtWorkState: newGbtWorkState(config.TimeSource), helpCacher: newHelpCacher(), requestProcessShutdown: make(chan struct{}), quit: make(chan int), @@ -4066,7 +4241,7 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s // TODO: this code is similar to that in server, should be // factored into something shared. - ipv4ListenAddrs, ipv6ListenAddrs, _, err := parseListeners(listenAddrs) + ipv4ListenAddrs, ipv6ListenAddrs, _, err := parseListeners(config.ListenAddrs) if err != nil { return nil, err } @@ -4095,7 +4270,7 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s rpc.listeners = listeners - s.chain.Subscribe(rpc.handleBlockchainNotification) + rpc.cfg.Chain.Subscribe(rpc.handleBlockchainNotification) return &rpc, nil } diff --git a/rpcwebsocket.go b/rpcwebsocket.go index ecfcc38b..d787050d 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -656,7 +656,7 @@ func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx, for i, output := range msgTx.TxOut { _, addrs, _, err := txscript.ExtractPkScriptAddrs( - output.PkScript, m.server.server.chainParams) + output.PkScript, m.server.cfg.ChainParams) if err != nil { // Clients are not able to subscribe to // nonstandard or non-address outputs. @@ -846,7 +846,7 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie continue } - net := m.server.server.chainParams + net := m.server.cfg.ChainParams rawTx, err := createTxRawResult(net, mtx, txHashStr, nil, "", 0, 0) if err != nil { @@ -983,7 +983,7 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s wscNotified := make(map[chan struct{}]struct{}) for i, txOut := range tx.MsgTx().TxOut { _, txAddrs, _, err := txscript.ExtractPkScriptAddrs( - txOut.PkScript, m.server.server.chainParams) + txOut.PkScript, m.server.cfg.ChainParams) if err != nil { continue } @@ -2029,7 +2029,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { for txOutIdx, txout := range tx.MsgTx().TxOut { _, addrs, _, _ := txscript.ExtractPkScriptAddrs( - txout.PkScript, wsc.server.server.chainParams) + txout.PkScript, wsc.server.cfg.ChainParams) for _, addr := range addrs { switch a := addr.(type) { @@ -2218,7 +2218,7 @@ func handleRescanBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { // Iterate over each block in the request and rescan. When a block // contains relevant transactions, add it to the response. - bc := wsc.server.server.blockManager.chain + bc := wsc.server.cfg.Chain var lastBlockHash *chainhash.Hash for i := range blockHashes { block, err := bc.BlockByHash(blockHashes[i]) @@ -2389,7 +2389,7 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { lookups.unspent[*outpoint] = struct{}{} } - chain := wsc.server.chain + chain := wsc.server.cfg.Chain minBlockHash, err := chainhash.NewHashFromStr(cmd.BeginBlock) if err != nil { @@ -2469,9 +2469,8 @@ fetchRange: // continuous notifications if necessary. Otherwise, // continue the fetch loop again to rescan the new // blocks (or error due to an irrecoverable reorganize). - blockManager := wsc.server.server.blockManager - pauseGuard := blockManager.Pause() - best := blockManager.chain.BestSnapshot() + pauseGuard := wsc.server.cfg.SyncMgr.Pause() + best := wsc.server.cfg.Chain.BestSnapshot() curHash := &best.Hash again := true if lastBlockHash == nil || *lastBlockHash == *curHash { diff --git a/server.go b/server.go index a408348f..888ccb55 100644 --- a/server.go +++ b/server.go @@ -1072,28 +1072,28 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// relayTransactions generates and relays inventory vectors for all of the +// passed transactions to all connected peers. +func (s *server) relayTransactions(txns []*mempool.TxDesc) { + for _, txD := range txns { + iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) + s.RelayInventory(iv, txD) + } +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // both websocket and getblocktemplate long poll clients of the passed // transactions. This function should be called whenever new transactions // are added to the mempool. -func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) { +func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) { // Generate and relay inventory vectors for all newly accepted - // transactions into the memory pool due to the original being - // accepted. - for _, txD := range newTxs { - // Generate the inventory vector and relay it. - iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) - s.RelayInventory(iv, txD) + // transactions. + s.relayTransactions(txns) - if s.rpcServer != nil { - // Notify websocket clients about mempool transactions. - s.rpcServer.ntfnMgr.NotifyMempoolTx(txD.Tx, true) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - s.rpcServer.gbtWorkState.NotifyMempoolTx( - s.txMemPool.LastUpdated()) - } + // Notify both websocket and getblocktemplate long poll clients of all + // newly accepted transactions. + if s.rpcServer != nil { + s.rpcServer.NotifyNewTransactions(txns) } } @@ -1892,88 +1892,6 @@ func (s *server) OutboundGroupCount(key string) int { return <-replyChan } -// AddedNodeInfo returns an array of btcjson.GetAddedNodeInfoResult structures -// describing the persistent (added) nodes. -func (s *server) AddedNodeInfo() []*serverPeer { - replyChan := make(chan []*serverPeer) - s.query <- getAddedNodesMsg{reply: replyChan} - return <-replyChan -} - -// Peers returns an array of all connected peers. -func (s *server) Peers() []*serverPeer { - replyChan := make(chan []*serverPeer) - - s.query <- getPeersMsg{reply: replyChan} - - return <-replyChan -} - -// DisconnectNodeByAddr disconnects a peer by target address. Both outbound and -// inbound nodes will be searched for the target node. An error message will -// be returned if the peer was not found. -func (s *server) DisconnectNodeByAddr(addr string) error { - replyChan := make(chan error) - - s.query <- disconnectNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, - reply: replyChan, - } - - return <-replyChan -} - -// DisconnectNodeByID disconnects a peer by target node id. Both outbound and -// inbound nodes will be searched for the target node. An error message will be -// returned if the peer was not found. -func (s *server) DisconnectNodeByID(id int32) error { - replyChan := make(chan error) - - s.query <- disconnectNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.ID() == id }, - reply: replyChan, - } - - return <-replyChan -} - -// RemoveNodeByAddr removes a peer from the list of persistent peers if -// present. An error will be returned if the peer was not found. -func (s *server) RemoveNodeByAddr(addr string) error { - replyChan := make(chan error) - - s.query <- removeNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, - reply: replyChan, - } - - return <-replyChan -} - -// RemoveNodeByID removes a peer by node ID from the list of persistent peers -// if present. An error will be returned if the peer was not found. -func (s *server) RemoveNodeByID(id int32) error { - replyChan := make(chan error) - - s.query <- removeNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.ID() == id }, - reply: replyChan, - } - - return <-replyChan -} - -// ConnectNode adds `addr' as a new outbound peer. If permanent is true then the -// peer will be persistent and reconnect if the connection is lost. -// It is an error to call this with an already existing peer. -func (s *server) ConnectNode(addr string, permanent bool) error { - replyChan := make(chan error) - - s.query <- connectNodeMsg{addr: addr, permanent: permanent, reply: replyChan} - - return <-replyChan -} - // AddBytesSent adds the passed number of bytes to the total bytes sent counter // for the server. It is safe for concurrent access. func (s *server) AddBytesSent(bytesSent uint64) { @@ -2625,8 +2543,21 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } if !cfg.DisableRPC { - s.rpcServer, err = newRPCServer(cfg.RPCListeners, - blockTemplateGenerator, &s) + s.rpcServer, err = newRPCServer(&rpcserverConfig{ + ListenAddrs: cfg.RPCListeners, + StartupTime: s.startupTime, + ConnMgr: &rpcConnManager{&s}, + SyncMgr: &rpcSyncMgr{&s, s.blockManager}, + TimeSource: s.timeSource, + Chain: s.blockManager.chain, + ChainParams: chainParams, + DB: db, + TxMemPool: s.txMemPool, + Generator: blockTemplateGenerator, + CPUMiner: s.cpuMiner, + TxIndex: s.txIndex, + AddrIndex: s.addrIndex, + }) if err != nil { return nil, err }