diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 195773eda..e35a19299 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -358,7 +358,7 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error { // With all the announcement messages gathered, send them all in a // single batch to the target peer. - return syncPeer.SendMessage(false, announceMessages...) + return syncPeer.SendMessageLazy(false, announceMessages...) } // PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the @@ -1111,7 +1111,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, encodingType: encoding, chunkSize: encodingTypeToChunkSize[encoding], sendToPeer: func(msgs ...lnwire.Message) error { - return syncPeer.SendMessage(false, msgs...) + return syncPeer.SendMessageLazy(false, msgs...) }, }) copy(syncer.peerPub[:], nodeID[:]) diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 85c6c4f30..9e9451937 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -35,6 +35,11 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { return nil } + +func (p *mockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + return p.SendMessage(sync, msgs...) +} + func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 22d0d7109..debd16eb7 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -170,6 +170,10 @@ func (n *testNode) SendMessage(_ bool, msg ...lnwire.Message) error { return n.sendMessage(msg[0]) } +func (n *testNode) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + return n.SendMessage(sync, msgs...) +} + func (n *testNode) WipeChannel(_ *wire.OutPoint) error { return nil } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6c5f10bae..8b466948c 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1549,6 +1549,9 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { } return nil } +func (m *mockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + return m.SendMessage(sync, msgs...) +} func (m *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error { return nil diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 5e8994151..cd6a5dcfa 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -502,6 +502,10 @@ func (s *mockServer) SendMessage(sync bool, msgs ...lnwire.Message) error { return nil } +func (s *mockServer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + panic("not implemented") +} + func (s *mockServer) readHandler(message lnwire.Message) error { var targetChan lnwire.ChannelID diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 34675aa5b..16cbff639 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -12,10 +12,17 @@ import ( // Peer is an interface which represents the remote lightning node inside our // system. type Peer interface { - // SendMessage sends a variadic number of message to remote peer. The - // first argument denotes if the method should block until the message - // has been sent to the remote peer. - SendMessage(sync bool, msg ...lnwire.Message) error + // SendMessage sends a variadic number of high-priority message to + // remote peer. The first argument denotes if the method should block + // until the messages have been sent to the remote peer or an error is + // returned, otherwise it returns immediately after queuing. + SendMessage(sync bool, msgs ...lnwire.Message) error + + // SendMessageLazy sends a variadic number of low-priority message to + // remote peer. The first argument denotes if the method should block + // until the messages have been sent to the remote peer or an error is + // returned, otherwise it returns immediately after queueing. + SendMessageLazy(sync bool, msgs ...lnwire.Message) error // AddNewChannel adds a new channel to the peer. The channel should fail // to be added if the cancel channel is closed. diff --git a/peer.go b/peer.go index b0a9aac8f..f1fe9490f 100644 --- a/peer.go +++ b/peer.go @@ -64,8 +64,9 @@ const ( // a buffered channel which will be sent upon once the write is complete. This // buffered channel acts as a semaphore to be used for synchronization purposes. type outgoingMsg struct { - msg lnwire.Message - errChan chan error // MUST be buffered. + priority bool + msg lnwire.Message + errChan chan error // MUST be buffered. } // newChannelMsg packages a channeldb.OpenChannel with a channel that allows @@ -1479,24 +1480,45 @@ out: func (p *peer) queueHandler() { defer p.wg.Done() - // pendingMsgs will hold all messages waiting to be added - // to the sendQueue. - pendingMsgs := list.New() + // priorityMsgs holds an in order list of messages deemed high-priority + // to be added to the sendQueue. This predominately includes messages + // from the funding manager and htlcswitch. + priorityMsgs := list.New() + + // lazyMsgs holds an in order list of messages deemed low-priority to be + // added to the sendQueue only after all high-priority messages have + // been queued. This predominately includes messages from the gossiper. + lazyMsgs := list.New() for { - // Examine the front of the queue. - elem := pendingMsgs.Front() + // Examine the front of the priority queue, if it is empty check + // the low priority queue. + elem := priorityMsgs.Front() + if elem == nil { + elem = lazyMsgs.Front() + } + if elem != nil { + front := elem.Value.(outgoingMsg) + // There's an element on the queue, try adding // it to the sendQueue. We also watch for // messages on the outgoingQueue, in case the // writeHandler cannot accept messages on the // sendQueue. select { - case p.sendQueue <- elem.Value.(outgoingMsg): - pendingMsgs.Remove(elem) + case p.sendQueue <- front: + if front.priority { + priorityMsgs.Remove(elem) + } else { + lazyMsgs.Remove(elem) + } case msg := <-p.outgoingQueue: - pendingMsgs.PushBack(msg) + if msg.priority { + priorityMsgs.PushBack(msg) + } else { + lazyMsgs.PushBack(msg) + } case <-p.quit: return } @@ -1506,7 +1528,11 @@ func (p *peer) queueHandler() { // into the queue from outside sub-systems. select { case msg := <-p.outgoingQueue: - pendingMsgs.PushBack(msg) + if msg.priority { + priorityMsgs.PushBack(msg) + } else { + lazyMsgs.PushBack(msg) + } case <-p.quit: return } @@ -1544,13 +1570,26 @@ func (p *peer) PingTime() int64 { return atomic.LoadInt64(&p.pingTime) } -// queueMsg queues a new lnwire.Message to be eventually sent out on the -// wire. It returns an error if we failed to queue the message. An error -// is sent on errChan if the message fails being sent to the peer, or -// nil otherwise. +// queueMsg adds the lnwire.Message to the back of the high priority send queue. +// If the errChan is non-nil, an error is sent back if the msg failed to queue +// or failed to write, and nil otherwise. func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) { + p.queue(true, msg, errChan) +} + +// queueMsgLazy adds the lnwire.Message to the back of the low priority send +// queue. If the errChan is non-nil, an error is sent back if the msg failed to +// queue or failed to write, and nil otherwise. +func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) { + p.queue(false, msg, errChan) +} + +// queue sends a given message to the queueHandler using the passed priority. If +// the errChan is non-nil, an error is sent back if the msg failed to queue or +// failed to write, and nil otherwise. +func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) { select { - case p.outgoingQueue <- outgoingMsg{msg, errChan}: + case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}: case <-p.quit: peerLog.Tracef("Peer shutting down, could not enqueue msg.") if errChan != nil { @@ -2312,16 +2351,38 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { return nil } -// SendMessage sends a variadic number of message to remote peer. The first -// argument denotes if the method should block until the message has been sent -// to the remote peer. +// SendMessage sends a variadic number of high-priority message to remote peer. +// The first argument denotes if the method should block until the messages have +// been sent to the remote peer or an error is returned, otherwise it returns +// immediately after queuing. // // NOTE: Part of the lnpeer.Peer interface. func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { + return p.sendMessage(sync, true, msgs...) +} + +// SendMessageLazy sends a variadic number of low-priority message to remote +// peer. The first argument denotes if the method should block until the +// messages have been sent to the remote peer or an error is returned, otherwise +// it returns immediately after queueing. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *peer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + return p.sendMessage(sync, false, msgs...) +} + +// sendMessage queues a variadic number of messages using the passed priority +// to the remote peer. If sync is true, this method will block until the +// messages have been sent to the remote peer or an error is returned, otherwise +// it returns immediately after queueing. +func (p *peer) sendMessage(sync, priority bool, msgs ...lnwire.Message) error { // Add all incoming messages to the outgoing queue. A list of error // chans is populated for each message if the caller requested a sync // send. var errChans []chan error + if sync { + errChans = make([]chan error, 0, len(msgs)) + } for _, msg := range msgs { // If a sync send was requested, create an error chan to listen // for an ack from the writeHandler. @@ -2331,7 +2392,11 @@ func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error { errChans = append(errChans, errChan) } - p.queueMsg(msg, errChan) + if priority { + p.queueMsg(msg, errChan) + } else { + p.queueMsgLazy(msg, errChan) + } } // Wait for all replies from the writeHandler. For async sends, this diff --git a/server.go b/server.go index b75ea503b..dbe2512f0 100644 --- a/server.go +++ b/server.go @@ -1884,6 +1884,8 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { // BroadcastMessage sends a request to the server to broadcast a set of // messages to all peers other than the one specified by the `skips` parameter. +// All messages sent via BroadcastMessage will be queued for lazy delivery to +// the target peers. // // NOTE: This function is safe for concurrent access. func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, @@ -1916,7 +1918,12 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, // Dispatch a go routine to enqueue all messages to this peer. wg.Add(1) s.wg.Add(1) - go s.sendPeerMessages(sPeer, msgs, &wg) + go func(p lnpeer.Peer) { + defer s.wg.Done() + defer wg.Done() + + p.SendMessageLazy(false, msgs...) + }(sPeer) } // Wait for all messages to have been dispatched before returning to @@ -1926,53 +1933,6 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, return nil } -// SendToPeer send a message to the server telling it to send the specific set -// of message to a particular peer. If the peer connect be found, then this -// method will return a non-nil error. -// -// NOTE: This function is safe for concurrent access. -func (s *server) SendToPeer(target *btcec.PublicKey, - msgs ...lnwire.Message) error { - - // Compute the target peer's identifier. - targetPubBytes := target.SerializeCompressed() - - srvrLog.Tracef("Attempting to send msgs %v to: %x", - len(msgs), targetPubBytes) - - // Lookup intended target in peersByPub, returning an error to the - // caller if the peer is unknown. Access to peersByPub is synchronized - // here to ensure we consider the exact set of peers present at the - // time of invocation. - s.mu.RLock() - targetPeer, err := s.findPeerByPubStr(string(targetPubBytes)) - s.mu.RUnlock() - if err == ErrPeerNotConnected { - srvrLog.Errorf("unable to send message to %x, "+ - "peer is not connected", targetPubBytes) - return err - } - - // Send messages to the peer and get the error channels that will be - // signaled by the peer's write handler. - errChans := s.sendPeerMessages(targetPeer, msgs, nil) - - // With the server's shared lock released, we now handle all of the - // errors being returned from the target peer's write handler. - for _, errChan := range errChans { - select { - case err := <-errChan: - return err - case <-targetPeer.quit: - return ErrPeerExiting - case <-s.quit: - return ErrServerShuttingDown - } - } - - return nil -} - // NotifyWhenOnline can be called by other subsystems to get notified when a // particular peer comes online. The peer itself is sent across the peerChan. // @@ -2036,56 +1996,6 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} { return c } -// sendPeerMessages enqueues a list of messages into the outgoingQueue of the -// `targetPeer`. This method supports additional broadcast-level -// synchronization by using the additional `wg` to coordinate a particular -// broadcast. Since this method will wait for the return error from sending -// each message, it should be run as a goroutine (see comment below) and -// the error ignored if used for broadcasting messages, where the result -// from sending the messages is not of importance. -// -// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a -// go routine--both `wg` and the server's WaitGroup should be incremented -// beforehand. If this method is not spawned as a go routine, the provided -// `wg` should be nil, and the server's WaitGroup should not be tracking this -// invocation. -func (s *server) sendPeerMessages( - targetPeer *peer, - msgs []lnwire.Message, - wg *sync.WaitGroup) []chan error { - - // If a WaitGroup is provided, we assume that this method was spawned - // as a go routine, and that it is being tracked by both the server's - // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this - // event, we defer a call to Done on both WaitGroups to 1) ensure that - // server will be able to shutdown after its go routines exit, and 2) - // so the server can return to the caller of BroadcastMessage. - isBroadcast := wg != nil - if isBroadcast { - defer s.wg.Done() - defer wg.Done() - } - - // We queue each message, creating a slice of error channels that - // can be inspected after every message is successfully added to - // the queue. - var errChans []chan error - for _, msg := range msgs { - // If this is not broadcast, create error channels to provide - // synchronous feedback regarding the delivery of the message to - // a specific peer. - var errChan chan error - if !isBroadcast { - errChan = make(chan error, 1) - errChans = append(errChans, errChan) - } - - targetPeer.queueMsg(msg, errChan) - } - - return errChans -} - // FindPeer will return the peer that corresponds to the passed in public key. // This function is used by the funding manager, allowing it to update the // daemon's local representation of the remote peer.