Create blockManagerConfig struct passed to newBlockManager.

The config struct accepts an instance of server as an implementation
of the new PeerNotifier wrapping interface.
This commit is contained in:
Jim Posen 2017-08-14 12:39:07 -07:00
parent 49949d4c96
commit b71d6c3010
3 changed files with 46 additions and 30 deletions

View File

@ -133,10 +133,30 @@ type headerNode struct {
hash *chainhash.Hash hash *chainhash.Hash
} }
// PeerNotifier exposes methods to notify peers of status changes to
// transactions, blocks, etc. Currently server implements this interface.
type PeerNotifier interface {
AnnounceNewTransactions(newTxs []*mempool.TxDesc)
UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer)
RelayInventory(invVect *wire.InvVect, data interface{})
TransactionConfirmed(tx *btcutil.Tx)
}
// blockManangerConfig is a configuration struct used to initialize a new
// blockManager.
type blockManagerConfig struct {
PeerNotifier PeerNotifier
Chain *blockchain.BlockChain
TxMemPool *mempool.TxPool
}
// blockManager provides a concurrency safe block manager for handling all // blockManager provides a concurrency safe block manager for handling all
// incoming blocks. // incoming blocks.
type blockManager struct { type blockManager struct {
server *server peerNotifier PeerNotifier
started int32 started int32
shutdown int32 shutdown int32
chain *blockchain.BlockChain chain *blockchain.BlockChain
@ -467,7 +487,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
return return
} }
b.server.AnnounceNewTransactions(acceptedTxs) b.peerNotifier.AnnounceNewTransactions(acceptedTxs)
} }
// current returns true if we believe we are synced with our peers, false if we // current returns true if we believe we are synced with our peers, false if we
@ -631,7 +651,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
if blkHashUpdate != nil && heightUpdate != 0 { if blkHashUpdate != nil && heightUpdate != 0 {
bmsg.peer.UpdateLastBlockHeight(heightUpdate) bmsg.peer.UpdateLastBlockHeight(heightUpdate)
if isOrphan || b.current() { if isOrphan || b.current() {
go b.server.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer)
} }
} }
@ -1182,7 +1202,7 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not
// Generate the inventory vector and relay it. // Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
b.server.RelayInventory(iv, block.MsgBlock().Header) b.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
// A block has been connected to the main block chain. // A block has been connected to the main block chain.
case blockchain.NTBlockConnected: case blockchain.NTBlockConnected:
@ -1203,9 +1223,9 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not
b.txMemPool.RemoveTransaction(tx, false) b.txMemPool.RemoveTransaction(tx, false)
b.txMemPool.RemoveDoubleSpends(tx) b.txMemPool.RemoveDoubleSpends(tx)
b.txMemPool.RemoveOrphan(tx) b.txMemPool.RemoveOrphan(tx)
b.server.TransactionConfirmed(tx) b.peerNotifier.TransactionConfirmed(tx)
acceptedTxs := b.txMemPool.ProcessOrphans(tx) acceptedTxs := b.txMemPool.ProcessOrphans(tx)
b.server.AnnounceNewTransactions(acceptedTxs) b.peerNotifier.AnnounceNewTransactions(acceptedTxs)
} }
// A block has been disconnected from the main block chain. // A block has been disconnected from the main block chain.
@ -1360,14 +1380,11 @@ func (b *blockManager) Pause() chan<- struct{} {
// newBlockManager returns a new bitcoin block manager. // newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates. // Use Start to begin processing asynchronous block and inv updates.
func newBlockManager( func newBlockManager(config *blockManagerConfig) (*blockManager, error) {
s *server, indexManager blockchain.IndexManager,
chain *blockchain.BlockChain, txMemPool *mempool.TxPool,
) (*blockManager, error) {
bm := blockManager{ bm := blockManager{
server: s, peerNotifier: config.PeerNotifier,
chain: chain, chain: config.Chain,
txMemPool: txMemPool, txMemPool: config.TxMemPool,
rejectedTxns: make(map[chainhash.Hash]struct{}), rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}),
@ -1377,9 +1394,6 @@ func newBlockManager(
quit: make(chan struct{}), quit: make(chan struct{}),
} }
// Register blockchain notification callbacks
bm.chain.Subscribe(bm.handleNotifyMsg)
best := bm.chain.BestSnapshot() best := bm.chain.BestSnapshot()
if !cfg.DisableCheckpoints { if !cfg.DisableCheckpoints {
// Initialize the next checkpoint based on the current height. // Initialize the next checkpoint based on the current height.
@ -1391,6 +1405,8 @@ func newBlockManager(
bmgrLog.Info("Checkpoints are disabled") bmgrLog.Info("Checkpoints are disabled")
} }
bm.chain.Subscribe(bm.handleBlockchainNotification)
return &bm, nil return &bm, nil
} }

View File

@ -4095,9 +4095,13 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s
rpc.listeners = listeners rpc.listeners = listeners
s.chain.Subscribe(rpc.handleBlockchainNotification)
return &rpc, nil return &rpc, nil
} }
// Callback for notifications from blockchain. It notifies clients that are
// long polling for changes or subscribed to websockets notifications.
func (s *rpcServer) handleBlockchainNotification(notification *blockchain.Notification) { func (s *rpcServer) handleBlockchainNotification(notification *blockchain.Notification) {
switch notification.Type { switch notification.Type {
case blockchain.NTBlockAccepted: case blockchain.NTBlockAccepted:

View File

@ -2475,14 +2475,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
ChainParams: s.chainParams, ChainParams: s.chainParams,
Checkpoints: checkpoints, Checkpoints: checkpoints,
TimeSource: s.timeSource, TimeSource: s.timeSource,
// TODO: Modify blockchain to be able to register multiple listeners and
// have the block manager and RPC server subscribe directly.
Notifications: func(notification *blockchain.Notification) {
s.blockManager.handleBlockchainNotification(notification)
if s.rpcServer != nil {
s.rpcServer.handleBlockchainNotification(notification)
}
},
SigCache: s.sigCache, SigCache: s.sigCache,
IndexManager: indexManager, IndexManager: indexManager,
HashCache: s.hashCache, HashCache: s.hashCache,
@ -2509,14 +2501,18 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) { CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) {
return s.chain.CalcSequenceLock(tx, view, true) return s.chain.CalcSequenceLock(tx, view, true)
}, },
IsDeploymentActive: bm.chain.IsDeploymentActive, IsDeploymentActive: s.chain.IsDeploymentActive,
SigCache: s.sigCache, SigCache: s.sigCache,
HashCache: s.hashCache, HashCache: s.hashCache,
AddrIndex: s.addrIndex, AddrIndex: s.addrIndex,
} }
s.txMemPool = mempool.New(&txC) s.txMemPool = mempool.New(&txC)
s.blockManager, err = newBlockManager(&s, indexManager, s.chain, s.txMemPool) s.blockManager, err = newBlockManager(&blockManagerConfig{
PeerNotifier: &s,
Chain: s.chain,
TxMemPool: s.txMemPool,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }