Merge pull request #2819 from cfromknecht/peer-write-retry

peer: catch write timeouts, retry with backoff

Commit b935f69a6e: 3 changed files with 117 additions and 34 deletions
@@ -5,11 +5,11 @@ import "fmt"
 const (
 	// DefaultReadWorkers is the default maximum number of concurrent
 	// workers used by the daemon's read pool.
-	DefaultReadWorkers = 16
+	DefaultReadWorkers = 100

 	// DefaultWriteWorkers is the default maximum number of concurrent
 	// workers used by the daemon's write pool.
-	DefaultWriteWorkers = 16
+	DefaultWriteWorkers = 100

 	// DefaultSigWorkers is the default maximum number of concurrent workers
 	// used by the daemon's sig pool.
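The defaults above bound how many read, write, and sig workers may run concurrently. As a rough illustration of what such a cap means in practice (lnd's actual pool package is more elaborate than this, and every name below is hypothetical), a buffered channel can serve as a counting semaphore limiting in-flight tasks:

package main

import (
	"fmt"
	"sync"
)

const defaultWorkers = 100 // matches the new default above

// runBounded executes tasks with at most maxWorkers running concurrently.
func runBounded(tasks []func(), maxWorkers int) {
	sem := make(chan struct{}, maxWorkers) // counting semaphore
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxWorkers tasks are in flight
		go func(t func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			t()
		}(task)
	}
	wg.Wait()
}

func main() {
	tasks := make([]func(), 250)
	for i := range tasks {
		i := i
		tasks[i] = func() { _ = i * i }
	}
	runBounded(tasks, defaultWorkers)
	fmt.Println("all tasks done with at most", defaultWorkers, "in flight")
}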
peer.go (94 lines changed)

@@ -45,7 +45,7 @@ const (
 	idleTimeout = 5 * time.Minute

 	// writeMessageTimeout is the timeout used when writing a message to peer.
-	writeMessageTimeout = 50 * time.Second
+	writeMessageTimeout = 5 * time.Second

 	// readMessageTimeout is the timeout used when reading a message from a
 	// peer.
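Lowering writeMessageTimeout from 50s to 5s makes individual writes fail fast so the new retry logic in writeHandler (further down) can take over. A write timeout of this kind is typically produced by setting a deadline on the underlying net.Conn before each write; the sketch below shows only that general pattern and is not lnd's actual transport code (brontide wraps the connection), with the address a placeholder:

package main

import (
	"fmt"
	"net"
	"time"
)

// writeWithTimeout arms a write deadline, then writes. A peer too slow to
// drain its socket causes Write to fail with a net.Error whose Timeout()
// reports true.
func writeWithTimeout(conn net.Conn, msg []byte, timeout time.Duration) error {
	if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	_, err := conn.Write(msg)
	return err
}

func main() {
	c, err := net.Dial("tcp", "127.0.0.1:9735") // placeholder address
	if err != nil {
		fmt.Println("dial:", err)
		return
	}
	defer c.Close()

	err = writeWithTimeout(c, []byte("ping"), 5*time.Second)
	if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
		fmt.Println("write timed out; caller may retry with backoff")
	}
}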
@@ -638,7 +638,7 @@ func (p *peer) Disconnect(reason error) {

 // String returns the string representation of this peer.
 func (p *peer) String() string {
-	return p.conn.RemoteAddr().String()
+	return fmt.Sprintf("%x@%s", p.pubKeyBytes, p.conn.RemoteAddr())
 }

 // readNextMessage reads, and returns the next message on the wire along with
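With the new String implementation, peers log as <hex pubkey>@<address> rather than the bare network address, so log lines identify the node even when it reconnects from a different address. A quick illustration of the format (the key bytes are truncated and purely illustrative):

package main

import "fmt"

func main() {
	pubKeyBytes := []byte{0x02, 0x9a, 0xbc} // truncated, illustrative only
	addr := "203.0.113.7:9735"
	fmt.Printf("%x@%s\n", pubKeyBytes, addr) // 029abc@203.0.113.7:9735
}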
@@ -1005,7 +1005,12 @@ func (p *peer) readHandler() {
 out:
 	for atomic.LoadInt32(&p.disconnect) == 0 {
 		nextMsg, err := p.readNextMessage()
-		idleTimer.Stop()
+		if !idleTimer.Stop() {
+			select {
+			case <-idleTimer.C:
+			default:
+			}
+		}
 		if err != nil {
 			peerLog.Infof("unable to read message from %v: %v",
 				p, err)
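The bare idleTimer.Stop() was racy: if the timer had already fired, its expiry value could remain buffered in idleTimer.C and be consumed after a later Reset, producing a spurious idle disconnect. The replacement is the standard stop-and-drain idiom for time.Timer, shown in isolation below:

package main

import (
	"fmt"
	"time"
)

func main() {
	idleTimeout := 50 * time.Millisecond
	t := time.NewTimer(idleTimeout)

	time.Sleep(100 * time.Millisecond) // let the timer fire

	if !t.Stop() {
		// Stop reports false if the timer already fired; drain the
		// channel if the value is still buffered so Reset starts
		// from a clean state.
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(idleTimeout)

	<-t.C
	fmt.Println("timer fired once after reset, no stale expiry")
}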
@@ -1427,29 +1432,100 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
 //
 // NOTE: This method MUST be run as a goroutine.
 func (p *peer) writeHandler() {
+	// We'll stop the timer after a new messages is sent, and also reset it
+	// after we process the next message.
+	idleTimer := time.AfterFunc(idleTimeout, func() {
+		err := fmt.Errorf("Peer %s no write for %s -- disconnecting",
+			p, idleTimeout)
+		p.Disconnect(err)
+	})
+
 	var exitErr error

+	const (
+		minRetryDelay = 5 * time.Second
+		maxRetryDelay = time.Minute
+	)
+
 out:
	for {
		select {
		case outMsg := <-p.sendQueue:
-			switch outMsg.msg.(type) {
+			// Record the time at which we first attempt to send the
+			// message.
+			startTime := time.Now()
+
+			// Initialize a retry delay of zero, which will be
+			// increased if we encounter a write timeout on the
+			// send.
+			var retryDelay time.Duration
+		retryWithDelay:
+			if retryDelay > 0 {
+				select {
+				case <-time.After(retryDelay):
+				case <-p.quit:
+					// Inform synchronous writes that the
+					// peer is exiting.
+					if outMsg.errChan != nil {
+						outMsg.errChan <- ErrPeerExiting
+					}
+					exitErr = ErrPeerExiting
+					break out
+				}
+			}
+
 			// If we're about to send a ping message, then log the
 			// exact time in which we send the message so we can
 			// use the delay as a rough estimate of latency to the
 			// remote peer.
-			case *lnwire.Ping:
+			if _, ok := outMsg.msg.(*lnwire.Ping); ok {
 				// TODO(roasbeef): do this before the write?
 				// possibly account for processing within func?
 				now := time.Now().UnixNano()
 				atomic.StoreInt64(&p.pingLastSend, now)
 			}

-			// Write out the message to the socket, responding with
-			// error if `errChan` is non-nil. The `errChan` allows
-			// callers to optionally synchronize sends with the
-			// writeHandler.
+			// Write out the message to the socket. If a timeout
+			// error is encountered, we will catch this and retry
+			// after backing off in case the remote peer is just
+			// slow to process messages from the wire.
 			err := p.writeMessage(outMsg.msg)
+			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
+				// Increase the retry delay in the event of a
+				// timeout error, this prevents us from
+				// disconnecting if the remote party is slow to
+				// pull messages off the wire. We back off
+				// exponentially up to our max delay to prevent
+				// blocking the write pool.
+				if retryDelay == 0 {
+					retryDelay = minRetryDelay
+				} else {
+					retryDelay *= 2
+					if retryDelay > maxRetryDelay {
+						retryDelay = maxRetryDelay
+					}
+				}
+
+				peerLog.Debugf("Write timeout detected for "+
+					"peer %s, retrying after %v, "+
+					"first attempted %v ago", p, retryDelay,
+					time.Since(startTime))
+
+				goto retryWithDelay
+			}
+
+			// The write succeeded, reset the idle timer to prevent
+			// us from disconnecting the peer.
+			if !idleTimer.Stop() {
+				select {
+				case <-idleTimer.C:
+				default:
+				}
+			}
+			idleTimer.Reset(idleTimeout)
+
+			// If the peer requested a synchronous write, respond
+			// with the error.
 			if outMsg.errChan != nil {
 				outMsg.errChan <- err
 			}
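The core of the change is the retryWithDelay loop above: a write that fails with a net.Error whose Timeout() is true no longer tears down the peer. Instead the delay starts at minRetryDelay, doubles on each consecutive timeout, and is capped at maxRetryDelay, after which the send is attempted again. A minimal standalone sketch of that schedule (the constants mirror the diff; everything else is illustrative):

package main

import (
	"fmt"
	"time"
)

const (
	minRetryDelay = 5 * time.Second
	maxRetryDelay = time.Minute
)

// nextDelay implements the doubling-with-cap backoff from the diff.
func nextDelay(d time.Duration) time.Duration {
	if d == 0 {
		return minRetryDelay
	}
	d *= 2
	if d > maxRetryDelay {
		d = maxRetryDelay
	}
	return d
}

func main() {
	var d time.Duration
	for i := 0; i < 6; i++ {
		d = nextDelay(d)
		fmt.Println(d) // 5s, 10s, 20s, 40s, 1m0s, 1m0s
	}
}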
|
server.go (53 lines changed)

@@ -2156,14 +2156,17 @@ func (s *server) InboundPeerConnected(conn net.Conn) {

 	case nil:
 		// We already have a connection with the incoming peer. If the
-		// connection we've already established should be kept and is not of
-		// the same type of the new connection (inbound), then we'll close out
-		// the new connection s.t there's only a single connection between us.
+		// connection we've already established should be kept and is
+		// not of the same type of the new connection (inbound), then
+		// we'll close out the new connection s.t there's only a single
+		// connection between us.
 		localPub := s.identityPriv.PubKey()
-		if !connectedPeer.inbound && !shouldDropLocalConnection(localPub, nodePub) {
-			srvrLog.Warnf("Received inbound connection from peer %x, "+
-				"but already have outbound connection, dropping conn",
-				nodePub.SerializeCompressed())
+		if !connectedPeer.inbound &&
+			!shouldDropLocalConnection(localPub, nodePub) {
+
+			srvrLog.Warnf("Received inbound connection from "+
+				"peer %v, but already have outbound "+
+				"connection, dropping conn", connectedPeer)
 			conn.Close()
 			return
 		}
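When two nodes connect to each other simultaneously, each side must deterministically agree on which duplicate connection to drop, which is what shouldDropLocalConnection decides from the two identity keys. The sketch below shows one common tie-breaking scheme over serialized public keys; the comparison direction here is an assumption for illustration, not necessarily lnd's exact rule:

package main

import (
	"bytes"
	"fmt"
)

// shouldDropLocal reports whether the locally-initiated connection should
// be dropped in favor of the remote-initiated one. Both sides evaluate the
// same comparison with the roles swapped, so exactly one of the two
// duplicate connections survives.
func shouldDropLocal(localPub, remotePub []byte) bool {
	return bytes.Compare(localPub, remotePub) < 0 // assumed direction
}

func main() {
	a := []byte{0x02, 0x11} // truncated, illustrative keys
	b := []byte{0x03, 0x22}
	fmt.Println(shouldDropLocal(a, b)) // true: node A drops its outbound conn
	fmt.Println(shouldDropLocal(b, a)) // false: node B keeps its outbound conn
}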
@@ -2236,7 +2239,8 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		return
 	}

-	srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
+	srvrLog.Infof("Established connection to: %x@%v", pubStr,
+		conn.RemoteAddr())

 	if connReq != nil {
 		// A successful connection was returned by the connmgr.

@@ -2263,14 +2267,17 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)

 	case nil:
 		// We already have a connection with the incoming peer. If the
-		// connection we've already established should be kept and is not of
-		// the same type of the new connection (outbound), then we'll close out
-		// the new connection s.t there's only a single connection between us.
+		// connection we've already established should be kept and is
+		// not of the same type of the new connection (outbound), then
+		// we'll close out the new connection s.t there's only a single
+		// connection between us.
 		localPub := s.identityPriv.PubKey()
-		if connectedPeer.inbound && shouldDropLocalConnection(localPub, nodePub) {
-			srvrLog.Warnf("Established outbound connection to peer %x, "+
-				"but already have inbound connection, dropping conn",
-				nodePub.SerializeCompressed())
+		if connectedPeer.inbound &&
+			shouldDropLocalConnection(localPub, nodePub) {
+
+			srvrLog.Warnf("Established outbound connection to "+
+				"peer %v, but already have inbound "+
+				"connection, dropping conn", connectedPeer)
 			if connReq != nil {
 				s.connMgr.Remove(connReq.ID())
 			}
@@ -2355,8 +2362,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	addr := conn.RemoteAddr()
 	pubKey := brontideConn.RemotePub()

-	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
-		pubKey.SerializeCompressed(), inbound)
+	srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
+		pubKey.SerializeCompressed(), addr, inbound)

 	peerAddr := &lnwire.NetAddress{
 		IdentityKey: pubKey,

@@ -2473,7 +2480,7 @@ func (s *server) peerInitializer(p *peer) {
 	defer s.mu.Unlock()

 	// Check if there are listeners waiting for this peer to come online.
-	srvrLog.Debugf("Notifying that peer %x is online", p.PubKey())
+	srvrLog.Debugf("Notifying that peer %v is online", p)
 	for _, peerChan := range s.peerConnectedListeners[pubStr] {
 		select {
 		case peerChan <- p:

@@ -2527,8 +2534,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
 	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
 	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
 	if err != nil && err != htlcswitch.ErrNoLinksFound {
-		srvrLog.Errorf("Unable to get channel links for %x: %v",
-			p.PubKey(), err)
+		srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
 	}

 	for _, link := range links {

@@ -2540,7 +2546,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {

 	// If there were any notification requests for when this peer
 	// disconnected, we can trigger them now.
-	srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey())
+	srvrLog.Debugf("Notifying that peer %x is offline", p)
 	pubStr := string(pubKey.SerializeCompressed())
 	for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
 		close(offlineChan)

@@ -2736,13 +2742,14 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
 	// connection.
 	if reqs, ok := s.persistentConnReqs[targetPub]; ok {
 		srvrLog.Warnf("Already have %d persistent connection "+
-			"requests for %v, connecting anyway.", len(reqs), addr)
+			"requests for %x@%v, connecting anyway.", len(reqs),
+			targetPub, addr)
 	}

 	// If there's not already a pending or active connection to this node,
 	// then instruct the connection manager to attempt to establish a
 	// persistent connection to the peer.
-	srvrLog.Debugf("Connecting to %v", addr)
+	srvrLog.Debugf("Connecting to %x@%v", targetPub, addr)
 	if perm {
 		connReq := &connmgr.ConnReq{
 			Addr:      addr,