chainntnfs/neutrinonotify: make chainUpdates a buffered chan

ConcurrentQueue shuttles items through internal structures between its
input and output channels, so draining the queue is not reliable: an
update may already exist in the ConcurrentQueue yet not be available
via ChanOut() at the moment we're ready to drain it. Fix this by using
a regular buffered chan, which either holds the update or does not. Its
size is set to 100 as our tests may generate quite a few updates.
eugene 2021-07-16 15:35:43 -04:00
parent 86f28cdc1d
commit b7de0eae93
2 changed files with 12 additions and 14 deletions
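
To illustrate the race the commit message describes: queue.ConcurrentQueue (lnd's queue package) moves items from ChanIn() to ChanOut() via an internal goroutine, so a non-blocking drain right after a send can come up empty even though the update is already in the queue. A buffered channel has no such intermediate stage. A minimal sketch, not part of this commit:

    package main

    import (
    	"fmt"

    	"github.com/lightningnetwork/lnd/queue"
    )

    func main() {
    	// With a ConcurrentQueue, an internal goroutine shuttles items
    	// from ChanIn() to ChanOut(); immediately after a send the item
    	// may still be in flight, so this non-blocking receive can miss
    	// it.
    	q := queue.NewConcurrentQueue(10)
    	q.Start()
    	defer q.Stop()

    	q.ChanIn() <- "update"
    	select {
    	case item := <-q.ChanOut():
    		fmt.Println("drained:", item)
    	default:
    		fmt.Println("update is in the queue but not yet on ChanOut()")
    	}

    	// A buffered channel has no intermediate stage: once the send
    	// completes, the update is immediately visible to any receiver.
    	ch := make(chan string, 100)
    	ch <- "update"
    	select {
    	case item := <-ch:
    		fmt.Println("drained:", item)
    	default:
    		// Unreachable: the channel either holds the update or the
    		// send above would not have completed.
    	}
    }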

--- a/chainntnfs/neutrinonotify/neutrino.go
+++ b/chainntnfs/neutrinonotify/neutrino.go

@@ -62,8 +62,9 @@ type NeutrinoNotifier struct {
 	rescanErr <-chan error

-	chainUpdates *queue.ConcurrentQueue
-	txUpdates    *queue.ConcurrentQueue
+	chainUpdates chan *filteredBlock
+
+	txUpdates *queue.ConcurrentQueue

 	// spendHintCache is a cache used to query and update the latest height
 	// hints for an outpoint. Each height hint represents the earliest
@@ -105,8 +106,9 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
 		rescanErr: make(chan error),

-		chainUpdates: queue.NewConcurrentQueue(10),
-		txUpdates:    queue.NewConcurrentQueue(10),
+		chainUpdates: make(chan *filteredBlock, 100),
+
+		txUpdates: queue.NewConcurrentQueue(10),

 		spendHintCache:   spendHintCache,
 		confirmHintCache: confirmHintCache,
@@ -137,7 +139,6 @@ func (n *NeutrinoNotifier) Stop() error {
 	close(n.quit)
 	n.wg.Wait()

-	n.chainUpdates.Stop()
 	n.txUpdates.Stop()

 	// Notify all pending clients of our shutdown by closing the related
@@ -162,7 +163,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
 	// Start our concurrent queues before starting the rescan, to ensure
 	// onFilteredBlockConnected and onRelavantTx callbacks won't be
 	// blocked.
-	n.chainUpdates.Start()
 	n.txUpdates.Start()

 	// First, we'll obtain the latest block height of the p2p node. We'll
@@ -172,7 +172,6 @@ func (n *NeutrinoNotifier) startNotifier() error {
 	startingPoint, err := n.p2pNode.BestBlock()
 	if err != nil {
 		n.txUpdates.Stop()
-		n.chainUpdates.Stop()
 		return err
 	}
 	n.bestBlock.Hash = &startingPoint.Hash
@@ -251,7 +250,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
 	// Append this new chain update to the end of the queue of new chain
 	// updates.
 	select {
-	case n.chainUpdates.ChanIn() <- &filteredBlock{
+	case n.chainUpdates <- &filteredBlock{
 		hash:   header.BlockHash(),
 		height: uint32(height),
 		txns:   txns,
@@ -269,7 +268,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
 	// Append this new chain update to the end of the queue of new chain
 	// disconnects.
 	select {
-	case n.chainUpdates.ChanIn() <- &filteredBlock{
+	case n.chainUpdates <- &filteredBlock{
 		hash:    header.BlockHash(),
 		height:  uint32(height),
 		connect: false,
@@ -413,8 +412,8 @@ out:
 				msg.errChan <- err
 			}

-		case item := <-n.chainUpdates.ChanOut():
-			update := item.(*filteredBlock)
+		case item := <-n.chainUpdates:
+			update := item
 			if update.connect {
 				n.bestBlockMtx.Lock()
 				// Since neutrino has no way of knowing what
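
The typed channel also explains the dropped type assertion above: ChanOut() on a ConcurrentQueue is a <-chan interface{}, so consumers had to assert each item back to *filteredBlock, while a chan *filteredBlock is statically typed. A standalone sketch, where filteredBlock stands in for the notifier's unexported struct:

    package main

    import (
    	"fmt"

    	"github.com/lightningnetwork/lnd/queue"
    )

    // filteredBlock stands in for the notifier's unexported type.
    type filteredBlock struct {
    	height uint32
    }

    func main() {
    	// ConcurrentQueue's ChanOut() yields interface{} values: every
    	// consumer must type-assert the item back to its concrete type.
    	q := queue.NewConcurrentQueue(10)
    	q.Start()
    	defer q.Stop()

    	q.ChanIn() <- &filteredBlock{height: 100}
    	update := (<-q.ChanOut()).(*filteredBlock) // assertion required
    	fmt.Println(update.height)

    	// A typed buffered channel needs no assertion at the receive site.
    	ch := make(chan *filteredBlock, 100)
    	ch <- &filteredBlock{height: 101}
    	fmt.Println((<-ch).height)
    }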

--- a/chainntnfs/neutrinonotify/neutrino_dev.go
+++ b/chainntnfs/neutrinonotify/neutrino_dev.go

@@ -63,7 +63,6 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
 	)
 	n.rescanErr = n.chainView.Start()

-	n.chainUpdates.Start()
 	n.txUpdates.Start()

 	if generateBlocks != nil {
@@ -80,8 +79,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
 loop:
 	for {
 		select {
-		case ntfn := <-n.chainUpdates.ChanOut():
-			lastReceivedNtfn := ntfn.(*filteredBlock)
+		case ntfn := <-n.chainUpdates:
+			lastReceivedNtfn := ntfn
 			if lastReceivedNtfn.height >= uint32(syncHeight) {
 				break loop
 			}
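
The loop above is exactly the kind of drain the commit message is about: with a plain buffered channel, an empty non-blocking receive genuinely means the channel is empty. A standalone sketch of such a drain (drainUpdates and syncHeight are illustrative names, not part of the diff):

    package main

    import "fmt"

    type filteredBlock struct {
    	height uint32
    }

    // drainUpdates consumes queued blocks until one at or above
    // syncHeight is seen, or the channel is empty. With a buffered
    // channel the "empty" branch is trustworthy: there is no internal
    // stage where a pending update could be hiding.
    func drainUpdates(chainUpdates chan *filteredBlock, syncHeight uint32) {
    	for {
    		select {
    		case ntfn := <-chainUpdates:
    			if ntfn.height >= syncHeight {
    				fmt.Println("synced at height", ntfn.height)
    				return
    			}
    		default:
    			fmt.Println("channel drained")
    			return
    		}
    	}
    }

    func main() {
    	chainUpdates := make(chan *filteredBlock, 100)
    	chainUpdates <- &filteredBlock{height: 99}
    	chainUpdates <- &filteredBlock{height: 100}
    	drainUpdates(chainUpdates, 100)
    }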