diff --git a/rpcadaptors.go b/rpcadaptors.go new file mode 100644 index 00000000..eebf47d8 --- /dev/null +++ b/rpcadaptors.go @@ -0,0 +1,277 @@ +// Copyright (c) 2017 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "sync/atomic" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/mempool" + "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +// rpcPeer provides a peer for use with the RPC server and implements the +// rpcserverPeer interface. +type rpcPeer serverPeer + +// Ensure rpcPeer implements the rpcserverPeer interface. +var _ rpcserverPeer = (*rpcPeer)(nil) + +// ToPeer returns the underlying peer instance. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) ToPeer() *peer.Peer { + if p == nil { + return nil + } + return (*serverPeer)(p).Peer +} + +// IsTxRelayDisabled returns whether or not the peer has disabled transaction +// relay. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) IsTxRelayDisabled() bool { + return (*serverPeer)(p).disableRelayTx +} + +// BanScore returns the current integer value that represents how close the peer +// is to being banned. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) BanScore() uint32 { + return (*serverPeer)(p).banScore.Int() +} + +// FeeFilter returns the requested current minimum fee rate for which +// transactions should be announced. +// +// This function is safe for concurrent access and is part of the rpcserverPeer +// interface implementation. +func (p *rpcPeer) FeeFilter() int64 { + return atomic.LoadInt64(&(*serverPeer)(p).feeFilter) +} + +// rpcConnManager provides a connection manager for use with the RPC server and +// implements the rpcserverConnManager interface. +type rpcConnManager struct { + server *server +} + +// Ensure rpcConnManager implements the rpcserverConnManager interface. +var _ rpcserverConnManager = &rpcConnManager{} + +// Connect adds the provided address as a new outbound peer. The permanent flag +// indicates whether or not to make the peer persistent and reconnect if the +// connection is lost. Attempting to connect to an already existing peer will +// return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) Connect(addr string, permanent bool) error { + replyChan := make(chan error) + cm.server.query <- connectNodeMsg{ + addr: addr, + permanent: permanent, + reply: replyChan, + } + return <-replyChan +} + +// RemoveByID removes the peer associated with the provided id from the list of +// persistent peers. Attempting to remove an id that does not exist will return +// an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) RemoveByID(id int32) error { + replyChan := make(chan error) + cm.server.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + return <-replyChan +} + +// RemoveByAddr removes the peer associated with the provided address from the +// list of persistent peers. Attempting to remove an address that does not +// exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) RemoveByAddr(addr string) error { + replyChan := make(chan error) + cm.server.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + return <-replyChan +} + +// DisconnectByID disconnects the peer associated with the provided id. This +// applies to both inbound and outbound peers. Attempting to remove an id that +// does not exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) DisconnectByID(id int32) error { + replyChan := make(chan error) + cm.server.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + return <-replyChan +} + +// DisconnectByAddr disconnects the peer associated with the provided address. +// This applies to both inbound and outbound peers. Attempting to remove an +// address that does not exist will return an error. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) DisconnectByAddr(addr string) error { + replyChan := make(chan error) + cm.server.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + return <-replyChan +} + +// ConnectedCount returns the number of currently connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) ConnectedCount() int32 { + return cm.server.ConnectedCount() +} + +// NetTotals returns the sum of all bytes received and sent across the network +// for all peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) NetTotals() (uint64, uint64) { + return cm.server.NetTotals() +} + +// ConnectedPeers returns an array consisting of all connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) ConnectedPeers() []rpcserverPeer { + replyChan := make(chan []*serverPeer) + cm.server.query <- getPeersMsg{reply: replyChan} + serverPeers := <-replyChan + + // Convert to RPC server peers. + peers := make([]rpcserverPeer, 0, len(serverPeers)) + for _, sp := range serverPeers { + peers = append(peers, (*rpcPeer)(sp)) + } + return peers +} + +// PersistentPeers returns an array consisting of all the added persistent +// peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) PersistentPeers() []rpcserverPeer { + replyChan := make(chan []*serverPeer) + cm.server.query <- getAddedNodesMsg{reply: replyChan} + serverPeers := <-replyChan + + // Convert to generic peers. + peers := make([]rpcserverPeer, 0, len(serverPeers)) + for _, sp := range serverPeers { + peers = append(peers, (*rpcPeer)(sp)) + } + return peers +} + +// BroadcastMessage sends the provided message to all currently connected peers. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) BroadcastMessage(msg wire.Message) { + cm.server.BroadcastMessage(msg) +} + +// AddRebroadcastInventory adds the provided inventory to the list of +// inventories to be rebroadcast at random intervals until they show up in a +// block. +// +// This function is safe for concurrent access and is part of the +// rpcserverConnManager interface implementation. +func (cm *rpcConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) { + cm.server.AddRebroadcastInventory(iv, data) +} + +// RelayTransactions generates and relays inventory vectors for all of the +// passed transactions to all connected peers. +func (cm *rpcConnManager) RelayTransactions(txns []*mempool.TxDesc) { + cm.server.relayTransactions(txns) +} + +// rpcSyncMgr provides a block manager for use with the RPC server and +// implements the rpcserverSyncManager interface. +type rpcSyncMgr struct { + server *server + blockMgr *blockManager +} + +// Ensure rpcSyncMgr implements the rpcserverSyncManager interface. +var _ rpcserverSyncManager = (*rpcSyncMgr)(nil) + +// IsCurrent returns whether or not the sync manager believes the chain is +// current as compared to the rest of the network. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) IsCurrent() bool { + return b.blockMgr.IsCurrent() +} + +// SubmitBlock submits the provided block to the network after processing it +// locally. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { + return b.blockMgr.ProcessBlock(block, flags) +} + +// Pause pauses the sync manager until the returned channel is closed. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) Pause() chan<- struct{} { + return b.blockMgr.Pause() +} + +// SyncPeer returns the peer that is currently the peer being used to sync from. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) SyncPeer() rpcserverPeer { + return (*rpcPeer)(b.blockMgr.SyncPeer()) +} + +// LocateBlocks returns the hashes of the blocks after the first known block in +// the provided locators until the provided stop hash or the current tip is +// reached, up to a max of wire.MaxBlockHeadersPerMsg hashes. +// +// This function is safe for concurrent access and is part of the +// rpcserverSyncManager interface implementation. +func (b *rpcSyncMgr) LocateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) { + return b.server.locateBlocks(locators, hashStop) +} diff --git a/rpcserver.go b/rpcserver.go index f8b8ecf2..197b6ab0 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -29,6 +29,7 @@ import ( "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -36,6 +37,8 @@ import ( "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/mempool" "github.com/btcsuite/btcd/mining" + "github.com/btcsuite/btcd/mining/cpuminer" + "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" @@ -360,11 +363,11 @@ func handleAddNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in var err error switch c.SubCmd { case "add": - err = s.server.ConnectNode(addr, true) + err = s.cfg.ConnMgr.Connect(addr, true) case "remove": - err = s.server.RemoveNodeByAddr(addr) + err = s.cfg.ConnMgr.RemoveByAddr(addr) case "onetry": - err = s.server.ConnectNode(addr, false) + err = s.cfg.ConnMgr.Connect(addr, false) default: return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -396,11 +399,11 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter // attempt to disconnect by address, returning an error if a // valid IP address is not supplied. if nodeID, errN = strconv.ParseUint(c.Target, 10, 32); errN == nil { - err = s.server.DisconnectNodeByID(int32(nodeID)) + err = s.cfg.ConnMgr.DisconnectByID(int32(nodeID)) } else { if _, _, errP := net.SplitHostPort(c.Target); errP == nil || net.ParseIP(c.Target) != nil { addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) - err = s.server.DisconnectNodeByAddr(addr) + err = s.cfg.ConnMgr.DisconnectByAddr(addr) } else { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -408,22 +411,24 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter } } } - if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) { + if err != nil && peerExists(s.cfg.ConnMgr, addr, int32(nodeID)) { + return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "can't disconnect a permanent peer, use remove", } } + case "remove": // If we have a valid uint disconnect by node id. Otherwise, // attempt to disconnect by address, returning an error if a // valid IP address is not supplied. if nodeID, errN = strconv.ParseUint(c.Target, 10, 32); errN == nil { - err = s.server.RemoveNodeByID(int32(nodeID)) + err = s.cfg.ConnMgr.RemoveByID(int32(nodeID)) } else { if _, _, errP := net.SplitHostPort(c.Target); errP == nil || net.ParseIP(c.Target) != nil { addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) - err = s.server.RemoveNodeByAddr(addr) + err = s.cfg.ConnMgr.RemoveByAddr(addr) } else { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -431,12 +436,13 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter } } } - if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) { + if err != nil && peerExists(s.cfg.ConnMgr, addr, int32(nodeID)) { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "can't remove a temporary peer, use disconnect", } } + case "connect": addr = normalizeAddress(c.Target, activeNetParams.DefaultPort) @@ -448,7 +454,7 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter switch subCmd { case "perm", "temp": - err = s.server.ConnectNode(addr, subCmd == "perm") + err = s.cfg.ConnMgr.Connect(addr, subCmd == "perm") default: return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidParameter, @@ -476,9 +482,9 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter // peerExists determines if a certain peer is currently connected given // information about all currently connected peers. Peer existence is // determined using either a target address or node id. -func peerExists(peers []*serverPeer, addr string, nodeID int32) bool { - for _, p := range peers { - if p.ID() == nodeID || p.Addr() == addr { +func peerExists(connMgr rpcserverConnManager, addr string, nodeID int32) bool { + for _, p := range connMgr.ConnectedPeers() { + if p.ToPeer().ID() == nodeID || p.ToPeer().Addr() == addr { return true } } @@ -560,7 +566,7 @@ func handleCreateRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan Message: "Invalid address or key", } } - if !addr.IsForNet(s.server.chainParams) { + if !addr.IsForNet(s.cfg.ChainParams) { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidAddressOrKey, Message: "Invalid address: " + encodedAddr + @@ -789,7 +795,7 @@ func handleDecodeRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan Version: mtx.Version, Locktime: mtx.LockTime, Vin: createVinList(&mtx), - Vout: createVoutList(&mtx, s.server.chainParams, nil), + Vout: createVoutList(&mtx, s.cfg.ChainParams, nil), } return txReply, nil } @@ -816,14 +822,14 @@ func handleDecodeScript(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // Ignore the error here since an error means the script couldn't parse // and there is no additinal information about it anyways. scriptClass, addrs, reqSigs, _ := txscript.ExtractPkScriptAddrs(script, - s.server.chainParams) + s.cfg.ChainParams) addresses := make([]string, len(addrs)) for i, addr := range addrs { addresses[i] = addr.EncodeAddress() } // Convert the script itself to a pay-to-script-hash address. - p2sh, err := btcutil.NewAddressScriptHash(script, s.server.chainParams) + p2sh, err := btcutil.NewAddressScriptHash(script, s.cfg.ChainParams) if err != nil { context := "Failed to convert script to pay-to-script-hash" return nil, internalRPCError(err.Error(), context) @@ -856,14 +862,13 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Respond with an error if there's virtually 0 chance of mining a block // with the CPU. - params := s.server.chainParams - if !s.server.chainParams.GenerateSupported { + if !s.cfg.ChainParams.GenerateSupported { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCDifficulty, Message: fmt.Sprintf("No support for `generate` on "+ "the current network, %s, as it's unlikely to "+ "be possible to main a block with the CPU.", - params.Net), + s.cfg.ChainParams.Net), } } @@ -880,7 +885,7 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Create a reply reply := make([]string, c.NumBlocks) - blockHashes, err := s.server.cpuMiner.GenerateNBlocks(c.NumBlocks) + blockHashes, err := s.cfg.CPUMiner.GenerateNBlocks(c.NumBlocks) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInternal.Code, @@ -901,14 +906,14 @@ func handleGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetAddedNodeInfoCmd) - // Retrieve a list of persistent (added) peers from the bitcoin server - // and filter the list of peers per the specified address (if any). - peers := s.server.AddedNodeInfo() + // Retrieve a list of persistent (added) peers from the server and + // filter the list of peers per the specified address (if any). + peers := s.cfg.ConnMgr.PersistentPeers() if c.Node != nil { node := *c.Node found := false for i, peer := range peers { - if peer.Addr() == node { + if peer.ToPeer().Addr() == node { peers = peers[i : i+1] found = true } @@ -926,7 +931,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru if !c.DNS { results := make([]string, 0, len(peers)) for _, peer := range peers { - results = append(results, peer.Addr()) + results = append(results, peer.ToPeer().Addr()) } return results, nil } @@ -934,9 +939,10 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru // With the dns flag, the result is an array of JSON objects which // include the result of DNS lookups for each peer. results := make([]*btcjson.GetAddedNodeInfoResult, 0, len(peers)) - for _, peer := range peers { + for _, rpcPeer := range peers { // Set the "address" of the peer which could be an ip address // or a domain name. + peer := rpcPeer.ToPeer() var result btcjson.GetAddedNodeInfoResult result.AddedNode = peer.Addr() result.Connected = btcjson.Bool(peer.Connected()) @@ -991,7 +997,7 @@ func handleGetBestBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // All other "get block" commands give either the height, the // hash, or both but require the block SHA. This gets both for // the best block. - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() result := &btcjson.GetBestBlockResult{ Hash: best.Hash.String(), Height: best.Height, @@ -1001,7 +1007,7 @@ func handleGetBestBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{} // handleGetBestBlockHash implements the getbestblockhash command. func handleGetBestBlockHash(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return best.Hash.String(), nil } @@ -1035,7 +1041,7 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, rpcDecodeHexError(c.Hash) } var blkBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error blkBytes, err = dbTx.FetchBlock(hash) return err @@ -1063,18 +1069,18 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } // Get the block height from chain. - blockHeight, err := s.chain.BlockHeightByHash(hash) + blockHeight, err := s.cfg.Chain.BlockHeightByHash(hash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) } blk.SetHeight(blockHeight) - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() // Get next block hash unless there are none. var nextHashString string if blockHeight < best.Height { - nextHash, err := s.chain.BlockHashByHeight(blockHeight + 1) + nextHash, err := s.cfg.Chain.BlockHashByHeight(blockHeight + 1) if err != nil { context := "No next block" return nil, internalRPCError(err.Error(), context) @@ -1113,7 +1119,7 @@ func handleGetBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i txns := blk.Transactions() rawTxns := make([]btcjson.TxRawResult, len(txns)) for i, tx := range txns { - rawTxn, err := createTxRawResult(s.server.chainParams, + rawTxn, err := createTxRawResult(s.cfg.ChainParams, tx.MsgTx(), tx.Hash().String(), blockHeader, hash.String(), blockHeight, best.Height) if err != nil { @@ -1148,10 +1154,10 @@ func softForkStatus(state blockchain.ThresholdState) (string, error) { // handleGetBlockChainInfo implements the getblockchaininfo command. func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - // Obtain a snapshot of the current best known blockchain state. We'll // populate the response to this call primarily from this snapshot. - chainSnapshot := s.chain.BestSnapshot() + chain := s.cfg.Chain + chainSnapshot := chain.BestSnapshot() chainInfo := &btcjson.GetBlockChainInfoResult{ Chain: activeNetParams.Name, @@ -1224,7 +1230,7 @@ func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan str // Query the chain for the current status of the deployment as // identified by its deployment ID. - deploymentStatus, err := s.chain.ThresholdState(uint32(deployment)) + deploymentStatus, err := chain.ThresholdState(uint32(deployment)) if err != nil { context := "Failed to obtain deployment status" return nil, internalRPCError(err.Error(), context) @@ -1257,14 +1263,14 @@ func handleGetBlockChainInfo(s *rpcServer, cmd interface{}, closeChan <-chan str // handleGetBlockCount implements the getblockcount command. func handleGetBlockCount(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return int64(best.Height), nil } // handleGetBlockHash implements the getblockhash command. func handleGetBlockHash(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetBlockHashCmd) - hash, err := s.chain.BlockHashByHeight(int32(c.Index)) + hash, err := s.cfg.Chain.BlockHashByHeight(int32(c.Index)) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCOutOfRange, @@ -1284,7 +1290,7 @@ func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct if err != nil { return nil, rpcDecodeHexError(c.Hash) } - blockHeader, err := s.chain.FetchHeader(hash) + blockHeader, err := s.cfg.Chain.FetchHeader(hash) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -1307,17 +1313,17 @@ func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct // The verbose flag is set, so generate the JSON object and return it. // Get the block height from chain. - blockHeight, err := s.chain.BlockHeightByHash(hash) + blockHeight, err := s.cfg.Chain.BlockHeightByHash(hash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() // Get next block hash unless there are none. var nextHashString string if blockHeight < best.Height { - nextHash, err := s.chain.BlockHashByHeight(blockHeight + 1) + nextHash, err := s.cfg.Chain.BlockHashByHeight(blockHeight + 1) if err != nil { context := "No next block" return nil, internalRPCError(err.Error(), context) @@ -1495,7 +1501,8 @@ func (state *gbtWorkState) templateUpdateChan(prevHash *chainhash.Hash, lastGene // // This function MUST be called with the state locked. func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bool) error { - lastTxUpdate := s.server.txMemPool.LastUpdated() + generator := s.cfg.Generator + lastTxUpdate := generator.TxSource().LastUpdated() if lastTxUpdate.IsZero() { lastTxUpdate = time.Now() } @@ -1506,7 +1513,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // generated. var msgBlock *wire.MsgBlock var targetDifficulty string - latestHash := &s.server.blockManager.chain.BestSnapshot().Hash + latestHash := &s.cfg.Chain.BestSnapshot().Hash template := state.template if template == nil || state.prevHash == nil || !state.prevHash.IsEqual(latestHash) || @@ -1532,7 +1539,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // block template doesn't include the coinbase, so the caller // will ultimately create their own coinbase which pays to the // appropriate address(es). - blkTemplate, err := s.generator.NewBlockTemplate(payAddr) + blkTemplate, err := generator.NewBlockTemplate(payAddr) if err != nil { return internalRPCError("Failed to create new block "+ "template: "+err.Error(), "") @@ -1545,7 +1552,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // Get the minimum allowed timestamp for the block based on the // median timestamp of the last several blocks per the chain // consensus rules. - best := s.server.blockManager.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() minTimestamp := mining.MinimumMedianTime(best) // Update work state to ensure another block template isn't @@ -1605,7 +1612,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo // Update the time of the block template to the current time // while accounting for the median time of the past several // blocks per the chain consensus rules. - s.generator.UpdateBlockTime(msgBlock) + generator.UpdateBlockTime(msgBlock) msgBlock.Header.Nonce = 0 rpcsLog.Debugf("Updated block template (timestamp %v, "+ @@ -1900,7 +1907,9 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques // way to relay a found block or receive transactions to work on. // However, allow this state when running in the regression test or // simulation test mode. - if !(cfg.RegressionTest || cfg.SimNet) && s.server.ConnectedCount() == 0 { + if !(cfg.RegressionTest || cfg.SimNet) && + s.cfg.ConnMgr.ConnectedCount() == 0 { + return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCClientNotConnected, Message: "Bitcoin is not connected", @@ -1908,8 +1917,8 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques } // No point in generating or accepting work before the chain is synced. - currentHeight := s.server.blockManager.chain.BestSnapshot().Height - if currentHeight != 0 && !s.server.blockManager.IsCurrent() { + currentHeight := s.cfg.Chain.BestSnapshot().Height + if currentHeight != 0 && !s.cfg.SyncMgr.IsCurrent() { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCClientInInitialDownload, Message: "Bitcoin is downloading blocks...", @@ -2069,14 +2078,14 @@ func handleGetBlockTemplateProposal(s *rpcServer, request *btcjson.TemplateReque block := btcutil.NewBlock(&msgBlock) // Ensure the block is building from the expected previous block. - expectedPrevHash := &s.server.blockManager.chain.BestSnapshot().Hash + expectedPrevHash := s.cfg.Chain.BestSnapshot().Hash prevHash := &block.MsgBlock().Header.PrevBlock if !expectedPrevHash.IsEqual(prevHash) { return "bad-prevblk", nil } flags := blockchain.BFDryRun | blockchain.BFNoPoWCheck - isOrphan, err := s.server.blockManager.ProcessBlock(block, flags) + isOrphan, err := s.cfg.SyncMgr.SubmitBlock(block, flags) if err != nil { if _, ok := err.(blockchain.RuleError); !ok { errStr := fmt.Sprintf("Failed to process block proposal: %v", err) @@ -2126,28 +2135,28 @@ func handleGetBlockTemplate(s *rpcServer, cmd interface{}, closeChan <-chan stru // handleGetConnectionCount implements the getconnectioncount command. func handleGetConnectionCount(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.ConnectedCount(), nil + return s.cfg.ConnMgr.ConnectedCount(), nil } // handleGetCurrentNet implements the getcurrentnet command. func handleGetCurrentNet(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.chainParams.Net, nil + return s.cfg.ChainParams.Net, nil } // handleGetDifficulty implements the getdifficulty command. func handleGetDifficulty(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() return getDifficultyRatio(best.Bits), nil } // handleGetGenerate implements the getgenerate command. func handleGetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return s.server.cpuMiner.IsMining(), nil + return s.cfg.CPUMiner.IsMining(), nil } // handleGetHashesPerSec implements the gethashespersec command. func handleGetHashesPerSec(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return int64(s.server.cpuMiner.HashesPerSecond()), nil + return int64(s.cfg.CPUMiner.HashesPerSecond()), nil } // handleGetHeaders implements the getheaders command. @@ -2174,7 +2183,7 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) return nil, rpcDecodeHexError(c.HashStop) } } - blockHashes, err := s.server.locateBlocks(blockLocators, &hashStop) + blockHashes, err := s.cfg.SyncMgr.LocateBlocks(blockLocators, &hashStop) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCDatabase, @@ -2184,7 +2193,7 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } headers := make([]wire.BlockHeader, 0, len(blockHashes)) for i := range blockHashes { - header, err := s.chain.FetchHeader(&blockHashes[i]) + header, err := s.cfg.Chain.FetchHeader(&blockHashes[i]) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -2213,13 +2222,13 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleGetInfo implements the getinfo command. We only return the fields // that are not related to wallet functionality. func handleGetInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() ret := &btcjson.InfoChainResult{ Version: int32(1000000*appMajor + 10000*appMinor + 100*appPatch), ProtocolVersion: int32(maxProtocolVersion), Blocks: best.Height, - TimeOffset: int64(s.server.timeSource.Offset().Seconds()), - Connections: s.server.ConnectedCount(), + TimeOffset: int64(s.cfg.TimeSource.Offset().Seconds()), + Connections: s.cfg.ConnMgr.ConnectedCount(), Proxy: cfg.Proxy, Difficulty: getDifficultyRatio(best.Bits), TestNet: cfg.TestNet3, @@ -2231,7 +2240,7 @@ func handleGetInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in // handleGetMempoolInfo implements the getmempoolinfo command. func handleGetMempoolInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - mempoolTxns := s.server.txMemPool.TxDescs() + mempoolTxns := s.cfg.TxMemPool.TxDescs() var numBytes int64 for _, txD := range mempoolTxns { @@ -2265,18 +2274,18 @@ func handleGetMiningInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{ } } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() result := btcjson.GetMiningInfoResult{ Blocks: int64(best.Height), CurrentBlockSize: best.BlockSize, CurrentBlockWeight: best.BlockWeight, CurrentBlockTx: best.NumTxns, Difficulty: getDifficultyRatio(best.Bits), - Generate: s.server.cpuMiner.IsMining(), - GenProcLimit: s.server.cpuMiner.NumWorkers(), - HashesPerSec: int64(s.server.cpuMiner.HashesPerSecond()), + Generate: s.cfg.CPUMiner.IsMining(), + GenProcLimit: s.cfg.CPUMiner.NumWorkers(), + HashesPerSec: int64(s.cfg.CPUMiner.HashesPerSecond()), NetworkHashPS: networkHashesPerSec, - PooledTx: uint64(s.server.txMemPool.Count()), + PooledTx: uint64(s.cfg.TxMemPool.Count()), TestNet: cfg.TestNet3, } return &result, nil @@ -2284,7 +2293,7 @@ func handleGetMiningInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{ // handleGetNetTotals implements the getnettotals command. func handleGetNetTotals(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - totalBytesRecv, totalBytesSent := s.server.NetTotals() + totalBytesRecv, totalBytesSent := s.cfg.ConnMgr.NetTotals() reply := &btcjson.GetNetTotalsResult{ TotalBytesRecv: totalBytesRecv, TotalBytesSent: totalBytesSent, @@ -2305,7 +2314,7 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // since we can't reasonably calculate the number of network hashes // per second from invalid values. When it's negative, use the current // best block height. - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() endHeight := int32(-1) if c.Height != nil { endHeight = int32(*c.Height) @@ -2319,8 +2328,8 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // Calculate the number of blocks per retarget interval based on the // chain parameters. - blocksPerRetarget := int32(s.server.chainParams.TargetTimespan / - s.server.chainParams.TargetTimePerBlock) + blocksPerRetarget := int32(s.cfg.ChainParams.TargetTimespan / + s.cfg.ChainParams.TargetTimePerBlock) // Calculate the starting block height based on the passed number of // blocks. When the passed value is negative, use the last block the @@ -2347,14 +2356,14 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru var minTimestamp, maxTimestamp time.Time totalWork := big.NewInt(0) for curHeight := startHeight; curHeight <= endHeight; curHeight++ { - hash, err := s.chain.BlockHashByHeight(curHeight) + hash, err := s.cfg.Chain.BlockHashByHeight(curHeight) if err != nil { context := "Failed to fetch block hash" return nil, internalRPCError(err.Error(), context) } // Fetch the header from chain. - header, err := s.chain.FetchHeader(hash) + header, err := s.cfg.Chain.FetchHeader(hash) if err != nil { context := "Failed to fetch block header" return nil, internalRPCError(err.Error(), context) @@ -2389,17 +2398,17 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru // handleGetPeerInfo implements the getpeerinfo command. func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - peers := s.server.Peers() - syncPeer := s.server.blockManager.SyncPeer() + peers := s.cfg.ConnMgr.ConnectedPeers() + syncPeer := s.cfg.SyncMgr.SyncPeer().ToPeer() infos := make([]*btcjson.GetPeerInfoResult, 0, len(peers)) for _, p := range peers { - statsSnap := p.StatsSnapshot() + statsSnap := p.ToPeer().StatsSnapshot() info := &btcjson.GetPeerInfoResult{ ID: statsSnap.ID, Addr: statsSnap.Addr, - AddrLocal: p.LocalAddr().String(), + AddrLocal: p.ToPeer().LocalAddr().String(), Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)), - RelayTxes: !p.disableRelayTx, + RelayTxes: !p.IsTxRelayDisabled(), LastSend: statsSnap.LastSend.Unix(), LastRecv: statsSnap.LastRecv.Unix(), BytesSent: statsSnap.BytesSent, @@ -2412,11 +2421,11 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) Inbound: statsSnap.Inbound, StartingHeight: statsSnap.StartingHeight, CurrentHeight: statsSnap.LastBlock, - BanScore: int32(p.banScore.Int()), - FeeFilter: atomic.LoadInt64(&p.feeFilter), - SyncNode: p == syncPeer, + BanScore: int32(p.BanScore()), + FeeFilter: p.FeeFilter(), + SyncNode: p.ToPeer() == syncPeer, } - if p.LastPingNonce() != 0 { + if p.ToPeer().LastPingNonce() != 0 { wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds()) // We actually want microseconds. info.PingWait = wait / 1000 @@ -2429,7 +2438,7 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleGetRawMempool implements the getrawmempool command. func handleGetRawMempool(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.GetRawMempoolCmd) - mp := s.server.txMemPool + mp := s.cfg.TxMemPool if c.Verbose != nil && *c.Verbose { return mp.RawMempoolVerbose(), nil @@ -2466,10 +2475,9 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str var mtx *wire.MsgTx var blkHash *chainhash.Hash var blkHeight int32 - tx, err := s.server.txMemPool.FetchTransaction(txHash) + tx, err := s.cfg.TxMemPool.FetchTransaction(txHash) if err != nil { - txIndex := s.server.txIndex - if txIndex == nil { + if s.cfg.TxIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCNoTxInfo, Message: "The transaction index must be " + @@ -2479,7 +2487,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str } // Look up the location of the transaction. - blockRegion, err := txIndex.TxBlockRegion(txHash) + blockRegion, err := s.cfg.TxIndex.TxBlockRegion(txHash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context) @@ -2490,7 +2498,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str // Load the raw transaction bytes from the database. var txBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error txBytes, err = dbTx.FetchBlockRegion(blockRegion) return err @@ -2508,7 +2516,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str // Grab the block height. blkHash = blockRegion.Hash - blkHeight, err = s.chain.BlockHeightByHash(blkHash) + blkHeight, err = s.cfg.Chain.BlockHeightByHash(blkHash) if err != nil { context := "Failed to retrieve block height" return nil, internalRPCError(err.Error(), context) @@ -2547,7 +2555,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str var chainHeight int32 if blkHash != nil { // Fetch the header from chain. - header, err := s.chain.FetchHeader(blkHash) + header, err := s.cfg.Chain.FetchHeader(blkHash) if err != nil { context := "Failed to fetch block header" return nil, internalRPCError(err.Error(), context) @@ -2555,11 +2563,11 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str blkHeader = &header blkHashStr = blkHash.String() - chainHeight = s.chain.BestSnapshot().Height + chainHeight = s.cfg.Chain.BestSnapshot().Height } - rawTxn, err := createTxRawResult(s.server.chainParams, mtx, - txHash.String(), blkHeader, blkHashStr, blkHeight, chainHeight) + rawTxn, err := createTxRawResult(s.cfg.ChainParams, mtx, txHash.String(), + blkHeader, blkHashStr, blkHeight, chainHeight) if err != nil { return nil, err } @@ -2590,8 +2598,8 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } // TODO: This is racy. It should attempt to fetch it directly and check // the error. - if includeMempool && s.server.txMemPool.HaveTransaction(txHash) { - tx, err := s.server.txMemPool.FetchTransaction(txHash) + if includeMempool && s.cfg.TxMemPool.HaveTransaction(txHash) { + tx, err := s.cfg.TxMemPool.FetchTransaction(txHash) if err != nil { return nil, rpcNoTxInfoError(txHash) } @@ -2612,7 +2620,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, internalRPCError(errStr, "") } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() bestBlockHash = best.Hash.String() confirmations = 0 txVersion = mtx.Version @@ -2620,7 +2628,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i pkScript = txOut.PkScript isCoinbase = blockchain.IsCoinBaseTx(mtx) } else { - entry, err := s.chain.FetchUtxoEntry(txHash) + entry, err := s.cfg.Chain.FetchUtxoEntry(txHash) if err != nil { return nil, rpcNoTxInfoError(txHash) } @@ -2634,7 +2642,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i return nil, nil } - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() bestBlockHash = best.Hash.String() confirmations = 1 + best.Height - entry.BlockHeight() txVersion = entry.Version() @@ -2652,7 +2660,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i // Ignore the error here since an error means the script couldn't parse // and there is no additional information about it anyways. scriptClass, addrs, reqSigs, _ := txscript.ExtractPkScriptAddrs(pkScript, - s.server.chainParams) + s.cfg.ChainParams) addresses := make([]string, len(addrs)) for i, addr := range addrs { addresses[i] = addr.EncodeAddress() @@ -2722,7 +2730,7 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter return nil, internalRPCError("Not sending ping - failed to "+ "generate nonce: "+err.Error(), "") } - s.server.BroadcastMessage(wire.NewMsgPing(nonce)) + s.cfg.ConnMgr.BroadcastMessage(wire.NewMsgPing(nonce)) return nil, nil } @@ -2744,7 +2752,7 @@ type retrievedTx struct { // inputs to the passed transaction by checking the transaction mempool first // then the transaction index for those already mined into blocks. func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, error) { - mp := s.server.txMemPool + mp := s.cfg.TxMemPool originOutputs := make(map[wire.OutPoint]wire.TxOut) for txInIndex, txIn := range tx.TxIn { // Attempt to fetch and use the referenced transaction from the @@ -2765,7 +2773,7 @@ func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, } // Look up the location of the transaction. - blockRegion, err := s.server.txIndex.TxBlockRegion(&origin.Hash) + blockRegion, err := s.cfg.TxIndex.TxBlockRegion(&origin.Hash) if err != nil { context := "Failed to retrieve transaction location" return nil, internalRPCError(err.Error(), context) @@ -2776,7 +2784,7 @@ func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, // Load the raw transaction bytes from the database. var txBytes []byte - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { var err error txBytes, err = dbTx.FetchBlockRegion(blockRegion) return err @@ -2933,7 +2941,7 @@ func createVinListPrevOut(s *rpcServer, mtx *wire.MsgTx, chainParams *chaincfg.P func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, numRequested uint32) ([]*btcutil.Tx, uint32) { // There are no entries to return when there are less available than the // number being skipped. - mpTxns := s.server.addrIndex.UnconfirmedTxnsForAddress(addr) + mpTxns := s.cfg.AddrIndex.UnconfirmedTxnsForAddress(addr) numAvailable := uint32(len(mpTxns)) if numToSkip > numAvailable { return nil, numAvailable @@ -2951,7 +2959,7 @@ func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, n // handleSearchRawTransactions implements the searchrawtransactions command. func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { // Respond with an error if the address index is not enabled. - addrIndex := s.server.addrIndex + addrIndex := s.cfg.AddrIndex if addrIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, @@ -2971,7 +2979,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // transaction index. Currently the address index relies on the // transaction index, so this check is redundant, but it's better to be // safe in case the address index is ever changed to not rely on it. - if vinExtra && s.server.txIndex == nil { + if vinExtra && s.cfg.TxIndex == nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCMisc, Message: "Transaction index must be enabled (--txindex)", @@ -2979,7 +2987,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // Attempt to decode the supplied address. - addr, err := btcutil.DecodeAddress(c.Address, s.server.chainParams) + addr, err := btcutil.DecodeAddress(c.Address, s.cfg.ChainParams) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCInvalidAddressOrKey, @@ -3040,7 +3048,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // Fetch transactions from the database in the desired order if more are // needed. if len(addressTxns) < numRequested { - err = s.server.db.View(func(dbTx database.Tx) error { + err = s.cfg.DB.View(func(dbTx database.Tx) error { regions, dbSkipped, err := addrIndex.TxRegionsForAddress( dbTx, addr, uint32(numToSkip)-numSkipped, uint32(numRequested-len(addressTxns)), reverse) @@ -3134,8 +3142,8 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // The verbose flag is set, so generate the JSON object and return it. - best := s.chain.BestSnapshot() - chainParams := s.server.chainParams + best := s.cfg.Chain.BestSnapshot() + chainParams := s.cfg.ChainParams srtList := make([]btcjson.SearchRawTransactionsResult, len(addressTxns)) for i := range addressTxns { // The deserialized transaction is needed, so deserialize the @@ -3178,7 +3186,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan var blkHeight int32 if blkHash := rtx.blkHash; blkHash != nil { // Fetch the header from chain. - header, err := s.chain.FetchHeader(blkHash) + header, err := s.cfg.Chain.FetchHeader(blkHash) if err != nil { return nil, &btcjson.RPCError{ Code: btcjson.ErrRPCBlockNotFound, @@ -3187,7 +3195,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan } // Get the block height from chain. - height, err := s.chain.BlockHeightByHash(blkHash) + height, err := s.cfg.Chain.BlockHeightByHash(blkHash) if err != nil { context := "Failed to obtain block height" return nil, internalRPCError(err.Error(), context) @@ -3235,8 +3243,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // Use 0 for the tag to represent local node. tx := btcutil.NewTx(&msgTx) - acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, - false, 0) + acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, false, false, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -3265,20 +3272,27 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // Also, since an error is being returned to the caller, ensure the // transaction is removed from the memory pool. if len(acceptedTxs) == 0 || !acceptedTxs[0].Tx.Hash().IsEqual(tx.Hash()) { - s.server.txMemPool.RemoveTransaction(tx, true) + s.cfg.TxMemPool.RemoveTransaction(tx, true) errStr := fmt.Sprintf("transaction %v is not in accepted list", tx.Hash()) return nil, internalRPCError(errStr, "") } - s.server.AnnounceNewTransactions(acceptedTxs) + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + s.cfg.ConnMgr.RelayTransactions(acceptedTxs) + + // Notify both websocket and getblocktemplate long poll clients of all + // newly accepted transactions. + s.NotifyNewTransactions(acceptedTxs) // Keep track of all the sendrawtransaction request txns so that they // can be rebroadcast if they don't make their way into a block. txD := acceptedTxs[0] iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) - s.server.AddRebroadcastInventory(iv, txD) + s.cfg.ConnMgr.AddRebroadcastInventory(iv, txD) return tx.Hash().String(), nil } @@ -3300,7 +3314,7 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } if !generate { - s.server.cpuMiner.Stop() + s.cfg.CPUMiner.Stop() } else { // Respond with an error if there are no addresses to pay the // created blocks to. @@ -3313,8 +3327,8 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } // It's safe to call start even if it's already started. - s.server.cpuMiner.SetNumWorkers(int32(genProcLimit)) - s.server.cpuMiner.Start() + s.cfg.CPUMiner.SetNumWorkers(int32(genProcLimit)) + s.cfg.CPUMiner.Start() } return nil, nil } @@ -3350,7 +3364,9 @@ func handleSubmitBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) } } - _, err = s.server.blockManager.ProcessBlock(block, blockchain.BFNone) + // Process this block using the same rules as blocks coming from other + // nodes. This will in turn relay it to the network like normal. + _, err = s.cfg.SyncMgr.SubmitBlock(block, blockchain.BFNone) if err != nil { return fmt.Sprintf("rejected: %s", err.Error()), nil } @@ -3361,7 +3377,7 @@ func handleSubmitBlock(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleUptime implements the uptime command. func handleUptime(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - return time.Now().Unix() - s.server.startupTime, nil + return time.Now().Unix() - s.cfg.StartupTime, nil } // handleValidateAddress implements the validateaddress command. @@ -3382,7 +3398,7 @@ func handleValidateAddress(s *rpcServer, cmd interface{}, closeChan <-chan struc } func verifyChain(s *rpcServer, level, depth int32) error { - best := s.chain.BestSnapshot() + best := s.cfg.Chain.BestSnapshot() finishHeight := best.Height - depth if finishHeight < 0 { finishHeight = 0 @@ -3392,7 +3408,7 @@ func verifyChain(s *rpcServer, level, depth int32) error { for height := best.Height; height > finishHeight; height-- { // Level 0 just looks up the block. - block, err := s.chain.BlockByHeight(height) + block, err := s.cfg.Chain.BlockByHeight(height) if err != nil { rpcsLog.Errorf("Verify is unable to fetch block at "+ "height %d: %v", height, err) @@ -3402,7 +3418,7 @@ func verifyChain(s *rpcServer, level, depth int32) error { // Level 1 does basic chain sanity checks. if level > 0 { err := blockchain.CheckBlockSanity(block, - activeNetParams.PowLimit, s.server.timeSource) + s.cfg.ChainParams.PowLimit, s.cfg.TimeSource) if err != nil { rpcsLog.Errorf("Verify is unable to validate "+ "block at hash %v height %d: %v", @@ -3497,8 +3513,7 @@ func handleVerifyMessage(s *rpcServer, cmd interface{}, closeChan <-chan struct{ // handleVersion implements the version command. // -// NOTE: This is a btcsuite extension ported from -// github.com/decred/dcrd. +// NOTE: This is a btcsuite extension ported from github.com/decred/dcrd. func handleVersion(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { result := map[string]btcjson.VersionResult{ "btcdjsonrpcapi": { @@ -3511,14 +3526,11 @@ func handleVersion(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in return result, nil } -// rpcServer holds the items the rpc server may need to access (config, -// shutdown, main server, etc.) +// rpcServer provides a concurrent safe RPC server to a chain server. type rpcServer struct { started int32 shutdown int32 - generator *mining.BlkTmplGenerator - server *server - chain *blockchain.BlockChain + cfg rpcserverConfig authsha [sha256.Size]byte limitauthsha [sha256.Size]byte ntfnMgr *wsNotificationManager @@ -3617,6 +3629,20 @@ func (s *rpcServer) RequestedProcessShutdown() <-chan struct{} { return s.requestProcessShutdown } +// NotifyNewTransactions notifies both websocket and getblocktemplate long +// poll clients of the passed transactions. This function should be called +// whenever new transactions are added to the mempool. +func (s *rpcServer) NotifyNewTransactions(txns []*mempool.TxDesc) { + for _, txD := range txns { + // Notify websocket clients about mempool transactions. + s.ntfnMgr.NotifyMempoolTx(txD.Tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.gbtWorkState.NotifyMempoolTx(s.cfg.TxMemPool.LastUpdated()) + } +} + // limitConnections responds with a 503 service unavailable and returns true if // adding another client would exceed the maximum allow RPC clients. // @@ -4013,14 +4039,163 @@ func genCertPair(certFile, keyFile string) error { return nil } +// rpcserverPeer represents a peer for use with the RPC server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverPeer interface { + // ToPeer returns the underlying peer instance. + ToPeer() *peer.Peer + + // IsTxRelayDisabled returns whether or not the peer has disabled + // transaction relay. + IsTxRelayDisabled() bool + + // BanScore returns the current integer value that represents how close + // the peer is to being banned. + BanScore() uint32 + + // FeeFilter returns the requested current minimum fee rate for which + // transactions should be announced. + FeeFilter() int64 +} + +// rpcserverConnManager represents a connection manager for use with the RPC +// server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverConnManager interface { + // Connect adds the provided address as a new outbound peer. The + // permanent flag indicates whether or not to make the peer persistent + // and reconnect if the connection is lost. Attempting to connect to an + // already existing peer will return an error. + Connect(addr string, permanent bool) error + + // RemoveByID removes the peer associated with the provided id from the + // list of persistent peers. Attempting to remove an id that does not + // exist will return an error. + RemoveByID(id int32) error + + // RemoveByAddr removes the peer associated with the provided address + // from the list of persistent peers. Attempting to remove an address + // that does not exist will return an error. + RemoveByAddr(addr string) error + + // DisconnectByID disconnects the peer associated with the provided id. + // This applies to both inbound and outbound peers. Attempting to + // remove an id that does not exist will return an error. + DisconnectByID(id int32) error + + // DisconnectByAddr disconnects the peer associated with the provided + // address. This applies to both inbound and outbound peers. + // Attempting to remove an address that does not exist will return an + // error. + DisconnectByAddr(addr string) error + + // ConnectedCount returns the number of currently connected peers. + ConnectedCount() int32 + + // NetTotals returns the sum of all bytes received and sent across the + // network for all peers. + NetTotals() (uint64, uint64) + + // ConnectedPeers returns an array consisting of all connected peers. + ConnectedPeers() []rpcserverPeer + + // PersistentPeers returns an array consisting of all the persistent + // peers. + PersistentPeers() []rpcserverPeer + + // BroadcastMessage sends the provided message to all currently + // connected peers. + BroadcastMessage(msg wire.Message) + + // AddRebroadcastInventory adds the provided inventory to the list of + // inventories to be rebroadcast at random intervals until they show up + // in a block. + AddRebroadcastInventory(iv *wire.InvVect, data interface{}) + + // RelayTransactions generates and relays inventory vectors for all of + // the passed transactions to all connected peers. + RelayTransactions(txns []*mempool.TxDesc) +} + +// rpcserverSyncManager represents a sync manager for use with the RPC server. +// +// The interface contract requires that all of these methods are safe for +// concurrent access. +type rpcserverSyncManager interface { + // IsCurrent returns whether or not the sync manager believes the chain + // is current as compared to the rest of the network. + IsCurrent() bool + + // SubmitBlock submits the provided block to the network after + // processing it locally. + SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) + + // Pause pauses the sync manager until the returned channel is closed. + Pause() chan<- struct{} + + // SyncPeer returns the peer that is currently the peer being used to + // sync from. + SyncPeer() rpcserverPeer + + // LocateBlocks returns the hashes of the blocks after the first known + // block in the provided locators until the provided stop hash or the + // current tip is reached, up to a max of wire.MaxBlockHeadersPerMsg + // hashes. + LocateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) +} + +// rpcserverConfig is a descriptor containing the RPC server configuration. +type rpcserverConfig struct { + // ListenAddrs are the addresses the RPC server should listen on. + ListenAddrs []string + + // StartupTime is the unix timestamp for when the server that is hosting + // the RPC server started. + StartupTime int64 + + // ConnMgr defines the connection manager for the RPC server to use. It + // provides the RPC server with a means to do things such as add, + // remove, connect, disconnect, and query peers as well as other + // connection-related data and tasks. + ConnMgr rpcserverConnManager + + // SyncMgr defines the sync manager for the RPC server to use. + SyncMgr rpcserverSyncManager + + // These fields allow the RPC server to interface with the local block + // chain data and state. + TimeSource blockchain.MedianTimeSource + Chain *blockchain.BlockChain + ChainParams *chaincfg.Params + DB database.DB + + // TxMemPool defines the transaction memory pool to interact with. + TxMemPool *mempool.TxPool + + // These fields allow the RPC server to interface with mining. + // + // Generator produces block templates and the CPUMiner solves them using + // the CPU. CPU mining is typically only useful for test purposes when + // doing regression or simulation testing. + Generator *mining.BlkTmplGenerator + CPUMiner *cpuminer.CPUMiner + + // These fields define any optional indexes the RPC server can make use + // of to provide additional data when queried. + TxIndex *indexers.TxIndex + AddrIndex *indexers.AddrIndex +} + // newRPCServer returns a new instance of the rpcServer struct. -func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *server) (*rpcServer, error) { +func newRPCServer(config *rpcserverConfig) (*rpcServer, error) { rpc := rpcServer{ - server: s, - generator: generator, - chain: s.blockManager.chain, + cfg: *config, statusLines: make(map[int]string), - gbtWorkState: newGbtWorkState(s.timeSource), + gbtWorkState: newGbtWorkState(config.TimeSource), helpCacher: newHelpCacher(), requestProcessShutdown: make(chan struct{}), quit: make(chan int), @@ -4066,7 +4241,7 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s // TODO: this code is similar to that in server, should be // factored into something shared. - ipv4ListenAddrs, ipv6ListenAddrs, _, err := parseListeners(listenAddrs) + ipv4ListenAddrs, ipv6ListenAddrs, _, err := parseListeners(config.ListenAddrs) if err != nil { return nil, err } @@ -4095,7 +4270,7 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s rpc.listeners = listeners - s.chain.Subscribe(rpc.handleBlockchainNotification) + rpc.cfg.Chain.Subscribe(rpc.handleBlockchainNotification) return &rpc, nil } diff --git a/rpcwebsocket.go b/rpcwebsocket.go index ecfcc38b..d787050d 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -656,7 +656,7 @@ func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx, for i, output := range msgTx.TxOut { _, addrs, _, err := txscript.ExtractPkScriptAddrs( - output.PkScript, m.server.server.chainParams) + output.PkScript, m.server.cfg.ChainParams) if err != nil { // Clients are not able to subscribe to // nonstandard or non-address outputs. @@ -846,7 +846,7 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie continue } - net := m.server.server.chainParams + net := m.server.cfg.ChainParams rawTx, err := createTxRawResult(net, mtx, txHashStr, nil, "", 0, 0) if err != nil { @@ -983,7 +983,7 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s wscNotified := make(map[chan struct{}]struct{}) for i, txOut := range tx.MsgTx().TxOut { _, txAddrs, _, err := txscript.ExtractPkScriptAddrs( - txOut.PkScript, m.server.server.chainParams) + txOut.PkScript, m.server.cfg.ChainParams) if err != nil { continue } @@ -2029,7 +2029,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { for txOutIdx, txout := range tx.MsgTx().TxOut { _, addrs, _, _ := txscript.ExtractPkScriptAddrs( - txout.PkScript, wsc.server.server.chainParams) + txout.PkScript, wsc.server.cfg.ChainParams) for _, addr := range addrs { switch a := addr.(type) { @@ -2218,7 +2218,7 @@ func handleRescanBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { // Iterate over each block in the request and rescan. When a block // contains relevant transactions, add it to the response. - bc := wsc.server.server.blockManager.chain + bc := wsc.server.cfg.Chain var lastBlockHash *chainhash.Hash for i := range blockHashes { block, err := bc.BlockByHash(blockHashes[i]) @@ -2389,7 +2389,7 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { lookups.unspent[*outpoint] = struct{}{} } - chain := wsc.server.chain + chain := wsc.server.cfg.Chain minBlockHash, err := chainhash.NewHashFromStr(cmd.BeginBlock) if err != nil { @@ -2469,9 +2469,8 @@ fetchRange: // continuous notifications if necessary. Otherwise, // continue the fetch loop again to rescan the new // blocks (or error due to an irrecoverable reorganize). - blockManager := wsc.server.server.blockManager - pauseGuard := blockManager.Pause() - best := blockManager.chain.BestSnapshot() + pauseGuard := wsc.server.cfg.SyncMgr.Pause() + best := wsc.server.cfg.Chain.BestSnapshot() curHash := &best.Hash again := true if lastBlockHash == nil || *lastBlockHash == *curHash { diff --git a/server.go b/server.go index a408348f..888ccb55 100644 --- a/server.go +++ b/server.go @@ -1072,28 +1072,28 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// relayTransactions generates and relays inventory vectors for all of the +// passed transactions to all connected peers. +func (s *server) relayTransactions(txns []*mempool.TxDesc) { + for _, txD := range txns { + iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) + s.RelayInventory(iv, txD) + } +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // both websocket and getblocktemplate long poll clients of the passed // transactions. This function should be called whenever new transactions // are added to the mempool. -func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) { +func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) { // Generate and relay inventory vectors for all newly accepted - // transactions into the memory pool due to the original being - // accepted. - for _, txD := range newTxs { - // Generate the inventory vector and relay it. - iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) - s.RelayInventory(iv, txD) + // transactions. + s.relayTransactions(txns) - if s.rpcServer != nil { - // Notify websocket clients about mempool transactions. - s.rpcServer.ntfnMgr.NotifyMempoolTx(txD.Tx, true) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - s.rpcServer.gbtWorkState.NotifyMempoolTx( - s.txMemPool.LastUpdated()) - } + // Notify both websocket and getblocktemplate long poll clients of all + // newly accepted transactions. + if s.rpcServer != nil { + s.rpcServer.NotifyNewTransactions(txns) } } @@ -1892,88 +1892,6 @@ func (s *server) OutboundGroupCount(key string) int { return <-replyChan } -// AddedNodeInfo returns an array of btcjson.GetAddedNodeInfoResult structures -// describing the persistent (added) nodes. -func (s *server) AddedNodeInfo() []*serverPeer { - replyChan := make(chan []*serverPeer) - s.query <- getAddedNodesMsg{reply: replyChan} - return <-replyChan -} - -// Peers returns an array of all connected peers. -func (s *server) Peers() []*serverPeer { - replyChan := make(chan []*serverPeer) - - s.query <- getPeersMsg{reply: replyChan} - - return <-replyChan -} - -// DisconnectNodeByAddr disconnects a peer by target address. Both outbound and -// inbound nodes will be searched for the target node. An error message will -// be returned if the peer was not found. -func (s *server) DisconnectNodeByAddr(addr string) error { - replyChan := make(chan error) - - s.query <- disconnectNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, - reply: replyChan, - } - - return <-replyChan -} - -// DisconnectNodeByID disconnects a peer by target node id. Both outbound and -// inbound nodes will be searched for the target node. An error message will be -// returned if the peer was not found. -func (s *server) DisconnectNodeByID(id int32) error { - replyChan := make(chan error) - - s.query <- disconnectNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.ID() == id }, - reply: replyChan, - } - - return <-replyChan -} - -// RemoveNodeByAddr removes a peer from the list of persistent peers if -// present. An error will be returned if the peer was not found. -func (s *server) RemoveNodeByAddr(addr string) error { - replyChan := make(chan error) - - s.query <- removeNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, - reply: replyChan, - } - - return <-replyChan -} - -// RemoveNodeByID removes a peer by node ID from the list of persistent peers -// if present. An error will be returned if the peer was not found. -func (s *server) RemoveNodeByID(id int32) error { - replyChan := make(chan error) - - s.query <- removeNodeMsg{ - cmp: func(sp *serverPeer) bool { return sp.ID() == id }, - reply: replyChan, - } - - return <-replyChan -} - -// ConnectNode adds `addr' as a new outbound peer. If permanent is true then the -// peer will be persistent and reconnect if the connection is lost. -// It is an error to call this with an already existing peer. -func (s *server) ConnectNode(addr string, permanent bool) error { - replyChan := make(chan error) - - s.query <- connectNodeMsg{addr: addr, permanent: permanent, reply: replyChan} - - return <-replyChan -} - // AddBytesSent adds the passed number of bytes to the total bytes sent counter // for the server. It is safe for concurrent access. func (s *server) AddBytesSent(bytesSent uint64) { @@ -2625,8 +2543,21 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } if !cfg.DisableRPC { - s.rpcServer, err = newRPCServer(cfg.RPCListeners, - blockTemplateGenerator, &s) + s.rpcServer, err = newRPCServer(&rpcserverConfig{ + ListenAddrs: cfg.RPCListeners, + StartupTime: s.startupTime, + ConnMgr: &rpcConnManager{&s}, + SyncMgr: &rpcSyncMgr{&s, s.blockManager}, + TimeSource: s.timeSource, + Chain: s.blockManager.chain, + ChainParams: chainParams, + DB: db, + TxMemPool: s.txMemPool, + Generator: blockTemplateGenerator, + CPUMiner: s.cpuMiner, + TxIndex: s.txIndex, + AddrIndex: s.addrIndex, + }) if err != nil { return nil, err }