lnd/peer/ping_manager.go
2024-05-21 13:29:31 -07:00

262 lines
6.3 KiB
Go

package peer
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// PingManagerConfig is a structure containing various parameters that govern
// how the PingManager behaves.
type PingManagerConfig struct {
// NewPingPayload is a closure that returns the payload to be packaged
// in the Ping message.
NewPingPayload func() []byte
// NewPongSize is a closure that returns a random value between
// [0, lnwire.MaxPongBytes]. This random value helps to more effectively
// pair Pong messages with Ping.
NewPongSize func() uint16
// IntervalDuration is the Duration between attempted pings.
IntervalDuration time.Duration
// TimeoutDuration is the Duration we wait before declaring a ping
// attempt failed.
TimeoutDuration time.Duration
// SendPing is a closure that is responsible for sending the Ping
// message out to our peer
SendPing func(ping *lnwire.Ping)
// OnPongFailure is a closure that is responsible for executing the
// logic when a Pong message is either late or does not match our
// expectations for that Pong
OnPongFailure func(error)
}
// PingManager is a structure that is designed to manage the internal state
// of the ping pong lifecycle with the remote peer. We assume there is only one
// ping outstanding at once.
//
// NOTE: This structure MUST be initialized with NewPingManager.
type PingManager struct {
cfg *PingManagerConfig
// pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer.
// To be used atomically.
// TODO(roasbeef): also use a WMA or EMA?
pingTime atomic.Pointer[time.Duration]
// pingLastSend is the time when we sent our last ping message.
// To be used atomically.
pingLastSend *time.Time
// outstandingPongSize is the current size of the requested pong
// payload. This value can only validly range from [0,65531]. Any
// value < 0 is interpreted as if there is no outstanding ping message.
outstandingPongSize int32
// pingTicker is a pointer to a Ticker that fires on every ping
// interval.
pingTicker *time.Ticker
// pingTimeout is a Timer that will fire when we want to time out a
// ping
pingTimeout *time.Timer
// pongChan is the channel on which the pingManager will write Pong
// messages it is evaluating
pongChan chan *lnwire.Pong
started sync.Once
stopped sync.Once
quit chan struct{}
wg sync.WaitGroup
}
// NewPingManager constructs a pingManager in a valid state. It must be started
// before it does anything useful, though.
func NewPingManager(cfg *PingManagerConfig) *PingManager {
m := PingManager{
cfg: cfg,
outstandingPongSize: -1,
pongChan: make(chan *lnwire.Pong, 1),
quit: make(chan struct{}),
}
return &m
}
// Start launches the primary goroutine that is owned by the pingManager.
func (m *PingManager) Start() error {
var err error
m.started.Do(func() {
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
m.pingTimeout = time.NewTimer(0)
m.wg.Add(1)
go m.pingHandler()
})
return err
}
// pingHandler is the main goroutine responsible for enforcing the ping/pong
// protocol.
func (m *PingManager) pingHandler() {
defer m.wg.Done()
defer m.pingTimeout.Stop()
// Ensure that the pingTimeout channel is empty.
if !m.pingTimeout.Stop() {
<-m.pingTimeout.C
}
for {
select {
case <-m.pingTicker.C:
// If this occurs it means that the new ping cycle has
// begun while there is still an outstanding ping
// awaiting a pong response. This should never occur,
// but if it does, it implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
)
m.cfg.OnPongFailure(e)
return
}
pongSize := m.cfg.NewPongSize()
ping := &lnwire.Ping{
NumPongBytes: pongSize,
PaddingBytes: m.cfg.NewPingPayload(),
}
// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
m.cfg.OnPongFailure(err)
return
}
m.cfg.SendPing(ping)
case <-m.pingTimeout.C:
m.resetPingState()
e := errors.New("timeout while waiting for " +
"pong response",
)
m.cfg.OnPongFailure(e)
return
case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))
// Save off values we are about to override when we
// call resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
m.resetPingState()
// If the pong we receive doesn't match the ping we
// sent out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
m.cfg.OnPongFailure(e)
return
}
// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
}
case <-m.quit:
return
}
}
}
// Stop interrupts the goroutines that the PingManager owns.
func (m *PingManager) Stop() {
if m.pingTicker == nil {
return
}
m.stopped.Do(func() {
close(m.quit)
m.wg.Wait()
m.pingTicker.Stop()
m.pingTimeout.Stop()
})
}
// setPingState is a private method to keep track of all of the fields we need
// to set when we send out a Ping.
func (m *PingManager) setPingState(pongSize uint16) error {
t := time.Now()
m.pingLastSend = &t
m.outstandingPongSize = int32(pongSize)
if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
return fmt.Errorf(
"impossible: ping timeout reset when already active",
)
}
return nil
}
// resetPingState is a private method that resets all of the bookkeeping that
// is tracking a currently outstanding Ping.
func (m *PingManager) resetPingState() {
m.pingLastSend = nil
m.outstandingPongSize = -1
if !m.pingTimeout.Stop() {
select {
case <-m.pingTimeout.C:
default:
}
}
}
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
rtt := m.pingTime.Load()
if rtt == nil {
return -1
}
return rtt.Microseconds()
}
// ReceivedPong is called to evaluate a Pong message against the expectations
// we have for it. It will cause the PingManager to invoke the supplied
// OnPongFailure function if the Pong argument supplied violates expectations.
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
select {
case m.pongChan <- msg:
case <-m.quit:
}
}