lnd/chainio
2024-12-20 17:54:08 +08:00
blockbeat_test.go chainio: implement Blockbeat 2024-12-20 17:54:00 +08:00
blockbeat.go chainio: implement Blockbeat 2024-12-20 17:54:00 +08:00
consumer_test.go chainio: add partial implementation of Consumer interface 2024-12-20 17:54:01 +08:00
consumer.go multi: optimize loggings around changes from blockbeat 2024-12-20 17:54:08 +08:00
dispatcher_test.go chainio: add BlockbeatDispatcher to dispatch blockbeats 2024-12-20 17:54:01 +08:00
dispatcher.go multi: optimize loggings around changes from blockbeat 2024-12-20 17:54:08 +08:00
interface.go chainio: implement Blockbeat 2024-12-20 17:54:00 +08:00
log.go chainio: introduce chainio to handle block synchronization 2024-12-20 17:54:00 +08:00
mocks.go chainio: add helper methods to dispatch beats 2024-12-20 17:54:01 +08:00
README.md chainio: introduce chainio to handle block synchronization 2024-12-20 17:54:00 +08:00

Chainio

chainio is a package designed to provide blockchain data access to various subsystems within lnd. When a new block is received, it is encapsulated in a Blockbeat object and disseminated to all registered consumers. Consumers may receive these updates either concurrently or sequentially, based on their registration configuration, ensuring that each subsystem maintains a synchronized view of the current block state.

The main components include:

  • Blockbeat: An interface that provides information about the block.

  • Consumer: An interface that specifies how subsystems handle the blockbeat.

  • BlockbeatDispatcher: The core service responsible for receiving each block and distributing it to all consumers.

Additionally, the BeatConsumer struct provides a partial implementation of the Consumer interface. It reduces code duplication: subsystems that embed it do not need to re-implement the ProcessBlock method, and they gain the commonly used NotifyBlockProcessed method.

Register a Consumer

Consumers within the same queue are notified sequentially, while all queues are notified concurrently. A queue consists of a slice of consumers, which are notified in left-to-right order. Developers are responsible for determining dependencies in block consumption across subsystems: independent subsystems should be notified concurrently, whereas dependent subsystems should be notified sequentially.

To notify the consumers concurrently, put them in different queues:

// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(queue1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(queue2)

To notify the consumers sequentially, put them in the same queue:

// Consumers will be notified sequentially, in the order
// consumer1 -> consumer2 -> consumer3.
queue := []chainio.Consumer{
   consumer1,
   consumer2,
   consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
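The queue semantics described in this section can be illustrated with a small, self-contained model. This is only a sketch and not the chainio implementation: the Consumer interface here is a simplified stand-in, and eventLog, recorder, newRecorder, and notifyQueues are hypothetical names introduced for illustration. Each queue is walked in its own goroutine, and consumers inside a queue are visited in slice order.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer is a simplified stand-in for chainio.Consumer (assumption: the
// real interface takes a Blockbeat rather than a bare height).
type Consumer interface {
	Name() string
	ProcessBlock(height int32) error
}

// eventLog is a hypothetical, goroutine-safe record of who was notified.
type eventLog struct {
	mu      sync.Mutex
	entries []string
}

func (l *eventLog) add(name string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, name)
}

func (l *eventLog) names() []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]string(nil), l.entries...)
}

// recorder is a hypothetical consumer that only logs its own name.
type recorder struct {
	name string
	log  *eventLog
}

func newRecorder(name string, log *eventLog) Consumer {
	return &recorder{name: name, log: log}
}

func (r *recorder) Name() string { return r.name }

func (r *recorder) ProcessBlock(height int32) error {
	r.log.add(r.name)
	return nil
}

// notifyQueues walks every queue concurrently. Inside a queue, consumers are
// notified one after another, and the walk stops at the first error.
func notifyQueues(queues [][]Consumer, height int32) []error {
	errs := make([]error, len(queues))

	var wg sync.WaitGroup
	for i, queue := range queues {
		wg.Add(1)
		go func(i int, queue []Consumer) {
			defer wg.Done()
			for _, c := range queue {
				if err := c.ProcessBlock(height); err != nil {
					errs[i] = fmt.Errorf("%s: %w", c.Name(), err)
					return
				}
			}
		}(i, queue)
	}
	wg.Wait()

	return errs
}

func main() {
	log := &eventLog{}

	// The first queue holds three dependent consumers, the second holds
	// one independent consumer.
	queues := [][]Consumer{
		{
			newRecorder("consumer1", log),
			newRecorder("consumer2", log),
			newRecorder("consumer3", log),
		},
		{newRecorder("consumer4", log)},
	}

	errs := notifyQueues(queues, 100)
	fmt.Println(log.names(), errs)
}
```

In this model consumer1, consumer2, and consumer3 always appear in that relative order, while consumer4 may appear anywhere in the log because its queue runs independently.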

Implement the Consumer Interface

Implementing the Consumer interface is straightforward. Below is an example of how sweep.TxPublisher implements this interface.

To start, embed the partial implementation chainio.BeatConsumer, which already provides the ProcessBlock implementation and commonly used NotifyBlockProcessed method, and exposes BlockbeatChan for the consumer to receive blockbeats.

type TxPublisher struct {
   started atomic.Bool
   stopped atomic.Bool

   chainio.BeatConsumer

   ...

Remember to initialize this BeatConsumer:

...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())

Finally, in the main event loop, read from BlockbeatChan, process the received blockbeat, and, crucially, call tp.NotifyBlockProcessed to inform the blockbeat dispatcher that processing is complete.

for {
   select {
   case beat := <-tp.BlockbeatChan:
      // Consume this blockbeat. This usually means updating the
      // subsystem using the new block data.

      // Notify that we've processed the block.
      tp.NotifyBlockProcessed(beat, nil)

   ...
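The handshake between the dispatcher and a consumer's event loop can be modelled with two channels. The sketch below is a toy model under stated assumptions, not the BeatConsumer source: beat, toyConsumer, processBlock, notifyBlockProcessed, and run are hypothetical names, and the real types may differ. It shows why the event loop must report back: the caller of processBlock stays blocked until a result arrives or the quit channel closes.

```go
package main

import (
	"errors"
	"fmt"
)

// beat is a simplified stand-in for chainio.Blockbeat.
type beat struct {
	height int32
}

// errShuttingDown is returned when quit closes before a result arrives.
var errShuttingDown = errors.New("consumer shutting down")

// toyConsumer is a hypothetical model of a consumer's plumbing.
type toyConsumer struct {
	beatChan chan beat     // carries beats to the event loop
	errChan  chan error    // carries the processing result back
	quit     chan struct{} // closed on shutdown
}

func newToyConsumer(quit chan struct{}) *toyConsumer {
	return &toyConsumer{
		beatChan: make(chan beat),
		errChan:  make(chan error, 1),
		quit:     quit,
	}
}

// processBlock hands the beat to the event loop and blocks until the loop
// reports a result or the consumer shuts down.
func (c *toyConsumer) processBlock(b beat) error {
	select {
	case c.beatChan <- b:
	case <-c.quit:
		return errShuttingDown
	}

	select {
	case err := <-c.errChan:
		return err
	case <-c.quit:
		return errShuttingDown
	}
}

// notifyBlockProcessed reports the result of handling a beat.
func (c *toyConsumer) notifyBlockProcessed(err error) {
	select {
	case c.errChan <- err:
	case <-c.quit:
	}
}

// run is the event loop: it reads a beat, handles it, and always reports
// back so that processBlock can return.
func (c *toyConsumer) run(handle func(beat) error) {
	for {
		select {
		case b := <-c.beatChan:
			c.notifyBlockProcessed(handle(b))
		case <-c.quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	c := newToyConsumer(quit)

	go c.run(func(b beat) error {
		if b.height < 0 {
			return fmt.Errorf("invalid height %d", b.height)
		}
		return nil
	})

	fmt.Println(c.processBlock(beat{height: 100})) // prints "<nil>"
	fmt.Println(c.processBlock(beat{height: -1}))  // prints "invalid height -1"

	close(quit)
}
```

If the event loop in this model forgot to call notifyBlockProcessed, processBlock would block until shutdown, which is the failure mode the final step above guards against.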

Existing Queues

Currently, we have a single queue of consumers dedicated to handling force closures. This queue includes ChainArbitrator, UtxoSweeper, and TxPublisher, with ChainArbitrator managing two internal consumers: chainWatcher and ChannelArbitrator. The blockbeat flows sequentially through the chain as follows: ChainArbitrator => chainWatcher => ChannelArbitrator => UtxoSweeper => TxPublisher. The following diagram illustrates the flow within the public subsystems.

sequenceDiagram
    autonumber
    participant bb as BlockBeat
    participant cc as ChainArb
    participant us as UtxoSweeper
    participant tp as TxPublisher

    note left of bb: 0. received block x,<br>dispatching...

    note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
    bb->>cc: block x
    rect rgba(165, 0, 85, 0.8)
        critical signal processed
            cc->>bb: processed block
        option Process error or timeout
            bb->>bb: error and exit
        end
    end

    note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
    bb->>us: block x
    rect rgba(165, 0, 85, 0.8)
        critical signal processed
            us->>bb: processed block
        option Process error or timeout
            bb->>bb: error and exit
        end
    end

    note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
    bb->>tp: block x
    rect rgba(165, 0, 85, 0.8)
        critical signal processed
            tp->>bb: processed block
        option Process error or timeout
            bb->>bb: error and exit
        end
    end
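The "error or timeout" branch in the diagram can be sketched as a sequential walk that waits for each consumer with a deadline and stops at the first failure. This is an illustrative model only: step, runWithTimeout, dispatchSequentially, and demoTimeout are hypothetical names, and the timeout value is an arbitrary choice for the demo, not a value taken from chainio.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoTimeout is an arbitrary value chosen for this sketch.
const demoTimeout = 50 * time.Millisecond

// errTimeout is returned when a consumer does not signal in time.
var errTimeout = errors.New("timed out waiting for consumer")

// step is a hypothetical stand-in for one consumer in the queue.
type step struct {
	name    string
	process func() error
}

// runWithTimeout runs process in a goroutine and waits for either its result
// or the timeout, whichever comes first.
func runWithTimeout(process func() error, timeout time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- process() }()

	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errTimeout
	}
}

// dispatchSequentially notifies each step in order. It returns the names of
// the steps that completed and the first error encountered, if any.
func dispatchSequentially(steps []step, timeout time.Duration) ([]string, error) {
	var completed []string
	for _, s := range steps {
		if err := runWithTimeout(s.process, timeout); err != nil {
			return completed, fmt.Errorf("%s: %w", s.name, err)
		}
		completed = append(completed, s.name)
	}

	return completed, nil
}

func main() {
	steps := []step{
		{name: "ChainArb", process: func() error { return nil }},
		{name: "UtxoSweeper", process: func() error {
			// Simulate a consumer that is too slow to respond.
			time.Sleep(10 * demoTimeout)
			return nil
		}},
		{name: "TxPublisher", process: func() error { return nil }},
	}

	completed, err := dispatchSequentially(steps, demoTimeout)

	// prints "[ChainArb] UtxoSweeper: timed out waiting for consumer"
	fmt.Println(completed, err)
}
```

Because the walk stops at the first failure, TxPublisher is never notified in this demo, which mirrors the "error and exit" arrows in the diagram.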