chainio: introduce chainio to handle block synchronization

This commit inits the package `chainio` and defines the interface
`Blockbeat` and `Consumer`. The `Consumer` must be implemented by other
subsystems if it requires block epoch subscription.
This commit is contained in:
yyforyongyu 2024-06-27 08:36:19 +08:00
parent 30ee450091
commit 060ff013c1
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
3 changed files with 224 additions and 0 deletions

152
chainio/README.md Normal file
View file

@ -0,0 +1,152 @@
# Chainio
`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.
The main components include:
- `Blockbeat`: An interface that provides information about the block.
- `Consumer`: An interface that specifies how subsystems handle the blockbeat.
- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.
Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.
### Register a Consumer
Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.
To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)
queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```
To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```
### Implement the `Consumer` Interface
Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.
To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.
```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool
chainio.BeatConsumer
...
```
We should also remember to initialize this `BeatConsumer`,
```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```
Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `tp.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.
```go
for {
select {
case beat := <-tp.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.
// Notify we've processed the block.
tp.NotifyBlockProcessed(beat, nil)
...
```
### Existing Queues
Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.
```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher
note left of bb: 0. received block x,<br>dispatching...
note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```

40
chainio/interface.go Normal file
View file

@ -0,0 +1,40 @@
package chainio
// Blockbeat defines an interface that can be used by subsystems to retrieve
// block data. It is sent by the BlockbeatDispatcher whenever a new block is
// received. Once the subsystem finishes processing the block, it must signal
// it by calling NotifyBlockProcessed.
//
// The blockchain is a state machine - whenever there's a state change, it's
// manifested in a block. The blockbeat is a way to notify subsystems of this
// state change, and to provide them with the data they need to process it. In
// other words, subsystems must react to this state change and should consider
// being driven by the blockbeat in their own state machines.
type Blockbeat interface {
// Height returns the current block height.
Height() int32
}
// Consumer defines a blockbeat consumer interface. Subsystems that need block
// info must implement it.
type Consumer interface {
// TODO(yy): We should also define the start methods used by the
// consumers such that when implementing the interface, the consumer
// will always be started with a blockbeat. This cannot be enforced at
// the moment as we need refactor all the start methods to only take a
// beat.
//
// Start(beat Blockbeat) error
// Name returns a human-readable string for this subsystem.
Name() string
// ProcessBlock takes a blockbeat and processes it. It should not
// return until the subsystem has updated its state based on the block
// data.
//
// NOTE: The consumer must try its best to NOT return an error. If an
// error is returned from processing the block, it means the subsystem
// cannot react to onchain state changes and lnd will shutdown.
ProcessBlock(b Blockbeat) error
}

32
chainio/log.go Normal file
View file

@ -0,0 +1,32 @@
package chainio
import (
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
)
// Subsystem defines the logging code for this subsystem.
const Subsystem = "CHIO"
// clog is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var clog btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
clog = logger
}