From bf10e31167970c490a516d770f32c62790a01a8c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 25 Jan 2024 16:56:21 -0800 Subject: [PATCH] protofsm: convert state machine args into config --- protofsm/state_machine.go | 68 ++++++++++++++++++++-------------- protofsm/state_machine_test.go | 30 ++++++++++----- 2 files changed, 61 insertions(+), 37 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 9c4db8dc1..6968e36e9 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -119,46 +119,60 @@ type stateQuery[Event any, Env Environment] struct { // // TODO(roasbeef): terminal check, daemon event execution, init? type StateMachine[Event any, Env Environment] struct { - currentState State[Event, Env] - env Env - - daemon DaemonAdapters + cfg StateMachineCfg[Event, Env] + // events is the channel that will be used to send new events to the + // FSM. events chan Event - quit chan struct{} - wg sync.WaitGroup - // newStateEvents is an EventDistributor that will be used to notify // any relevant callers of new state transitions that occur. newStateEvents *fn.EventDistributor[State[Event, Env]] + // stateQuery is a channel that will be used by outside callers to + // query the internal state machine state. stateQuery chan stateQuery[Event, Env] - initEvent fn.Option[DaemonEvent] - startOnce sync.Once stopOnce sync.Once // TODO(roasbeef): also use that context guard here? + quit chan struct{} + wg sync.WaitGroup +} + +// StateMachineCfg is a configuration struct that's used to create a new state +// machine. +type StateMachineCfg[Event any, Env Environment] struct { + // Daemon is a set of adapters that will be used to bridge the FSM to + // the daemon. + Daemon DaemonAdapters + + // InitialState is the initial state of the state machine. + InitialState State[Event, Env] + + // Env is the environment that the state machine will use to execute. + Env Env + + // InitEvent is an optional event that will be sent to the state + // machine as if it was emitted at the onset of the state machine. This + // can be used to set up tracking state such as a txid confirmation + // event. + InitEvent fn.Option[DaemonEvent] } // NewStateMachine creates a new state machine given a set of daemon adapters, // an initial state, an environment, and an event to process as if emitted at // the onset of the state machine. Such an event can be used to set up tracking // state such as a txid confirmation event. -func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, - initialState State[Event, Env], env Env, - initEvent fn.Option[DaemonEvent]) StateMachine[Event, Env] { +func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], +) StateMachine[Event, Env] { return StateMachine[Event, Env]{ - daemon: adapters, + cfg: cfg, events: make(chan Event, 1), - currentState: initialState, stateQuery: make(chan stateQuery[Event, Env]), quit: make(chan struct{}), - env: env, - initEvent: initEvent, newStateEvents: fn.NewEventDistributor[State[Event, Env]](), } } @@ -237,7 +251,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { - err := s.daemon.SendMessages( + err := s.cfg.Daemon.SendMessages( daemonEvent.TargetPeer, daemonEvent.Msgs, ) if err != nil { @@ -301,7 +315,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // If this is a broadcast transaction event, then we'll broadcast with // the label attached. case *BroadcastTxn: - err := s.daemon.BroadcastTransaction( + err := s.cfg.Daemon.BroadcastTransaction( daemonEvent.Tx, daemonEvent.Label, ) if err != nil { @@ -315,7 +329,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // The state machine has requested a new event to be sent once a // transaction spending a specified outpoint has confirmed. case *RegisterSpend[Event]: - spendEvent, err := s.daemon.RegisterSpendNtfn( + spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn( &daemonEvent.OutPoint, daemonEvent.PkScript, daemonEvent.HeightHint, ) @@ -351,7 +365,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // specified txid+pkScript pair has confirmed. case *RegisterConf[Event]: numConfs := daemonEvent.NumConfs.UnwrapOr(1) - confEvent, err := s.daemon.RegisterConfirmationsNtfn( + confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn( &daemonEvent.Txid, daemonEvent.PkScript, numConfs, daemonEvent.HeightHint, ) @@ -391,9 +405,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // applyEvents applies a new event to the state machine. This will continue // until no further events are emitted by the state machine. Along the way, // we'll also ensure to execute any daemon events that are emitted. -func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env], error) { - // TODO(roasbeef): make starting state as part of env? - currentState := s.currentState +func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], + newEvent Event) (State[Event, Env], error) { eventQueue := fn.NewQueue(newEvent) @@ -406,7 +419,7 @@ func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env // Apply the state transition function of the current // state given this new event and our existing env. transition, err := currentState.ProcessEvent( - event, s.env, + event, s.cfg.Env, ) if err != nil { return err @@ -469,12 +482,11 @@ func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env func (s *StateMachine[Event, Env]) driveMachine() { defer s.wg.Done() - // TODO(roasbeef): move into env? read only to start with - currentState := s.currentState + currentState := s.cfg.InitialState // Before we start, if we have an init daemon event specified, then // we'll handle that now. - err := fn.MapOptionZ(s.initEvent, func(event DaemonEvent) error { + err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error { return s.executeDaemonEvent(event) }) if err != nil { @@ -492,7 +504,7 @@ func (s *StateMachine[Event, Env]) driveMachine() { // machine forward until we either run out of internal events, // or we reach a terminal state. case newEvent := <-s.events: - newState, err := s.applyEvents(newEvent) + newState, err := s.applyEvents(currentState, newEvent) if err != nil { // TODO(roasbeef): hard error? log.Errorf("unable to apply event: %v", err) diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index da765574e..82d4431f2 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -228,9 +228,13 @@ func TestStateMachineOnInitDaemonEvent(t *testing.T) { PostSendEvent: fn.Some(dummyEvents(&goToFin{})), } - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.Some[DaemonEvent](initEvent), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.Some[DaemonEvent](initEvent), + } + stateMachine := NewStateMachine(cfg) // Before we start up the state machine, we'll assert that the send // message adapter is called on start up. @@ -270,9 +274,13 @@ func TestStateMachineInternalEvents(t *testing.T) { adapters := newDaemonAdapters() - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.None[DaemonEvent](), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.None[DaemonEvent](), + } + stateMachine := NewStateMachine(cfg) stateMachine.Start() defer stateMachine.Stop() @@ -317,9 +325,13 @@ func TestStateMachineDaemonEvents(t *testing.T) { adapters := newDaemonAdapters() - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.None[DaemonEvent](), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.None[DaemonEvent](), + } + stateMachine := NewStateMachine(cfg) stateMachine.Start() defer stateMachine.Stop()