diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go index be95073f5..420d1f9b7 100644 --- a/htlcswitch/queue.go +++ b/htlcswitch/queue.go @@ -3,6 +3,7 @@ package htlcswitch import ( "sync" "sync/atomic" + "time" "github.com/lightningnetwork/lnd/lnwire" ) @@ -20,7 +21,7 @@ type packetQueue struct { // totalHtlcAmt is the sum of the value of all pending HTLC's currently // residing within the overflow queue. This value should only read or // modified *atomically*. - totalHtlcAmt int64 + totalHtlcAmt int64 // To be used atomically. // queueLen is an internal counter that reflects the size of the queue // at any given instance. This value is intended to be use atomically @@ -28,7 +29,9 @@ type packetQueue struct { // the queue w/o grabbing the main lock. This allows callers to avoid a // deadlock situation where the main goroutine is attempting a send // with the lock held. - queueLen int32 + queueLen int32 // To be used atomically. + + streamShutdown int32 // To be used atomically. queue []*htlcPacket @@ -75,7 +78,12 @@ func (p *packetQueue) Start() { func (p *packetQueue) Stop() { close(p.quit) - p.queueCond.Signal() + // Now that we've closed the channel, we'll repeatedly signal the msg + // consumer until we've detected that it has exited. + for atomic.LoadInt32(&p.streamShutdown) == 0 { + p.queueCond.Signal() + time.Sleep(time.Millisecond * 100) + } } // packetCoordinator is a goroutine that handles the packet overflow queue. @@ -92,7 +100,7 @@ func (p *packetQueue) Stop() { // like reg congestion avoidance: // * random dropping, RED, etc func (p *packetQueue) packetCoordinator() { - defer p.wg.Done() + defer atomic.StoreInt32(&p.streamShutdown, 1) for { // First, we'll check our condition. If the queue of packets is