diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 2320c0e6e..ef96eaa8a 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -62,8 +62,9 @@ type NeutrinoNotifier struct { rescanErr <-chan error - chainUpdates *queue.ConcurrentQueue - txUpdates *queue.ConcurrentQueue + chainUpdates chan *filteredBlock + + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), - chainUpdates: queue.NewConcurrentQueue(10), - txUpdates: queue.NewConcurrentQueue(10), + chainUpdates: make(chan *filteredBlock, 100), + + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.chainUpdates.Stop() n.txUpdates.Stop() // Notify all pending clients of our shutdown by closing the related @@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. - n.chainUpdates.Start() n.txUpdates.Start() // First, we'll obtain the latest block height of the p2p node. We'll @@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error { startingPoint, err := n.p2pNode.BestBlock() if err != nil { n.txUpdates.Stop() - n.chainUpdates.Stop() return err } n.bestBlock.Hash = &startingPoint.Hash @@ -251,7 +250,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, @@ -269,7 +268,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. select { - case n.chainUpdates.ChanIn() <- &filteredBlock{ + case n.chainUpdates <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), connect: false, @@ -413,8 +412,8 @@ out: msg.errChan <- err } - case item := <-n.chainUpdates.ChanOut(): - update := item.(*filteredBlock) + case item := <-n.chainUpdates: + update := item if update.connect { n.bestBlockMtx.Lock() // Since neutrino has no way of knowing what diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index 728514aa8..d19783cce 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, ) n.rescanErr = n.chainView.Start() - n.chainUpdates.Start() n.txUpdates.Start() if generateBlocks != nil { @@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, loop: for { select { - case ntfn := <-n.chainUpdates.ChanOut(): - lastReceivedNtfn := ntfn.(*filteredBlock) + case ntfn := <-n.chainUpdates: + lastReceivedNtfn := ntfn if lastReceivedNtfn.height >= uint32(syncHeight) { break loop }