diff --git a/server.go b/server.go index 5bf1a22f3..dbe2512f0 100644 --- a/server.go +++ b/server.go @@ -1884,6 +1884,8 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { // BroadcastMessage sends a request to the server to broadcast a set of // messages to all peers other than the one specified by the `skips` parameter. +// All messages sent via BroadcastMessage will be queued for lazy delivery to +// the target peers. // // NOTE: This function is safe for concurrent access. func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, @@ -1916,7 +1918,12 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, // Dispatch a go routine to enqueue all messages to this peer. wg.Add(1) s.wg.Add(1) - go s.sendPeerMessages(sPeer, msgs, &wg) + go func(p lnpeer.Peer) { + defer s.wg.Done() + defer wg.Done() + + p.SendMessageLazy(false, msgs...) + }(sPeer) } // Wait for all messages to have been dispatched before returning to @@ -1989,56 +1996,6 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} { return c } -// sendPeerMessages enqueues a list of messages into the outgoingQueue of the -// `targetPeer`. This method supports additional broadcast-level -// synchronization by using the additional `wg` to coordinate a particular -// broadcast. Since this method will wait for the return error from sending -// each message, it should be run as a goroutine (see comment below) and -// the error ignored if used for broadcasting messages, where the result -// from sending the messages is not of importance. -// -// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a -// go routine--both `wg` and the server's WaitGroup should be incremented -// beforehand. If this method is not spawned as a go routine, the provided -// `wg` should be nil, and the server's WaitGroup should not be tracking this -// invocation. -func (s *server) sendPeerMessages( - targetPeer *peer, - msgs []lnwire.Message, - wg *sync.WaitGroup) []chan error { - - // If a WaitGroup is provided, we assume that this method was spawned - // as a go routine, and that it is being tracked by both the server's - // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this - // event, we defer a call to Done on both WaitGroups to 1) ensure that - // server will be able to shutdown after its go routines exit, and 2) - // so the server can return to the caller of BroadcastMessage. - isBroadcast := wg != nil - if isBroadcast { - defer s.wg.Done() - defer wg.Done() - } - - // We queue each message, creating a slice of error channels that - // can be inspected after every message is successfully added to - // the queue. - var errChans []chan error - for _, msg := range msgs { - // If this is not broadcast, create error channels to provide - // synchronous feedback regarding the delivery of the message to - // a specific peer. - var errChan chan error - if !isBroadcast { - errChan = make(chan error, 1) - errChans = append(errChans, errChan) - } - - targetPeer.queueMsgLazy(msg, errChan) - } - - return errChans -} - // FindPeer will return the peer that corresponds to the passed in public key. // This function is used by the funding manager, allowing it to update the // daemon's local representation of the remote peer.