peer: refactor main event loop for ping handler

The error was never used as the init couldn't return an error, so we do
away with that. We also modify the main event loop dispatch to more
closely match other areas of the codebase.
This commit is contained in:
Olaoluwa Osuntokun 2024-01-15 13:33:43 -08:00
parent b5cbeb4ad7
commit 185119f5c3
No known key found for this signature in database
GPG key ID: 3BBD59E99B280306

View file

@ -98,16 +98,20 @@ func NewPingManager(cfg *PingManagerConfig) *PingManager {
func (m *PingManager) Start() error {
var err error
m.started.Do(func() {
err = m.start()
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
m.pingTimeout = time.NewTimer(0)
m.wg.Add(1)
go m.pingHandler()
})
return err
}
func (m *PingManager) start() error {
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
m.pingTimeout = time.NewTimer(0)
// pingHandler is the main goroutine responsible for enforcing the ping/pong
// protocol.
func (m *PingManager) pingHandler() {
defer m.wg.Done()
defer m.pingTimeout.Stop()
// Ensure that the pingTimeout channel is empty.
@ -115,89 +119,81 @@ func (m *PingManager) start() error {
<-m.pingTimeout.C
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.pingTicker.C:
// If this occurs it means that the new ping
// cycle has begun while there is still an
// outstanding ping awaiting a pong response.
// This should never occur, but if it does, it
// implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
)
m.cfg.OnPongFailure(e)
for {
select {
case <-m.pingTicker.C:
// If this occurs it means that the new ping cycle has
// begun while there is still an outstanding ping
// awaiting a pong response. This should never occur,
// but if it does, it implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
)
m.cfg.OnPongFailure(e)
return
}
return
}
pongSize := m.cfg.NewPongSize()
ping := &lnwire.Ping{
NumPongBytes: pongSize,
PaddingBytes: m.cfg.NewPingPayload(),
}
pongSize := m.cfg.NewPongSize()
ping := &lnwire.Ping{
NumPongBytes: pongSize,
PaddingBytes: m.cfg.NewPingPayload(),
}
// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
m.cfg.OnPongFailure(err)
m.cfg.OnPongFailure(err)
return
}
return
}
m.cfg.SendPing(ping)
m.cfg.SendPing(ping)
case <-m.pingTimeout.C:
m.resetPingState()
case <-m.pingTimeout.C:
m.resetPingState()
e := errors.New("timeout while waiting for " +
"pong response",
)
e := errors.New("timeout while waiting for " +
"pong response",
m.cfg.OnPongFailure(e)
return
case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))
// Save off values we are about to override when we
// call resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
m.resetPingState()
// If the pong we receive doesn't match the ping we
// sent out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
m.cfg.OnPongFailure(e)
return
case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))
// Save off values we are about to override
// when we call resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
m.resetPingState()
// If the pong we receive doesn't match the
// ping we sent out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
m.cfg.OnPongFailure(e)
return
}
// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
}
case <-m.quit:
return
}
}
}()
return nil
// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
}
case <-m.quit:
return
}
}
}
// Stop interrupts the goroutines that the PingManager owns. Can only be called