chainntnfs: Use the new ConcurrentQueue in btcd notifier.

This commit is contained in:
Jim Posen 2017-11-09 14:59:14 -08:00 committed by Olaoluwa Osuntokun
parent c9ad9b2269
commit 0297042a8d

View file

@ -72,13 +72,8 @@ type BtcdNotifier struct {
disconnectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn
chainUpdates []*chainUpdate chainUpdates *chainntnfs.ConcurrentQueue
chainUpdateSignal chan struct{} txUpdates *chainntnfs.ConcurrentQueue
chainUpdateMtx sync.Mutex
txUpdates []*txUpdate
txUpdateSignal chan struct{}
txUpdateMtx sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -104,8 +99,8 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
disconnectedBlockHashes: make(chan *blockNtfn, 20), disconnectedBlockHashes: make(chan *blockNtfn, 20),
chainUpdateSignal: make(chan struct{}), chainUpdates: chainntnfs.NewConcurrentQueue(10),
txUpdateSignal: make(chan struct{}), txUpdates: chainntnfs.NewConcurrentQueue(10),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -151,6 +146,9 @@ func (b *BtcdNotifier) Start() error {
return err return err
} }
b.chainUpdates.Start()
b.txUpdates.Start()
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher(currentHeight) go b.notificationDispatcher(currentHeight)
@ -171,6 +169,9 @@ func (b *BtcdNotifier) Stop() error {
close(b.quit) close(b.quit)
b.wg.Wait() b.wg.Wait()
b.chainUpdates.Stop()
b.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related // Notify all pending clients of our shutdown by closing the related
// notification channels. // notification channels.
for _, spendClients := range b.spendNotifications { for _, spendClients := range b.spendNotifications {
@ -204,16 +205,7 @@ type blockNtfn struct {
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
b.chainUpdateMtx.Lock() b.chainUpdates.ChanIn() <- &chainUpdate{hash, height}
b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height})
b.chainUpdateMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// block update is available. We do this in a new goroutine in order to
// avoid blocking the main loop of the rpc client.
go func() {
b.chainUpdateSignal <- struct{}{}
}()
} }
// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient.
@ -224,16 +216,7 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
// Append this new transaction update to the end of the queue of new // Append this new transaction update to the end of the queue of new
// chain updates. // chain updates.
b.txUpdateMtx.Lock() b.txUpdates.ChanIn() <- &txUpdate{tx, details}
b.txUpdates = append(b.txUpdates, &txUpdate{tx, details})
b.txUpdateMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rpc client.
go func() {
b.txUpdateSignal <- struct{}{}
}()
} }
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
@ -314,15 +297,8 @@ out:
chainntnfs.Log.Warnf("Block disconnected from main "+ chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlockHash) "chain: %v", staleBlockHash)
case <-b.chainUpdateSignal: case item := <-b.chainUpdates.ChanOut():
// A new update is available, so pop the new chain update := item.(*chainUpdate)
// update from the front of the update queue.
b.chainUpdateMtx.Lock()
update := b.chainUpdates[0]
b.chainUpdates[0] = nil // Set to nil to prevent GC leak.
b.chainUpdates = b.chainUpdates[1:]
b.chainUpdateMtx.Unlock()
currentHeight = update.blockHeight currentHeight = update.blockHeight
newBlock, err := b.chainConn.GetBlock(update.blockHash) newBlock, err := b.chainConn.GetBlock(update.blockHash)
@ -355,15 +331,8 @@ out:
// which may have been triggered by this new block. // which may have been triggered by this new block.
b.notifyConfs(newHeight) b.notifyConfs(newHeight)
case <-b.txUpdateSignal: case item := <-b.txUpdates.ChanOut():
// A new update is available, so pop the new chain newSpend := item.(*txUpdate)
// update from the front of the update queue.
b.txUpdateMtx.Lock()
newSpend := b.txUpdates[0]
b.txUpdates[0] = nil // Set to nil to prevent GC leak.
b.txUpdates = b.txUpdates[1:]
b.txUpdateMtx.Unlock()
spendingTx := newSpend.tx spendingTx := newSpend.tx
// First, check if this transaction spends an output // First, check if this transaction spends an output