diff --git a/chainio/consumer.go b/chainio/consumer.go new file mode 100644 index 000000000..e0fe2eefd --- /dev/null +++ b/chainio/consumer.go @@ -0,0 +1,116 @@ +package chainio + +// BeatConsumer defines a supplementary component that should be used by +// subsystems which implement the `Consumer` interface. It partially implements +// the `Consumer` interface by providing the method `ProcessBlock` such that +// subsystems don't need to re-implement it. +// +// While inheritance is not commonly used in Go, subsystems embedding this +// struct cannot pass the interface check for `Consumer` because the `Name` +// method is not implemented, which gives us a "mortise and tenon" structure. +// In addition to reducing code duplication, this design allows `ProcessBlock` +// to work on the concrete type `Beat` to access its internal states. +type BeatConsumer struct { + // BlockbeatChan is a channel to receive blocks from Blockbeat. The + // received block contains the best known height and the txns confirmed + // in this block. + BlockbeatChan chan Blockbeat + + // name is the name of the consumer which embeds the BlockConsumer. + name string + + // quit is a channel that closes when the BlockConsumer is shutting + // down. + // + // NOTE: this quit channel should be mounted to the same quit channel + // used by the subsystem. + quit chan struct{} + + // errChan is a buffered chan that receives an error returned from + // processing this block. + errChan chan error +} + +// NewBeatConsumer creates a new BlockConsumer. +func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer { + // Refuse to start `lnd` if the quit channel is not initialized. We + // treat this case as if we are facing a nil pointer dereference, as + // there's no point to return an error here, which will cause the node + // to fail to be started anyway. + if quit == nil { + panic("quit channel is nil") + } + + b := BeatConsumer{ + BlockbeatChan: make(chan Blockbeat), + name: name, + errChan: make(chan error, 1), + quit: quit, + } + + return b +} + +// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat +// channel. It will send it to the subsystem's BlockbeatChan, and block until +// the processed result is received from the subsystem. The subsystem must call +// `NotifyBlockProcessed` after it has finished processing the block. +// +// NOTE: part of the `chainio.Consumer` interface. +func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error { + // Update the current height. + beat.logger().Tracef("set current height for [%s]", b.name) + + select { + // Send the beat to the blockbeat channel. It's expected that the + // consumer will read from this channel and process the block. Once + // processed, it should return the error or nil to the beat.Err chan. + case b.BlockbeatChan <- beat: + beat.logger().Tracef("Sent blockbeat to [%s]", b.name) + + case <-b.quit: + beat.logger().Debugf("[%s] received shutdown before sending "+ + "beat", b.name) + + return nil + } + + // Check the consumer's err chan. We expect the consumer to call + // `beat.NotifyBlockProcessed` to send the error back here. + select { + case err := <-b.errChan: + beat.logger().Debugf("[%s] processed beat: err=%v", b.name, err) + + return err + + case <-b.quit: + beat.logger().Debugf("[%s] received shutdown", b.name) + } + + return nil +} + +// NotifyBlockProcessed signals that the block has been processed. It takes the +// blockbeat being processed and an error resulted from processing it. This +// error is then sent back to the consumer's err chan to unblock +// `ProcessBlock`. +// +// NOTE: This method must be called by the subsystem after it has finished +// processing the block. Extreme caution must be taken when returning an error +// as it will shutdown lnd. +// +// NOTE: part of the `chainio.Consumer` interface. +func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) { + // Update the current height. + beat.logger().Debugf("[%s]: notifying beat processed", b.name) + + select { + case b.errChan <- err: + beat.logger().Debugf("[%s]: notified beat processed, err=%v", + b.name, err) + + case <-b.quit: + beat.logger().Debugf("[%s] received shutdown before notifying "+ + "beat processed", b.name) + } +} diff --git a/chainio/consumer_test.go b/chainio/consumer_test.go new file mode 100644 index 000000000..3ef79b61b --- /dev/null +++ b/chainio/consumer_test.go @@ -0,0 +1,202 @@ +package chainio + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/fn" + "github.com/stretchr/testify/require" +) + +// TestNewBeatConsumer tests the NewBeatConsumer function. +func TestNewBeatConsumer(t *testing.T) { + t.Parallel() + + quitChan := make(chan struct{}) + name := "test" + + // Test the NewBeatConsumer function. + b := NewBeatConsumer(quitChan, name) + + // Assert the state. + require.Equal(t, quitChan, b.quit) + require.Equal(t, name, b.name) + require.NotNil(t, b.BlockbeatChan) +} + +// TestProcessBlockSuccess tests when the block is processed successfully, no +// error is returned. +func TestProcessBlockSuccess(t *testing.T) { + t.Parallel() + + // Create a test consumer. + quitChan := make(chan struct{}) + b := NewBeatConsumer(quitChan, "test") + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + + // Mock the consumer's err chan. + consumerErrChan := make(chan error, 1) + b.errChan = consumerErrChan + + // Call the method under test. + resultChan := make(chan error, 1) + go func() { + resultChan <- b.ProcessBlock(mockBeat) + }() + + // Assert the beat is sent to the blockbeat channel. + beat, err := fn.RecvOrTimeout(b.BlockbeatChan, time.Second) + require.NoError(t, err) + require.Equal(t, mockBeat, beat) + + // Send nil to the consumer's error channel. + consumerErrChan <- nil + + // Assert the result of ProcessBlock is nil. + result, err := fn.RecvOrTimeout(resultChan, time.Second) + require.NoError(t, err) + require.Nil(t, result) +} + +// TestProcessBlockConsumerQuitBeforeSend tests when the consumer is quit +// before sending the beat, the method returns immediately. +func TestProcessBlockConsumerQuitBeforeSend(t *testing.T) { + t.Parallel() + + // Create a test consumer. + quitChan := make(chan struct{}) + b := NewBeatConsumer(quitChan, "test") + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + + // Call the method under test. + resultChan := make(chan error, 1) + go func() { + resultChan <- b.ProcessBlock(mockBeat) + }() + + // Instead of reading the BlockbeatChan, close the quit channel. + close(quitChan) + + // Assert ProcessBlock returned nil. + result, err := fn.RecvOrTimeout(resultChan, time.Second) + require.NoError(t, err) + require.Nil(t, result) +} + +// TestProcessBlockConsumerQuitAfterSend tests when the consumer is quit after +// sending the beat, the method returns immediately. +func TestProcessBlockConsumerQuitAfterSend(t *testing.T) { + t.Parallel() + + // Create a test consumer. + quitChan := make(chan struct{}) + b := NewBeatConsumer(quitChan, "test") + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + + // Mock the consumer's err chan. + consumerErrChan := make(chan error, 1) + b.errChan = consumerErrChan + + // Call the method under test. + resultChan := make(chan error, 1) + go func() { + resultChan <- b.ProcessBlock(mockBeat) + }() + + // Assert the beat is sent to the blockbeat channel. + beat, err := fn.RecvOrTimeout(b.BlockbeatChan, time.Second) + require.NoError(t, err) + require.Equal(t, mockBeat, beat) + + // Instead of sending nil to the consumer's error channel, close the + // quit chanel. + close(quitChan) + + // Assert ProcessBlock returned nil. + result, err := fn.RecvOrTimeout(resultChan, time.Second) + require.NoError(t, err) + require.Nil(t, result) +} + +// TestNotifyBlockProcessedSendErr asserts the error can be sent and read by +// the beat via NotifyBlockProcessed. +func TestNotifyBlockProcessedSendErr(t *testing.T) { + t.Parallel() + + // Create a test consumer. + quitChan := make(chan struct{}) + b := NewBeatConsumer(quitChan, "test") + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + + // Mock the consumer's err chan. + consumerErrChan := make(chan error, 1) + b.errChan = consumerErrChan + + // Call the method under test. + done := make(chan error) + go func() { + defer close(done) + b.NotifyBlockProcessed(mockBeat, errDummy) + }() + + // Assert the error is sent to the beat's err chan. + result, err := fn.RecvOrTimeout(consumerErrChan, time.Second) + require.NoError(t, err) + require.ErrorIs(t, result, errDummy) + + // Assert the done channel is closed. + result, err = fn.RecvOrTimeout(done, time.Second) + require.NoError(t, err) + require.Nil(t, result) +} + +// TestNotifyBlockProcessedOnQuit asserts NotifyBlockProcessed exits +// immediately when the quit channel is closed. +func TestNotifyBlockProcessedOnQuit(t *testing.T) { + t.Parallel() + + // Create a test consumer. + quitChan := make(chan struct{}) + b := NewBeatConsumer(quitChan, "test") + + // Create a mock beat. + mockBeat := &MockBlockbeat{} + defer mockBeat.AssertExpectations(t) + mockBeat.On("logger").Return(clog) + + // Mock the consumer's err chan - we don't buffer it so it will block + // on sending the error. + consumerErrChan := make(chan error) + b.errChan = consumerErrChan + + // Call the method under test. + done := make(chan error) + go func() { + defer close(done) + b.NotifyBlockProcessed(mockBeat, errDummy) + }() + + // Close the quit channel so the method will return. + close(b.quit) + + // Assert the done channel is closed. + result, err := fn.RecvOrTimeout(done, time.Second) + require.NoError(t, err) + require.Nil(t, result) +}