chainio: implement Blockbeat

In this commit, a minimal implementation of `Blockbeat` is added to
synchronize block heights, which will be used in `ChainArb`, `Sweeper`,
and `TxPublisher` so blocks are processed sequentially among them.
This commit is contained in:
yyforyongyu 2024-06-27 08:41:53 +08:00
parent 8f886f8507
commit a1ca4bcba0
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
3 changed files with 99 additions and 3 deletions

55
chainio/blockbeat.go Normal file
View File

@ -0,0 +1,55 @@
package chainio
import (
"fmt"
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
)
// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch
// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}
// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)
// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}
// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
b.log = build.NewPrefixLog(logPrefix, clog)
return b
}
// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}
// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}

28
chainio/blockbeat_test.go Normal file
View File

@ -0,0 +1,28 @@
package chainio
import (
"errors"
"testing"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)
var errDummy = errors.New("dummy error")
// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()
// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}
// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)
// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}

View File

@ -1,9 +1,11 @@
package chainio package chainio
import "github.com/btcsuite/btclog/v2"
// Blockbeat defines an interface that can be used by subsystems to retrieve // Blockbeat defines an interface that can be used by subsystems to retrieve
// block data. It is sent by the BlockbeatDispatcher whenever a new block is // block data. It is sent by the BlockbeatDispatcher to all the registered
// received. Once the subsystem finishes processing the block, it must signal // consumers whenever a new block is received. Once the consumer finishes
// it by calling NotifyBlockProcessed. // processing the block, it must signal it by calling `NotifyBlockProcessed`.
// //
// The blockchain is a state machine - whenever there's a state change, it's // The blockchain is a state machine - whenever there's a state change, it's
// manifested in a block. The blockbeat is a way to notify subsystems of this // manifested in a block. The blockbeat is a way to notify subsystems of this
@ -11,10 +13,21 @@ package chainio
// other words, subsystems must react to this state change and should consider // other words, subsystems must react to this state change and should consider
// being driven by the blockbeat in their own state machines. // being driven by the blockbeat in their own state machines.
type Blockbeat interface { type Blockbeat interface {
// blockbeat is a private interface that's only used in this package.
blockbeat
// Height returns the current block height. // Height returns the current block height.
Height() int32 Height() int32
} }
// blockbeat defines a set of private methods used in this package to make
// interaction with the blockbeat easier.
type blockbeat interface {
// logger returns the internal logger used by the blockbeat which has a
// block height prefix.
logger() btclog.Logger
}
// Consumer defines a blockbeat consumer interface. Subsystems that need block // Consumer defines a blockbeat consumer interface. Subsystems that need block
// info must implement it. // info must implement it.
type Consumer interface { type Consumer interface {