wtclient+server: introduce tower client Manager

Introduce a wtclient `Manager` which handles tower clients. It indexes
clients by the policy used. The policy field is thus removed from the
`Config` struct which configures the Manager and is instead added to a
new `towerClientCfg` which configures a specific client managed by the
manager. For now, only the `NewClient` method is added to the Manager.
It can be used to construct a new `TowerClient`. The Manager currently
does notthing with the clients added to it.
This commit is contained in:
Elle Mouton 2023-08-11 11:28:29 +02:00
parent 80684eccbd
commit ab0375e0c1
No known key found for this signature in database
GPG key ID: D7D916376026F177
4 changed files with 204 additions and 153 deletions

View file

@ -282,6 +282,8 @@ type server struct {
sphinx *hop.OnionProcessor
towerClientMgr *wtclient.Manager
towerClient wtclient.Client
anchorTowerClient wtclient.Client
@ -1548,7 +1550,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
s.towerClient, err = wtclient.New(&wtclient.Config{
s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
FetchClosedChannel: fetchClosedChannel,
BuildBreachRetribution: buildBreachRetribution,
SessionCloseRange: cfg.WtClient.SessionCloseRange,
@ -1565,7 +1567,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
Dial: cfg.net.Dial,
AuthDial: authDial,
DB: dbs.TowerClientDB,
Policy: policy,
ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
MinBackoff: 10 * time.Second,
MaxBackoff: 5 * time.Minute,
@ -1575,35 +1576,22 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return nil, err
}
// Register a legacy tower client.
s.towerClient, err = s.towerClientMgr.NewClient(policy)
if err != nil {
return nil, err
}
// Copy the policy for legacy channels and set the blob flag
// signalling support for anchor channels.
anchorPolicy := policy
anchorPolicy.TxPolicy.BlobType |=
blob.Type(blob.FlagAnchorChannel)
s.anchorTowerClient, err = wtclient.New(&wtclient.Config{
FetchClosedChannel: fetchClosedChannel,
BuildBreachRetribution: buildBreachRetribution,
SessionCloseRange: cfg.WtClient.SessionCloseRange,
ChainNotifier: s.cc.ChainNotifier,
SubscribeChannelEvents: func() (subscribe.Subscription,
error) {
return s.channelNotifier.
SubscribeChannelEvents()
},
Signer: cc.Wallet.Cfg.Signer,
NewAddress: newSweepPkScriptGen(cc.Wallet),
SecretKeyRing: s.cc.KeyRing,
Dial: cfg.net.Dial,
AuthDial: authDial,
DB: dbs.TowerClientDB,
Policy: anchorPolicy,
ChainHash: *s.cfg.ActiveNetParams.GenesisHash,
MinBackoff: 10 * time.Second,
MaxBackoff: 5 * time.Minute,
MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
})
// Register an anchors tower client.
s.anchorTowerClient, err = s.towerClientMgr.NewClient(
anchorPolicy,
)
if err != nil {
return nil, err
}

View file

@ -11,19 +11,16 @@ import (
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/tor"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"github.com/lightningnetwork/lnd/watchtower/wtserver"
@ -148,92 +145,6 @@ type Client interface {
Stop() error
}
// Config provides the TowerClient with access to the resources it requires to
// perform its duty. All nillable fields must be non-nil for the tower to be
// initialized properly.
type Config struct {
// Signer provides access to the wallet so that the client can sign
// justice transactions that spend from a remote party's commitment
// transaction.
Signer input.Signer
// SubscribeChannelEvents can be used to subscribe to channel event
// notifications.
SubscribeChannelEvents func() (subscribe.Subscription, error)
// FetchClosedChannel can be used to fetch the info about a closed
// channel. If the channel is not found or not yet closed then
// channeldb.ErrClosedChannelNotFound will be returned.
FetchClosedChannel func(cid lnwire.ChannelID) (
*channeldb.ChannelCloseSummary, error)
// ChainNotifier can be used to subscribe to block notifications.
ChainNotifier chainntnfs.ChainNotifier
// BuildBreachRetribution is a function closure that allows the client
// fetch the breach retribution info for a certain channel at a certain
// revoked commitment height.
BuildBreachRetribution BreachRetributionBuilder
// NewAddress generates a new on-chain sweep pkscript.
NewAddress func() ([]byte, error)
// SecretKeyRing is used to derive the session keys used to communicate
// with the tower. The client only stores the KeyLocators internally so
// that we never store private keys on disk.
SecretKeyRing ECDHKeyRing
// Dial connects to an addr using the specified net and returns the
// connection object.
Dial tor.DialFunc
// AuthDialer establishes a brontide connection over an onion or clear
// network.
AuthDial AuthDialer
// DB provides access to the client's stable storage medium.
DB DB
// Policy is the session policy the client will propose when creating
// new sessions with the tower. If the policy differs from any active
// sessions recorded in the database, those sessions will be ignored and
// new sessions will be requested immediately.
Policy wtpolicy.Policy
// ChainHash identifies the chain that the client is on and for which
// the tower must be watching to monitor for breaches.
ChainHash chainhash.Hash
// ReadTimeout is the duration we will wait during a read before
// breaking out of a blocking read. If the value is less than or equal
// to zero, the default will be used instead.
ReadTimeout time.Duration
// WriteTimeout is the duration we will wait during a write before
// breaking out of a blocking write. If the value is less than or equal
// to zero, the default will be used instead.
WriteTimeout time.Duration
// MinBackoff defines the initial backoff applied to connections with
// watchtowers. Subsequent backoff durations will grow exponentially up
// until MaxBackoff.
MinBackoff time.Duration
// MaxBackoff defines the maximum backoff applied to connections with
// watchtowers. If the exponential backoff produces a timeout greater
// than this value, the backoff will be clamped to MaxBackoff.
MaxBackoff time.Duration
// SessionCloseRange is the range over which we will generate a random
// number of blocks to delay closing a session after its last channel
// has been closed.
SessionCloseRange uint32
// MaxTasksInMemQueue is the maximum number of backup tasks that should
// be kept in-memory. Any more tasks will overflow to disk.
MaxTasksInMemQueue uint64
}
// BreachRetributionBuilder is a function that can be used to construct a
// BreachRetribution from a channel ID and a commitment height.
type BreachRetributionBuilder func(id lnwire.ChannelID,
@ -273,6 +184,17 @@ type staleTowerMsg struct {
errChan chan error
}
// towerClientCfg holds the configuration values required by a TowerClient.
type towerClientCfg struct {
*Config
// Policy is the session policy the client will propose when creating
// new sessions with the tower. If the policy differs from any active
// sessions recorded in the database, those sessions will be ignored and
// new sessions will be requested immediately.
Policy wtpolicy.Policy
}
// TowerClient is a concrete implementation of the Client interface, offering a
// non-blocking, reliable subsystem for backing up revoked states to a specified
// private tower.
@ -280,7 +202,7 @@ type TowerClient struct {
started sync.Once
stopped sync.Once
cfg *Config
cfg *towerClientCfg
log btclog.Logger
@ -313,24 +235,9 @@ type TowerClient struct {
// interface.
var _ Client = (*TowerClient)(nil)
// New initializes a new TowerClient from the provide Config. An error is
// returned if the client could not be initialized.
func New(config *Config) (*TowerClient, error) {
// Copy the config to prevent side effects from modifying both the
// internal and external version of the Config.
cfg := new(Config)
*cfg = *config
// Set the read timeout to the default if none was provided.
if cfg.ReadTimeout <= 0 {
cfg.ReadTimeout = DefaultReadTimeout
}
// Set the write timeout to the default if none was provided.
if cfg.WriteTimeout <= 0 {
cfg.WriteTimeout = DefaultWriteTimeout
}
// newTowerClient initializes a new TowerClient from the provided
// towerClientCfg. An error is returned if the client could not be initialized.
func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) {
identifier, err := cfg.Policy.BlobType.Identifier()
if err != nil {
return nil, err

View file

@ -395,15 +395,16 @@ func (c *mockChannel) getState(
}
type testHarness struct {
t *testing.T
cfg harnessCfg
signer *wtmock.MockSigner
capacity lnwire.MilliSatoshi
clientDB *wtdb.ClientDB
clientCfg *wtclient.Config
client wtclient.Client
server *serverHarness
net *mockNet
t *testing.T
cfg harnessCfg
signer *wtmock.MockSigner
capacity lnwire.MilliSatoshi
clientDB *wtdb.ClientDB
clientCfg *wtclient.Config
clientPolicy wtpolicy.Policy
client wtclient.Client
server *serverHarness
net *mockNet
blockEvents *mockBlockSub
height int32
@ -486,6 +487,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
return &channeldb.ChannelCloseSummary{CloseHeight: height}, nil
}
h.clientPolicy = cfg.policy
h.clientCfg = &wtclient.Config{
Signer: signer,
SubscribeChannelEvents: func() (subscribe.Subscription, error) {
@ -497,7 +499,6 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
DB: clientDB,
AuthDial: mockNet.AuthDial,
SecretKeyRing: wtmock.NewSecretKeyRing(),
Policy: cfg.policy,
NewAddress: func() ([]byte, error) {
return addrScript, nil
},
@ -559,7 +560,10 @@ func (h *testHarness) startClient() {
Address: towerTCPAddr,
}
h.client, err = wtclient.New(h.clientCfg)
m, err := wtclient.NewManager(h.clientCfg)
require.NoError(h.t, err)
h.client, err = m.NewClient(h.clientPolicy)
require.NoError(h.t, err)
require.NoError(h.t, h.client.Start())
require.NoError(h.t, h.client.AddTower(towerAddr))
@ -1452,9 +1456,7 @@ var clientTests = []clientTest{
// Assert that the server has updates for the clients
// most recent policy.
h.server.assertUpdatesForPolicy(
hints, h.clientCfg.Policy,
)
h.server.assertUpdatesForPolicy(hints, h.clientPolicy)
},
},
{
@ -1496,7 +1498,7 @@ var clientTests = []clientTest{
// Restart the client with a new policy, which will
// immediately try to overwrite the prior session with
// the old policy.
h.clientCfg.Policy.SweepFeeRate *= 2
h.clientPolicy.SweepFeeRate *= 2
h.startClient()
// Wait for all the updates to be populated in the
@ -1505,9 +1507,7 @@ var clientTests = []clientTest{
// Assert that the server has updates for the clients
// most recent policy.
h.server.assertUpdatesForPolicy(
hints, h.clientCfg.Policy,
)
h.server.assertUpdatesForPolicy(hints, h.clientPolicy)
},
},
{
@ -1549,10 +1549,10 @@ var clientTests = []clientTest{
// adjusting the MaxUpdates. The client should detect
// that the two policies have equivalent TxPolicies and
// continue using the first.
expPolicy := h.clientCfg.Policy
expPolicy := h.clientPolicy
// Restart the client with a new policy.
h.clientCfg.Policy.MaxUpdates = 20
h.clientPolicy.MaxUpdates = 20
h.startClient()
// Now, queue the second half of the retributions.

View file

@ -0,0 +1,156 @@
package wtclient
import (
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/tor"
"github.com/lightningnetwork/lnd/watchtower/blob"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
)
// Config provides the TowerClient with access to the resources it requires to
// perform its duty. All nillable fields must be non-nil for the tower to be
// initialized properly.
type Config struct {
// Signer provides access to the wallet so that the client can sign
// justice transactions that spend from a remote party's commitment
// transaction.
Signer input.Signer
// SubscribeChannelEvents can be used to subscribe to channel event
// notifications.
SubscribeChannelEvents func() (subscribe.Subscription, error)
// FetchClosedChannel can be used to fetch the info about a closed
// channel. If the channel is not found or not yet closed then
// channeldb.ErrClosedChannelNotFound will be returned.
FetchClosedChannel func(cid lnwire.ChannelID) (
*channeldb.ChannelCloseSummary, error)
// ChainNotifier can be used to subscribe to block notifications.
ChainNotifier chainntnfs.ChainNotifier
// BuildBreachRetribution is a function closure that allows the client
// fetch the breach retribution info for a certain channel at a certain
// revoked commitment height.
BuildBreachRetribution BreachRetributionBuilder
// NewAddress generates a new on-chain sweep pkscript.
NewAddress func() ([]byte, error)
// SecretKeyRing is used to derive the session keys used to communicate
// with the tower. The client only stores the KeyLocators internally so
// that we never store private keys on disk.
SecretKeyRing ECDHKeyRing
// Dial connects to an addr using the specified net and returns the
// connection object.
Dial tor.DialFunc
// AuthDialer establishes a brontide connection over an onion or clear
// network.
AuthDial AuthDialer
// DB provides access to the client's stable storage medium.
DB DB
// ChainHash identifies the chain that the client is on and for which
// the tower must be watching to monitor for breaches.
ChainHash chainhash.Hash
// ReadTimeout is the duration we will wait during a read before
// breaking out of a blocking read. If the value is less than or equal
// to zero, the default will be used instead.
ReadTimeout time.Duration
// WriteTimeout is the duration we will wait during a write before
// breaking out of a blocking write. If the value is less than or equal
// to zero, the default will be used instead.
WriteTimeout time.Duration
// MinBackoff defines the initial backoff applied to connections with
// watchtowers. Subsequent backoff durations will grow exponentially up
// until MaxBackoff.
MinBackoff time.Duration
// MaxBackoff defines the maximum backoff applied to connections with
// watchtowers. If the exponential backoff produces a timeout greater
// than this value, the backoff will be clamped to MaxBackoff.
MaxBackoff time.Duration
// SessionCloseRange is the range over which we will generate a random
// number of blocks to delay closing a session after its last channel
// has been closed.
SessionCloseRange uint32
// MaxTasksInMemQueue is the maximum number of backup tasks that should
// be kept in-memory. Any more tasks will overflow to disk.
MaxTasksInMemQueue uint64
}
// Manager manages the various tower clients that are active. A client is
// required for each different commitment transaction type. The Manager acts as
// a tower client multiplexer.
type Manager struct {
cfg *Config
clients map[blob.Type]*TowerClient
clientsMu sync.Mutex
}
// NewManager constructs a new Manager.
func NewManager(config *Config) (*Manager, error) {
// Copy the config to prevent side effects from modifying both the
// internal and external version of the Config.
cfg := *config
// Set the read timeout to the default if none was provided.
if cfg.ReadTimeout <= 0 {
cfg.ReadTimeout = DefaultReadTimeout
}
// Set the write timeout to the default if none was provided.
if cfg.WriteTimeout <= 0 {
cfg.WriteTimeout = DefaultWriteTimeout
}
return &Manager{
cfg: &cfg,
clients: make(map[blob.Type]*TowerClient),
}, nil
}
// NewClient constructs a new TowerClient and adds it to the set of clients that
// the Manager is keeping track of.
func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) {
m.clientsMu.Lock()
defer m.clientsMu.Unlock()
_, ok := m.clients[policy.BlobType]
if ok {
return nil, fmt.Errorf("a client with blob type %s has "+
"already been registered", policy.BlobType)
}
cfg := &towerClientCfg{
Config: m.cfg,
Policy: policy,
}
client, err := newTowerClient(cfg)
if err != nil {
return nil, err
}
m.clients[policy.BlobType] = client
return client, nil
}