diff --git a/peer.go b/peer.go index 5007bb2eb..728ea02f2 100644 --- a/peer.go +++ b/peer.go @@ -111,16 +111,11 @@ type peer struct { // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoinMsg - // sendQueueSync is used as a semaphore to synchronize writes between - // the writeHandler and the queueHandler. - sendQueueSync chan struct{} - // activeChannels is a map which stores the state machines of all // active channels. Channels are indexed into the map by the txid of // the funding transaction which opened the channel. - activeChanMtx sync.RWMutex - activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel - chanSnapshotReqs chan *chanSnapshotReq + activeChanMtx sync.RWMutex + activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. @@ -172,13 +167,11 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, server: server, - sendQueueSync: make(chan struct{}, 1), - sendQueue: make(chan outgoinMsg, 1), - outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), + sendQueue: make(chan outgoinMsg), + outgoingQueue: make(chan outgoinMsg), - activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), - chanSnapshotReqs: make(chan *chanSnapshotReq), - newChannels: make(chan *newChannelMsg, 1), + activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), + newChannels: make(chan *newChannelMsg, 1), localCloseChanReqs: make(chan *htlcswitch.ChanClose), shutdownChanReqs: make(chan *lnwire.Shutdown), @@ -215,7 +208,10 @@ func (p *peer) Start() error { // message MUST be sent before any other message. readErr := make(chan error, 1) msgChan := make(chan lnwire.Message, 1) + p.wg.Add(1) go func() { + defer p.wg.Done() + msg, err := p.readNextMessage() if err != nil { readErr <- err @@ -360,6 +356,8 @@ func (p *peer) Disconnect(reason error) { p.conn.Close() close(p.quit) + + p.wg.Wait() } // String returns the string representation of this peer. @@ -419,6 +417,7 @@ type chanMsgStream struct { mtx sync.Mutex + wg sync.WaitGroup quit chan struct{} } @@ -441,6 +440,7 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, // Start starts the chanMsgStream. func (c *chanMsgStream) Start() { + c.wg.Add(1) go c.msgConsumer() } @@ -452,11 +452,15 @@ func (c *chanMsgStream) Stop() { // Wake up the msgConsumer is we've been signalled to exit. c.msgCond.Signal() + + c.wg.Wait() } // msgConsumer is the main goroutine that streams messages from the peer's // readHandler directly to the target channel. func (c *chanMsgStream) msgConsumer() { + defer c.wg.Done() + peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) for { @@ -529,6 +533,8 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { + defer p.wg.Done() + chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { @@ -651,7 +657,6 @@ out: delete(chanMsgStreams, cid) } - p.wg.Done() peerLog.Tracef("readHandler for peer %v done", p) } @@ -820,6 +825,8 @@ func (p *peer) queueHandler() { // // NOTE: This method MUST be run as a goroutine. func (p *peer) pingHandler() { + defer p.wg.Done() + pingTicker := time.NewTicker(pingInterval) defer pingTicker.Stop() @@ -835,8 +842,6 @@ out: break out } } - - p.wg.Done() } // PingTime returns the estimated ping time to the peer in microseconds. @@ -857,9 +862,16 @@ func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { // ChannelSnapshots returns a slice of channel snapshots detailing all // currently active channels maintained with the remote peer. func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { - resp := make(chan []*channeldb.ChannelSnapshot, 1) - p.chanSnapshotReqs <- &chanSnapshotReq{resp} - return <-resp + p.activeChanMtx.RLock() + defer p.activeChanMtx.RUnlock() + + snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels)) + for _, activeChan := range p.activeChannels { + snapshot := activeChan.StateSnapshot() + snapshots = append(snapshots, snapshot) + } + + return snapshots } // closingScripts are the set of clsoign deslivery scripts for each party. This @@ -877,6 +889,8 @@ type closingScripts struct { // // NOTE: This method MUST be run as a goroutine. func (p *peer) channelManager() { + defer p.wg.Done() + // chanShutdowns is a map of channels for which our node has initiated // a cooperative channel close. When an lnwire.Shutdown is received, // this allows the node to determine the next step to be taken in the @@ -907,17 +921,6 @@ func (p *peer) channelManager() { out: for { select { - case req := <-p.chanSnapshotReqs: - p.activeChanMtx.RLock() - snapshots := make([]*channeldb.ChannelSnapshot, 0, - len(p.activeChannels)) - for _, activeChan := range p.activeChannels { - snapshot := activeChan.StateSnapshot() - snapshots = append(snapshots, snapshot) - } - p.activeChanMtx.RUnlock() - req.resp <- snapshots - // A new channel has arrived which means we've just completed a // funding workflow. We'll initialize the necessary local // state, and notify the htlc switch of a new link. @@ -1117,8 +1120,6 @@ out: break out } } - - p.wg.Done() } // handleLocalClose kicks-off the workflow to execute a cooperative or forced @@ -1356,7 +1357,6 @@ func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose, notifier := p.server.cc.chainNotifier go waitForChanToClose(uint32(bestHeight), notifier, req.Err, req.ChanPoint, &closingTxid, func() { - // First, we'll mark the database as being fully closed // so we'll no longer watch for its ultimate closure // upon startup. @@ -1463,7 +1463,8 @@ func (p *peer) handleResponseClosingSigned(msg *lnwire.ClosingSigned, // upon startup. err := p.server.chanDB.MarkChanFullyClosed(chanPoint) if err != nil { - peerLog.Errorf("unable to mark channel as closed: %v", err) + peerLog.Errorf("unable to mark channel "+ + "as closed: %v", err) return } },