mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-02-22 22:25:24 +01:00
protofsm: use prefixed logger for StateMachine
So that we dont have to remember to add the `FSM(%v)` prefix each time we write a log line.
This commit is contained in:
parent
b887c1cc5d
commit
575ea7af83
2 changed files with 34 additions and 43 deletions
|
@ -1,7 +1,7 @@
|
||||||
package protofsm
|
package protofsm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/btcsuite/btclog"
|
"github.com/btcsuite/btclog/v2"
|
||||||
"github.com/lightningnetwork/lnd/build"
|
"github.com/lightningnetwork/lnd/build"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/btcsuite/btcd/btcec/v2"
|
"github.com/btcsuite/btcd/btcec/v2"
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/btcsuite/btclog/v2"
|
||||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
"github.com/lightningnetwork/lnd/fn/v2"
|
"github.com/lightningnetwork/lnd/fn/v2"
|
||||||
"github.com/lightningnetwork/lnd/lnutils"
|
"github.com/lightningnetwork/lnd/lnutils"
|
||||||
|
@ -130,6 +131,8 @@ type stateQuery[Event any, Env Environment] struct {
|
||||||
type StateMachine[Event any, Env Environment] struct {
|
type StateMachine[Event any, Env Environment] struct {
|
||||||
cfg StateMachineCfg[Event, Env]
|
cfg StateMachineCfg[Event, Env]
|
||||||
|
|
||||||
|
log btclog.Logger
|
||||||
|
|
||||||
// events is the channel that will be used to send new events to the
|
// events is the channel that will be used to send new events to the
|
||||||
// FSM.
|
// FSM.
|
||||||
events chan Event
|
events chan Event
|
||||||
|
@ -197,7 +200,10 @@ func NewStateMachine[Event any, Env Environment](
|
||||||
cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
|
cfg StateMachineCfg[Event, Env]) StateMachine[Event, Env] {
|
||||||
|
|
||||||
return StateMachine[Event, Env]{
|
return StateMachine[Event, Env]{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
log: log.WithPrefix(
|
||||||
|
fmt.Sprintf("FSM(%v):", cfg.Env.Name()),
|
||||||
|
),
|
||||||
events: make(chan Event, 1),
|
events: make(chan Event, 1),
|
||||||
stateQuery: make(chan stateQuery[Event, Env]),
|
stateQuery: make(chan stateQuery[Event, Env]),
|
||||||
wg: *fn.NewGoroutineManager(),
|
wg: *fn.NewGoroutineManager(),
|
||||||
|
@ -229,9 +235,7 @@ func (s *StateMachine[Event, Env]) Stop() {
|
||||||
//
|
//
|
||||||
// TODO(roasbeef): bool if processed?
|
// TODO(roasbeef): bool if processed?
|
||||||
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
|
func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) {
|
||||||
log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(),
|
s.log.Debugf("Sending event: %v", lnutils.SpewLogClosure(event))
|
||||||
lnutils.SpewLogClosure(event),
|
|
||||||
)
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.events <- event:
|
case s.events <- event:
|
||||||
|
@ -269,9 +273,7 @@ func (s *StateMachine[Event, Env]) SendMessage(ctx context.Context,
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(),
|
s.log.Debugf("Sending msg: %v", lnutils.SpewLogClosure(msg))
|
||||||
lnutils.SpewLogClosure(msg),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Otherwise, try to map the message using the default message mapper.
|
// Otherwise, try to map the message using the default message mapper.
|
||||||
// If we can't extract an event, then we'll return false to indicate
|
// If we can't extract an event, then we'll return false to indicate
|
||||||
|
@ -342,11 +344,10 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
// any preconditions as well as post-send events.
|
// any preconditions as well as post-send events.
|
||||||
case *SendMsgEvent[Event]:
|
case *SendMsgEvent[Event]:
|
||||||
sendAndCleanUp := func() error {
|
sendAndCleanUp := func() error {
|
||||||
log.Debugf("FSM(%v): sending message to target(%x): "+
|
s.log.Debugf("Sending message to target(%x): "+
|
||||||
"%v", s.cfg.Env.Name(),
|
"%v",
|
||||||
daemonEvent.TargetPeer.SerializeCompressed(),
|
daemonEvent.TargetPeer.SerializeCompressed(),
|
||||||
lnutils.SpewLogClosure(daemonEvent.Msgs),
|
lnutils.SpewLogClosure(daemonEvent.Msgs))
|
||||||
)
|
|
||||||
|
|
||||||
err := s.cfg.Daemon.SendMessages(
|
err := s.cfg.Daemon.SendMessages(
|
||||||
daemonEvent.TargetPeer, daemonEvent.Msgs,
|
daemonEvent.TargetPeer, daemonEvent.Msgs,
|
||||||
|
@ -361,9 +362,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
|
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
|
||||||
launched := s.wg.Go(
|
launched := s.wg.Go(
|
||||||
ctx, func(ctx context.Context) {
|
ctx, func(ctx context.Context) {
|
||||||
log.Debugf("FSM(%v): sending "+
|
s.log.Debugf("Sending "+
|
||||||
"post-send event: %v",
|
"post-send event: %v",
|
||||||
s.cfg.Env.Name(),
|
|
||||||
lnutils.SpewLogClosure(event))
|
lnutils.SpewLogClosure(event))
|
||||||
|
|
||||||
s.SendEvent(ctx, event)
|
s.SendEvent(ctx, event)
|
||||||
|
@ -393,8 +393,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
)
|
)
|
||||||
defer predicateTicker.Stop()
|
defer predicateTicker.Stop()
|
||||||
|
|
||||||
log.Infof("FSM(%v): waiting for send predicate to "+
|
s.log.Infof("Waiting for send predicate to be true")
|
||||||
"be true", s.cfg.Env.Name())
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -407,14 +406,13 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
)
|
)
|
||||||
|
|
||||||
if canSend {
|
if canSend {
|
||||||
log.Infof("FSM(%v): send "+
|
s.log.Infof("Send active " +
|
||||||
"active predicate",
|
"predicate")
|
||||||
s.cfg.Env.Name())
|
|
||||||
|
|
||||||
err := sendAndCleanUp()
|
err := sendAndCleanUp()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//nolint:ll
|
//nolint:ll
|
||||||
log.Errorf("FSM(%v): unable to send message: %v", err)
|
s.log.Errorf("Unable to send message: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -435,8 +433,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
// If this is a broadcast transaction event, then we'll broadcast with
|
// If this is a broadcast transaction event, then we'll broadcast with
|
||||||
// the label attached.
|
// the label attached.
|
||||||
case *BroadcastTxn:
|
case *BroadcastTxn:
|
||||||
log.Debugf("FSM(%v): broadcasting txn, txid=%v",
|
s.log.Debugf("Broadcasting txn, txid=%v",
|
||||||
s.cfg.Env.Name(), daemonEvent.Tx.TxHash())
|
daemonEvent.Tx.TxHash())
|
||||||
|
|
||||||
err := s.cfg.Daemon.BroadcastTransaction(
|
err := s.cfg.Daemon.BroadcastTransaction(
|
||||||
daemonEvent.Tx, daemonEvent.Label,
|
daemonEvent.Tx, daemonEvent.Label,
|
||||||
|
@ -450,8 +448,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
// The state machine has requested a new event to be sent once a
|
// The state machine has requested a new event to be sent once a
|
||||||
// transaction spending a specified outpoint has confirmed.
|
// transaction spending a specified outpoint has confirmed.
|
||||||
case *RegisterSpend[Event]:
|
case *RegisterSpend[Event]:
|
||||||
log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(),
|
s.log.Debugf("Registering spend: %v", daemonEvent.OutPoint)
|
||||||
daemonEvent.OutPoint)
|
|
||||||
|
|
||||||
spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
|
spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn(
|
||||||
&daemonEvent.OutPoint, daemonEvent.PkScript,
|
&daemonEvent.OutPoint, daemonEvent.PkScript,
|
||||||
|
@ -495,8 +492,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context,
|
||||||
// The state machine has requested a new event to be sent once a
|
// The state machine has requested a new event to be sent once a
|
||||||
// specified txid+pkScript pair has confirmed.
|
// specified txid+pkScript pair has confirmed.
|
||||||
case *RegisterConf[Event]:
|
case *RegisterConf[Event]:
|
||||||
log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(),
|
s.log.Debugf("Registering conf: %v", daemonEvent.Txid)
|
||||||
daemonEvent.Txid)
|
|
||||||
|
|
||||||
numConfs := daemonEvent.NumConfs.UnwrapOr(1)
|
numConfs := daemonEvent.NumConfs.UnwrapOr(1)
|
||||||
confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
|
confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn(
|
||||||
|
@ -547,9 +543,9 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
|
||||||
currentState State[Event, Env], newEvent Event) (State[Event, Env],
|
currentState State[Event, Env], newEvent Event) (State[Event, Env],
|
||||||
error) {
|
error) {
|
||||||
|
|
||||||
log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(),
|
s.log.Debugf("Applying new event: %v",
|
||||||
lnutils.SpewLogClosure(newEvent),
|
lnutils.SpewLogClosure(newEvent))
|
||||||
)
|
|
||||||
eventQueue := fn.NewQueue(newEvent)
|
eventQueue := fn.NewQueue(newEvent)
|
||||||
|
|
||||||
// Given the next event to handle, we'll process the event, then add
|
// Given the next event to handle, we'll process the event, then add
|
||||||
|
@ -560,10 +556,8 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
|
||||||
//nolint:ll
|
//nolint:ll
|
||||||
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
|
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
|
||||||
err := fn.MapOptionZ(nextEvent, func(event Event) error {
|
err := fn.MapOptionZ(nextEvent, func(event Event) error {
|
||||||
log.Debugf("FSM(%v): processing event: %v",
|
s.log.Debugf("Processing event: %v",
|
||||||
s.cfg.Env.Name(),
|
lnutils.SpewLogClosure(event))
|
||||||
lnutils.SpewLogClosure(event),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Apply the state transition function of the current
|
// Apply the state transition function of the current
|
||||||
// state given this new event and our existing env.
|
// state given this new event and our existing env.
|
||||||
|
@ -593,14 +587,12 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
|
||||||
//
|
//
|
||||||
//nolint:ll
|
//nolint:ll
|
||||||
for _, inEvent := range events.InternalEvent {
|
for _, inEvent := range events.InternalEvent {
|
||||||
log.Debugf("FSM(%v): adding "+
|
s.log.Debugf("Adding "+
|
||||||
"new internal event "+
|
"new internal event "+
|
||||||
"to queue: %v",
|
"to queue: %v",
|
||||||
s.cfg.Env.Name(),
|
|
||||||
lnutils.SpewLogClosure(
|
lnutils.SpewLogClosure(
|
||||||
inEvent,
|
inEvent,
|
||||||
),
|
))
|
||||||
)
|
|
||||||
|
|
||||||
eventQueue.Enqueue(inEvent)
|
eventQueue.Enqueue(inEvent)
|
||||||
}
|
}
|
||||||
|
@ -611,9 +603,8 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("FSM(%v): state transition: from_state=%T, "+
|
s.log.Infof("State transition: from_state=%T, "+
|
||||||
"to_state=%T",
|
"to_state=%T", currentState,
|
||||||
s.cfg.Env.Name(), currentState,
|
|
||||||
transition.NextState)
|
transition.NextState)
|
||||||
|
|
||||||
// With our events processed, we'll now update our
|
// With our events processed, we'll now update our
|
||||||
|
@ -640,7 +631,7 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context,
|
||||||
// incoming events, and then drives the state machine forward until it reaches
|
// incoming events, and then drives the state machine forward until it reaches
|
||||||
// a terminal state.
|
// a terminal state.
|
||||||
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
|
func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
|
||||||
log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name())
|
s.log.Debugf("Starting state machine")
|
||||||
|
|
||||||
currentState := s.cfg.InitialState
|
currentState := s.cfg.InitialState
|
||||||
|
|
||||||
|
@ -650,7 +641,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
|
||||||
return s.executeDaemonEvent(ctx, event)
|
return s.executeDaemonEvent(ctx, event)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to execute init event: %w", err)
|
s.log.Errorf("Unable to execute init event: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -670,7 +661,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.cfg.ErrorReporter.ReportError(err)
|
s.cfg.ErrorReporter.ReportError(err)
|
||||||
|
|
||||||
log.Errorf("unable to apply event: %v", err)
|
s.log.Errorf("Unable to apply event: %v", err)
|
||||||
|
|
||||||
// An error occurred, so we'll tear down the
|
// An error occurred, so we'll tear down the
|
||||||
// entire state machine as we can't proceed.
|
// entire state machine as we can't proceed.
|
||||||
|
|
Loading…
Add table
Reference in a new issue