mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-04 09:48:19 +01:00
chainio: add partial implementation of Consumer
interface
This commit is contained in:
parent
4b83d87baa
commit
b5a3a27c77
2 changed files with 315 additions and 0 deletions
113
chainio/consumer.go
Normal file
113
chainio/consumer.go
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
package chainio
|
||||||
|
|
||||||
|
// BeatConsumer defines a supplementary component that should be used by
|
||||||
|
// subsystems which implement the `Consumer` interface. It partially implements
|
||||||
|
// the `Consumer` interface by providing the method `ProcessBlock` such that
|
||||||
|
// subsystems don't need to re-implement it.
|
||||||
|
//
|
||||||
|
// While inheritance is not commonly used in Go, subsystems embedding this
|
||||||
|
// struct cannot pass the interface check for `Consumer` because the `Name`
|
||||||
|
// method is not implemented, which gives us a "mortise and tenon" structure.
|
||||||
|
// In addition to reducing code duplication, this design allows `ProcessBlock`
|
||||||
|
// to work on the concrete type `Beat` to access its internal states.
|
||||||
|
type BeatConsumer struct {
|
||||||
|
// BlockbeatChan is a channel to receive blocks from Blockbeat. The
|
||||||
|
// received block contains the best known height and the txns confirmed
|
||||||
|
// in this block.
|
||||||
|
BlockbeatChan chan Blockbeat
|
||||||
|
|
||||||
|
// name is the name of the consumer which embeds the BlockConsumer.
|
||||||
|
name string
|
||||||
|
|
||||||
|
// quit is a channel that closes when the BlockConsumer is shutting
|
||||||
|
// down.
|
||||||
|
//
|
||||||
|
// NOTE: this quit channel should be mounted to the same quit channel
|
||||||
|
// used by the subsystem.
|
||||||
|
quit chan struct{}
|
||||||
|
|
||||||
|
// errChan is a buffered chan that receives an error returned from
|
||||||
|
// processing this block.
|
||||||
|
errChan chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBeatConsumer creates a new BlockConsumer.
|
||||||
|
func NewBeatConsumer(quit chan struct{}, name string) BeatConsumer {
|
||||||
|
// Refuse to start `lnd` if the quit channel is not initialized. We
|
||||||
|
// treat this case as if we are facing a nil pointer dereference, as
|
||||||
|
// there's no point to return an error here, which will cause the node
|
||||||
|
// to fail to be started anyway.
|
||||||
|
if quit == nil {
|
||||||
|
panic("quit channel is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := BeatConsumer{
|
||||||
|
BlockbeatChan: make(chan Blockbeat),
|
||||||
|
name: name,
|
||||||
|
errChan: make(chan error, 1),
|
||||||
|
quit: quit,
|
||||||
|
}
|
||||||
|
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessBlock takes a blockbeat and sends it to the consumer's blockbeat
|
||||||
|
// channel. It will send it to the subsystem's BlockbeatChan, and block until
|
||||||
|
// the processed result is received from the subsystem. The subsystem must call
|
||||||
|
// `NotifyBlockProcessed` after it has finished processing the block.
|
||||||
|
//
|
||||||
|
// NOTE: part of the `chainio.Consumer` interface.
|
||||||
|
func (b *BeatConsumer) ProcessBlock(beat Blockbeat) error {
|
||||||
|
// Update the current height.
|
||||||
|
beat.logger().Tracef("set current height for [%s]", b.name)
|
||||||
|
|
||||||
|
select {
|
||||||
|
// Send the beat to the blockbeat channel. It's expected that the
|
||||||
|
// consumer will read from this channel and process the block. Once
|
||||||
|
// processed, it should return the error or nil to the beat.Err chan.
|
||||||
|
case b.BlockbeatChan <- beat:
|
||||||
|
beat.logger().Tracef("Sent blockbeat to [%s]", b.name)
|
||||||
|
|
||||||
|
case <-b.quit:
|
||||||
|
beat.logger().Debugf("[%s] received shutdown before sending "+
|
||||||
|
"beat", b.name)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the consumer's err chan. We expect the consumer to call
|
||||||
|
// `beat.NotifyBlockProcessed` to send the error back here.
|
||||||
|
select {
|
||||||
|
case err := <-b.errChan:
|
||||||
|
beat.logger().Debugf("[%s] processed beat: err=%v", b.name, err)
|
||||||
|
|
||||||
|
return err
|
||||||
|
|
||||||
|
case <-b.quit:
|
||||||
|
beat.logger().Debugf("[%s] received shutdown", b.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyBlockProcessed signals that the block has been processed. It takes the
|
||||||
|
// blockbeat being processed and an error resulted from processing it. This
|
||||||
|
// error is then sent back to the consumer's err chan to unblock
|
||||||
|
// `ProcessBlock`.
|
||||||
|
//
|
||||||
|
// NOTE: This method must be called by the subsystem after it has finished
|
||||||
|
// processing the block.
|
||||||
|
func (b *BeatConsumer) NotifyBlockProcessed(beat Blockbeat, err error) {
|
||||||
|
// Update the current height.
|
||||||
|
beat.logger().Debugf("[%s]: notifying beat processed", b.name)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case b.errChan <- err:
|
||||||
|
beat.logger().Debugf("[%s]: notified beat processed, err=%v",
|
||||||
|
b.name, err)
|
||||||
|
|
||||||
|
case <-b.quit:
|
||||||
|
beat.logger().Debugf("[%s] received shutdown before notifying "+
|
||||||
|
"beat processed", b.name)
|
||||||
|
}
|
||||||
|
}
|
202
chainio/consumer_test.go
Normal file
202
chainio/consumer_test.go
Normal file
|
@ -0,0 +1,202 @@
|
||||||
|
package chainio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/fn/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNewBeatConsumer tests the NewBeatConsumer function.
|
||||||
|
func TestNewBeatConsumer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
name := "test"
|
||||||
|
|
||||||
|
// Test the NewBeatConsumer function.
|
||||||
|
b := NewBeatConsumer(quitChan, name)
|
||||||
|
|
||||||
|
// Assert the state.
|
||||||
|
require.Equal(t, quitChan, b.quit)
|
||||||
|
require.Equal(t, name, b.name)
|
||||||
|
require.NotNil(t, b.BlockbeatChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessBlockSuccess tests when the block is processed successfully, no
|
||||||
|
// error is returned.
|
||||||
|
func TestProcessBlockSuccess(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test consumer.
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
b := NewBeatConsumer(quitChan, "test")
|
||||||
|
|
||||||
|
// Create a mock beat.
|
||||||
|
mockBeat := &MockBlockbeat{}
|
||||||
|
defer mockBeat.AssertExpectations(t)
|
||||||
|
mockBeat.On("logger").Return(clog)
|
||||||
|
|
||||||
|
// Mock the consumer's err chan.
|
||||||
|
consumerErrChan := make(chan error, 1)
|
||||||
|
b.errChan = consumerErrChan
|
||||||
|
|
||||||
|
// Call the method under test.
|
||||||
|
resultChan := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
resultChan <- b.ProcessBlock(mockBeat)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Assert the beat is sent to the blockbeat channel.
|
||||||
|
beat, err := fn.RecvOrTimeout(b.BlockbeatChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, mockBeat, beat)
|
||||||
|
|
||||||
|
// Send nil to the consumer's error channel.
|
||||||
|
consumerErrChan <- nil
|
||||||
|
|
||||||
|
// Assert the result of ProcessBlock is nil.
|
||||||
|
result, err := fn.RecvOrTimeout(resultChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessBlockConsumerQuitBeforeSend tests when the consumer is quit
|
||||||
|
// before sending the beat, the method returns immediately.
|
||||||
|
func TestProcessBlockConsumerQuitBeforeSend(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test consumer.
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
b := NewBeatConsumer(quitChan, "test")
|
||||||
|
|
||||||
|
// Create a mock beat.
|
||||||
|
mockBeat := &MockBlockbeat{}
|
||||||
|
defer mockBeat.AssertExpectations(t)
|
||||||
|
mockBeat.On("logger").Return(clog)
|
||||||
|
|
||||||
|
// Call the method under test.
|
||||||
|
resultChan := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
resultChan <- b.ProcessBlock(mockBeat)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Instead of reading the BlockbeatChan, close the quit channel.
|
||||||
|
close(quitChan)
|
||||||
|
|
||||||
|
// Assert ProcessBlock returned nil.
|
||||||
|
result, err := fn.RecvOrTimeout(resultChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessBlockConsumerQuitAfterSend tests when the consumer is quit after
|
||||||
|
// sending the beat, the method returns immediately.
|
||||||
|
func TestProcessBlockConsumerQuitAfterSend(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test consumer.
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
b := NewBeatConsumer(quitChan, "test")
|
||||||
|
|
||||||
|
// Create a mock beat.
|
||||||
|
mockBeat := &MockBlockbeat{}
|
||||||
|
defer mockBeat.AssertExpectations(t)
|
||||||
|
mockBeat.On("logger").Return(clog)
|
||||||
|
|
||||||
|
// Mock the consumer's err chan.
|
||||||
|
consumerErrChan := make(chan error, 1)
|
||||||
|
b.errChan = consumerErrChan
|
||||||
|
|
||||||
|
// Call the method under test.
|
||||||
|
resultChan := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
resultChan <- b.ProcessBlock(mockBeat)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Assert the beat is sent to the blockbeat channel.
|
||||||
|
beat, err := fn.RecvOrTimeout(b.BlockbeatChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, mockBeat, beat)
|
||||||
|
|
||||||
|
// Instead of sending nil to the consumer's error channel, close the
|
||||||
|
// quit chanel.
|
||||||
|
close(quitChan)
|
||||||
|
|
||||||
|
// Assert ProcessBlock returned nil.
|
||||||
|
result, err := fn.RecvOrTimeout(resultChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockProcessedSendErr asserts the error can be sent and read by
|
||||||
|
// the beat via NotifyBlockProcessed.
|
||||||
|
func TestNotifyBlockProcessedSendErr(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test consumer.
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
b := NewBeatConsumer(quitChan, "test")
|
||||||
|
|
||||||
|
// Create a mock beat.
|
||||||
|
mockBeat := &MockBlockbeat{}
|
||||||
|
defer mockBeat.AssertExpectations(t)
|
||||||
|
mockBeat.On("logger").Return(clog)
|
||||||
|
|
||||||
|
// Mock the consumer's err chan.
|
||||||
|
consumerErrChan := make(chan error, 1)
|
||||||
|
b.errChan = consumerErrChan
|
||||||
|
|
||||||
|
// Call the method under test.
|
||||||
|
done := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
b.NotifyBlockProcessed(mockBeat, errDummy)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Assert the error is sent to the beat's err chan.
|
||||||
|
result, err := fn.RecvOrTimeout(consumerErrChan, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.ErrorIs(t, result, errDummy)
|
||||||
|
|
||||||
|
// Assert the done channel is closed.
|
||||||
|
result, err = fn.RecvOrTimeout(done, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockProcessedOnQuit asserts NotifyBlockProcessed exits
|
||||||
|
// immediately when the quit channel is closed.
|
||||||
|
func TestNotifyBlockProcessedOnQuit(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Create a test consumer.
|
||||||
|
quitChan := make(chan struct{})
|
||||||
|
b := NewBeatConsumer(quitChan, "test")
|
||||||
|
|
||||||
|
// Create a mock beat.
|
||||||
|
mockBeat := &MockBlockbeat{}
|
||||||
|
defer mockBeat.AssertExpectations(t)
|
||||||
|
mockBeat.On("logger").Return(clog)
|
||||||
|
|
||||||
|
// Mock the consumer's err chan - we don't buffer it so it will block
|
||||||
|
// on sending the error.
|
||||||
|
consumerErrChan := make(chan error)
|
||||||
|
b.errChan = consumerErrChan
|
||||||
|
|
||||||
|
// Call the method under test.
|
||||||
|
done := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
b.NotifyBlockProcessed(mockBeat, errDummy)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Close the quit channel so the method will return.
|
||||||
|
close(b.quit)
|
||||||
|
|
||||||
|
// Assert the done channel is closed.
|
||||||
|
result, err := fn.RecvOrTimeout(done, time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, result)
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue