mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-02-22 22:25:24 +01:00
chainio: add helper methods to dispatch beats
This commit adds two methods to handle dispatching beats. These are exported methods so other systems can send beats to their managed subinstances.
This commit is contained in:
parent
01ac713aec
commit
a1eb87e280
3 changed files with 316 additions and 0 deletions
105
chainio/dispatcher.go
Normal file
105
chainio/dispatcher.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package chainio
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DefaultProcessBlockTimeout is the timeout value used when waiting for one
|
||||
// consumer to finish processing the new block epoch.
|
||||
var DefaultProcessBlockTimeout = 60 * time.Second
|
||||
|
||||
// ErrProcessBlockTimeout is the error returned when a consumer takes too long
|
||||
// to process the block.
|
||||
var ErrProcessBlockTimeout = errors.New("process block timeout")
|
||||
|
||||
// DispatchSequential takes a list of consumers and notify them about the new
|
||||
// epoch sequentially. It requires the consumer to finish processing the block
|
||||
// within the specified time, otherwise a timeout error is returned.
|
||||
func DispatchSequential(b Blockbeat, consumers []Consumer) error {
|
||||
for _, c := range consumers {
|
||||
// Send the beat to the consumer.
|
||||
err := notifyAndWait(b, c, DefaultProcessBlockTimeout)
|
||||
if err != nil {
|
||||
b.logger().Errorf("Failed to process block: %v", err)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DispatchConcurrent notifies each consumer concurrently about the blockbeat.
|
||||
// It requires the consumer to finish processing the block within the specified
|
||||
// time, otherwise a timeout error is returned.
|
||||
func DispatchConcurrent(b Blockbeat, consumers []Consumer) error {
|
||||
// errChans is a map of channels that will be used to receive errors
|
||||
// returned from notifying the consumers.
|
||||
errChans := make(map[string]chan error, len(consumers))
|
||||
|
||||
// Notify each queue in goroutines.
|
||||
for _, c := range consumers {
|
||||
// Create a signal chan.
|
||||
errChan := make(chan error, 1)
|
||||
errChans[c.Name()] = errChan
|
||||
|
||||
// Notify each consumer concurrently.
|
||||
go func(c Consumer, beat Blockbeat) {
|
||||
// Send the beat to the consumer.
|
||||
errChan <- notifyAndWait(
|
||||
b, c, DefaultProcessBlockTimeout,
|
||||
)
|
||||
}(c, b)
|
||||
}
|
||||
|
||||
// Wait for all consumers in each queue to finish.
|
||||
for name, errChan := range errChans {
|
||||
err := <-errChan
|
||||
if err != nil {
|
||||
b.logger().Errorf("Consumer=%v failed to process "+
|
||||
"block: %v", name, err)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifyAndWait sends the blockbeat to the specified consumer. It requires the
|
||||
// consumer to finish processing the block within the specified time, otherwise
|
||||
// a timeout error is returned.
|
||||
func notifyAndWait(b Blockbeat, c Consumer, timeout time.Duration) error {
|
||||
b.logger().Debugf("Waiting for consumer[%s] to process it", c.Name())
|
||||
|
||||
// Record the time it takes the consumer to process this block.
|
||||
start := time.Now()
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
errChan <- c.ProcessBlock(b)
|
||||
}()
|
||||
|
||||
// We expect the consumer to finish processing this block under 30s,
|
||||
// otherwise a timeout error is returned.
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s got err in ProcessBlock: %w", c.Name(),
|
||||
err)
|
||||
|
||||
case <-time.After(timeout):
|
||||
return fmt.Errorf("consumer %s: %w", c.Name(),
|
||||
ErrProcessBlockTimeout)
|
||||
}
|
||||
|
||||
b.logger().Debugf("Consumer[%s] processed block in %v", c.Name(),
|
||||
time.Since(start))
|
||||
|
||||
return nil
|
||||
}
|
161
chainio/dispatcher_test.go
Normal file
161
chainio/dispatcher_test.go
Normal file
|
@ -0,0 +1,161 @@
|
|||
package chainio
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestNotifyAndWaitOnConsumerErr asserts when the consumer returns an error,
|
||||
// it's returned by notifyAndWait.
|
||||
func TestNotifyAndWaitOnConsumerErr(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create a mock consumer.
|
||||
consumer := &MockConsumer{}
|
||||
defer consumer.AssertExpectations(t)
|
||||
consumer.On("Name").Return("mocker")
|
||||
|
||||
// Create a mock beat.
|
||||
mockBeat := &MockBlockbeat{}
|
||||
defer mockBeat.AssertExpectations(t)
|
||||
mockBeat.On("logger").Return(clog)
|
||||
|
||||
// Mock ProcessBlock to return an error.
|
||||
consumer.On("ProcessBlock", mockBeat).Return(errDummy).Once()
|
||||
|
||||
// Call the method under test.
|
||||
err := notifyAndWait(mockBeat, consumer, DefaultProcessBlockTimeout)
|
||||
|
||||
// We expect the error to be returned.
|
||||
require.ErrorIs(t, err, errDummy)
|
||||
}
|
||||
|
||||
// TestNotifyAndWaitOnConsumerErr asserts when the consumer successfully
|
||||
// processed the beat, no error is returned.
|
||||
func TestNotifyAndWaitOnConsumerSuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create a mock consumer.
|
||||
consumer := &MockConsumer{}
|
||||
defer consumer.AssertExpectations(t)
|
||||
consumer.On("Name").Return("mocker")
|
||||
|
||||
// Create a mock beat.
|
||||
mockBeat := &MockBlockbeat{}
|
||||
defer mockBeat.AssertExpectations(t)
|
||||
mockBeat.On("logger").Return(clog)
|
||||
|
||||
// Mock ProcessBlock to return nil.
|
||||
consumer.On("ProcessBlock", mockBeat).Return(nil).Once()
|
||||
|
||||
// Call the method under test.
|
||||
err := notifyAndWait(mockBeat, consumer, DefaultProcessBlockTimeout)
|
||||
|
||||
// We expect a nil error to be returned.
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// TestNotifyAndWaitOnConsumerTimeout asserts when the consumer times out
|
||||
// processing the block, the timeout error is returned.
|
||||
func TestNotifyAndWaitOnConsumerTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Set timeout to be 10ms.
|
||||
processBlockTimeout := 10 * time.Millisecond
|
||||
|
||||
// Create a mock consumer.
|
||||
consumer := &MockConsumer{}
|
||||
defer consumer.AssertExpectations(t)
|
||||
consumer.On("Name").Return("mocker")
|
||||
|
||||
// Create a mock beat.
|
||||
mockBeat := &MockBlockbeat{}
|
||||
defer mockBeat.AssertExpectations(t)
|
||||
mockBeat.On("logger").Return(clog)
|
||||
|
||||
// Mock ProcessBlock to return nil but blocks on returning.
|
||||
consumer.On("ProcessBlock", mockBeat).Return(nil).Run(
|
||||
func(args mock.Arguments) {
|
||||
// Sleep one second to block on the method.
|
||||
time.Sleep(processBlockTimeout * 100)
|
||||
}).Once()
|
||||
|
||||
// Call the method under test.
|
||||
err := notifyAndWait(mockBeat, consumer, processBlockTimeout)
|
||||
|
||||
// We expect a timeout error to be returned.
|
||||
require.ErrorIs(t, err, ErrProcessBlockTimeout)
|
||||
}
|
||||
|
||||
// TestDispatchSequential checks that the beat is sent to the consumers
|
||||
// sequentially.
|
||||
func TestDispatchSequential(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create three mock consumers.
|
||||
consumer1 := &MockConsumer{}
|
||||
defer consumer1.AssertExpectations(t)
|
||||
consumer1.On("Name").Return("mocker1")
|
||||
|
||||
consumer2 := &MockConsumer{}
|
||||
defer consumer2.AssertExpectations(t)
|
||||
consumer2.On("Name").Return("mocker2")
|
||||
|
||||
consumer3 := &MockConsumer{}
|
||||
defer consumer3.AssertExpectations(t)
|
||||
consumer3.On("Name").Return("mocker3")
|
||||
|
||||
consumers := []Consumer{consumer1, consumer2, consumer3}
|
||||
|
||||
// Create a mock beat.
|
||||
mockBeat := &MockBlockbeat{}
|
||||
defer mockBeat.AssertExpectations(t)
|
||||
mockBeat.On("logger").Return(clog)
|
||||
|
||||
// prevConsumer specifies the previous consumer that was called.
|
||||
var prevConsumer string
|
||||
|
||||
// Mock the ProcessBlock on consumers to reutrn immediately.
|
||||
consumer1.On("ProcessBlock", mockBeat).Return(nil).Run(
|
||||
func(args mock.Arguments) {
|
||||
// Check the order of the consumers.
|
||||
//
|
||||
// The first consumer should have no previous consumer.
|
||||
require.Empty(t, prevConsumer)
|
||||
|
||||
// Set the consumer as the previous consumer.
|
||||
prevConsumer = consumer1.Name()
|
||||
}).Once()
|
||||
|
||||
consumer2.On("ProcessBlock", mockBeat).Return(nil).Run(
|
||||
func(args mock.Arguments) {
|
||||
// Check the order of the consumers.
|
||||
//
|
||||
// The second consumer should see consumer1.
|
||||
require.Equal(t, consumer1.Name(), prevConsumer)
|
||||
|
||||
// Set the consumer as the previous consumer.
|
||||
prevConsumer = consumer2.Name()
|
||||
}).Once()
|
||||
|
||||
consumer3.On("ProcessBlock", mockBeat).Return(nil).Run(
|
||||
func(args mock.Arguments) {
|
||||
// Check the order of the consumers.
|
||||
//
|
||||
// The third consumer should see consumer2.
|
||||
require.Equal(t, consumer2.Name(), prevConsumer)
|
||||
|
||||
// Set the consumer as the previous consumer.
|
||||
prevConsumer = consumer3.Name()
|
||||
}).Once()
|
||||
|
||||
// Call the method under test.
|
||||
err := DispatchSequential(mockBeat, consumers)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the previous consumer is the last consumer.
|
||||
require.Equal(t, consumer3.Name(), prevConsumer)
|
||||
}
|
50
chainio/mocks.go
Normal file
50
chainio/mocks.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package chainio
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockConsumer is a mock implementation of the Consumer interface.
|
||||
type MockConsumer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Compile-time constraint to ensure MockConsumer implements Consumer.
|
||||
var _ Consumer = (*MockConsumer)(nil)
|
||||
|
||||
// Name returns a human-readable string for this subsystem.
|
||||
func (m *MockConsumer) Name() string {
|
||||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
// ProcessBlock takes a blockbeat and processes it. A receive-only error chan
|
||||
// must be returned.
|
||||
func (m *MockConsumer) ProcessBlock(b Blockbeat) error {
|
||||
args := m.Called(b)
|
||||
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
// MockBlockbeat is a mock implementation of the Blockbeat interface.
|
||||
type MockBlockbeat struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// Compile-time constraint to ensure MockBlockbeat implements Blockbeat.
|
||||
var _ Blockbeat = (*MockBlockbeat)(nil)
|
||||
|
||||
// Height returns the current block height.
|
||||
func (m *MockBlockbeat) Height() int32 {
|
||||
args := m.Called()
|
||||
|
||||
return args.Get(0).(int32)
|
||||
}
|
||||
|
||||
// logger returns the logger for the blockbeat.
|
||||
func (m *MockBlockbeat) logger() btclog.Logger {
|
||||
args := m.Called()
|
||||
|
||||
return args.Get(0).(btclog.Logger)
|
||||
}
|
Loading…
Add table
Reference in a new issue