From ab0375e0c12d151193ef0be562016a32371344d5 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Aug 2023 11:28:29 +0200 Subject: [PATCH] wtclient+server: introduce tower client Manager Introduce a wtclient `Manager` which handles tower clients. It indexes clients by the policy used. The policy field is thus removed from the `Config` struct which configures the Manager and is instead added to a new `towerClientCfg` which configures a specific client managed by the manager. For now, only the `NewClient` method is added to the Manager. It can be used to construct a new `TowerClient`. The Manager currently does notthing with the clients added to it. --- server.go | 38 +++---- watchtower/wtclient/client.go | 123 +++-------------------- watchtower/wtclient/client_test.go | 40 ++++---- watchtower/wtclient/manager.go | 156 +++++++++++++++++++++++++++++ 4 files changed, 204 insertions(+), 153 deletions(-) create mode 100644 watchtower/wtclient/manager.go diff --git a/server.go b/server.go index fcc489435..f23b4415d 100644 --- a/server.go +++ b/server.go @@ -282,6 +282,8 @@ type server struct { sphinx *hop.OnionProcessor + towerClientMgr *wtclient.Manager + towerClient wtclient.Client anchorTowerClient wtclient.Client @@ -1548,7 +1550,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID - s.towerClient, err = wtclient.New(&wtclient.Config{ + s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{ FetchClosedChannel: fetchClosedChannel, BuildBreachRetribution: buildBreachRetribution, SessionCloseRange: cfg.WtClient.SessionCloseRange, @@ -1565,7 +1567,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr, Dial: cfg.net.Dial, AuthDial: authDial, DB: dbs.TowerClientDB, - Policy: policy, ChainHash: *s.cfg.ActiveNetParams.GenesisHash, MinBackoff: 10 * time.Second, MaxBackoff: 5 * time.Minute, @@ -1575,35 +1576,22 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return nil, err } + // Register a legacy tower client. + s.towerClient, err = s.towerClientMgr.NewClient(policy) + if err != nil { + return nil, err + } + // Copy the policy for legacy channels and set the blob flag // signalling support for anchor channels. anchorPolicy := policy anchorPolicy.TxPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel) - s.anchorTowerClient, err = wtclient.New(&wtclient.Config{ - FetchClosedChannel: fetchClosedChannel, - BuildBreachRetribution: buildBreachRetribution, - SessionCloseRange: cfg.WtClient.SessionCloseRange, - ChainNotifier: s.cc.ChainNotifier, - SubscribeChannelEvents: func() (subscribe.Subscription, - error) { - - return s.channelNotifier. - SubscribeChannelEvents() - }, - Signer: cc.Wallet.Cfg.Signer, - NewAddress: newSweepPkScriptGen(cc.Wallet), - SecretKeyRing: s.cc.KeyRing, - Dial: cfg.net.Dial, - AuthDial: authDial, - DB: dbs.TowerClientDB, - Policy: anchorPolicy, - ChainHash: *s.cfg.ActiveNetParams.GenesisHash, - MinBackoff: 10 * time.Second, - MaxBackoff: 5 * time.Minute, - MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue, - }) + // Register an anchors tower client. + s.anchorTowerClient, err = s.towerClientMgr.NewClient( + anchorPolicy, + ) if err != nil { return nil, err } diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index f9036e87f..68839b8df 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -11,19 +11,16 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" - "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/fn" - "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/subscribe" - "github.com/lightningnetwork/lnd/tor" "github.com/lightningnetwork/lnd/watchtower/wtdb" "github.com/lightningnetwork/lnd/watchtower/wtpolicy" "github.com/lightningnetwork/lnd/watchtower/wtserver" @@ -148,92 +145,6 @@ type Client interface { Stop() error } -// Config provides the TowerClient with access to the resources it requires to -// perform its duty. All nillable fields must be non-nil for the tower to be -// initialized properly. -type Config struct { - // Signer provides access to the wallet so that the client can sign - // justice transactions that spend from a remote party's commitment - // transaction. - Signer input.Signer - - // SubscribeChannelEvents can be used to subscribe to channel event - // notifications. - SubscribeChannelEvents func() (subscribe.Subscription, error) - - // FetchClosedChannel can be used to fetch the info about a closed - // channel. If the channel is not found or not yet closed then - // channeldb.ErrClosedChannelNotFound will be returned. - FetchClosedChannel func(cid lnwire.ChannelID) ( - *channeldb.ChannelCloseSummary, error) - - // ChainNotifier can be used to subscribe to block notifications. - ChainNotifier chainntnfs.ChainNotifier - - // BuildBreachRetribution is a function closure that allows the client - // fetch the breach retribution info for a certain channel at a certain - // revoked commitment height. - BuildBreachRetribution BreachRetributionBuilder - - // NewAddress generates a new on-chain sweep pkscript. - NewAddress func() ([]byte, error) - - // SecretKeyRing is used to derive the session keys used to communicate - // with the tower. The client only stores the KeyLocators internally so - // that we never store private keys on disk. - SecretKeyRing ECDHKeyRing - - // Dial connects to an addr using the specified net and returns the - // connection object. - Dial tor.DialFunc - - // AuthDialer establishes a brontide connection over an onion or clear - // network. - AuthDial AuthDialer - - // DB provides access to the client's stable storage medium. - DB DB - - // Policy is the session policy the client will propose when creating - // new sessions with the tower. If the policy differs from any active - // sessions recorded in the database, those sessions will be ignored and - // new sessions will be requested immediately. - Policy wtpolicy.Policy - - // ChainHash identifies the chain that the client is on and for which - // the tower must be watching to monitor for breaches. - ChainHash chainhash.Hash - - // ReadTimeout is the duration we will wait during a read before - // breaking out of a blocking read. If the value is less than or equal - // to zero, the default will be used instead. - ReadTimeout time.Duration - - // WriteTimeout is the duration we will wait during a write before - // breaking out of a blocking write. If the value is less than or equal - // to zero, the default will be used instead. - WriteTimeout time.Duration - - // MinBackoff defines the initial backoff applied to connections with - // watchtowers. Subsequent backoff durations will grow exponentially up - // until MaxBackoff. - MinBackoff time.Duration - - // MaxBackoff defines the maximum backoff applied to connections with - // watchtowers. If the exponential backoff produces a timeout greater - // than this value, the backoff will be clamped to MaxBackoff. - MaxBackoff time.Duration - - // SessionCloseRange is the range over which we will generate a random - // number of blocks to delay closing a session after its last channel - // has been closed. - SessionCloseRange uint32 - - // MaxTasksInMemQueue is the maximum number of backup tasks that should - // be kept in-memory. Any more tasks will overflow to disk. - MaxTasksInMemQueue uint64 -} - // BreachRetributionBuilder is a function that can be used to construct a // BreachRetribution from a channel ID and a commitment height. type BreachRetributionBuilder func(id lnwire.ChannelID, @@ -273,6 +184,17 @@ type staleTowerMsg struct { errChan chan error } +// towerClientCfg holds the configuration values required by a TowerClient. +type towerClientCfg struct { + *Config + + // Policy is the session policy the client will propose when creating + // new sessions with the tower. If the policy differs from any active + // sessions recorded in the database, those sessions will be ignored and + // new sessions will be requested immediately. + Policy wtpolicy.Policy +} + // TowerClient is a concrete implementation of the Client interface, offering a // non-blocking, reliable subsystem for backing up revoked states to a specified // private tower. @@ -280,7 +202,7 @@ type TowerClient struct { started sync.Once stopped sync.Once - cfg *Config + cfg *towerClientCfg log btclog.Logger @@ -313,24 +235,9 @@ type TowerClient struct { // interface. var _ Client = (*TowerClient)(nil) -// New initializes a new TowerClient from the provide Config. An error is -// returned if the client could not be initialized. -func New(config *Config) (*TowerClient, error) { - // Copy the config to prevent side effects from modifying both the - // internal and external version of the Config. - cfg := new(Config) - *cfg = *config - - // Set the read timeout to the default if none was provided. - if cfg.ReadTimeout <= 0 { - cfg.ReadTimeout = DefaultReadTimeout - } - - // Set the write timeout to the default if none was provided. - if cfg.WriteTimeout <= 0 { - cfg.WriteTimeout = DefaultWriteTimeout - } - +// newTowerClient initializes a new TowerClient from the provided +// towerClientCfg. An error is returned if the client could not be initialized. +func newTowerClient(cfg *towerClientCfg) (*TowerClient, error) { identifier, err := cfg.Policy.BlobType.Identifier() if err != nil { return nil, err diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 51dd16e09..00404b40f 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -395,15 +395,16 @@ func (c *mockChannel) getState( } type testHarness struct { - t *testing.T - cfg harnessCfg - signer *wtmock.MockSigner - capacity lnwire.MilliSatoshi - clientDB *wtdb.ClientDB - clientCfg *wtclient.Config - client wtclient.Client - server *serverHarness - net *mockNet + t *testing.T + cfg harnessCfg + signer *wtmock.MockSigner + capacity lnwire.MilliSatoshi + clientDB *wtdb.ClientDB + clientCfg *wtclient.Config + clientPolicy wtpolicy.Policy + client wtclient.Client + server *serverHarness + net *mockNet blockEvents *mockBlockSub height int32 @@ -486,6 +487,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { return &channeldb.ChannelCloseSummary{CloseHeight: height}, nil } + h.clientPolicy = cfg.policy h.clientCfg = &wtclient.Config{ Signer: signer, SubscribeChannelEvents: func() (subscribe.Subscription, error) { @@ -497,7 +499,6 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { DB: clientDB, AuthDial: mockNet.AuthDial, SecretKeyRing: wtmock.NewSecretKeyRing(), - Policy: cfg.policy, NewAddress: func() ([]byte, error) { return addrScript, nil }, @@ -559,7 +560,10 @@ func (h *testHarness) startClient() { Address: towerTCPAddr, } - h.client, err = wtclient.New(h.clientCfg) + m, err := wtclient.NewManager(h.clientCfg) + require.NoError(h.t, err) + + h.client, err = m.NewClient(h.clientPolicy) require.NoError(h.t, err) require.NoError(h.t, h.client.Start()) require.NoError(h.t, h.client.AddTower(towerAddr)) @@ -1452,9 +1456,7 @@ var clientTests = []clientTest{ // Assert that the server has updates for the clients // most recent policy. - h.server.assertUpdatesForPolicy( - hints, h.clientCfg.Policy, - ) + h.server.assertUpdatesForPolicy(hints, h.clientPolicy) }, }, { @@ -1496,7 +1498,7 @@ var clientTests = []clientTest{ // Restart the client with a new policy, which will // immediately try to overwrite the prior session with // the old policy. - h.clientCfg.Policy.SweepFeeRate *= 2 + h.clientPolicy.SweepFeeRate *= 2 h.startClient() // Wait for all the updates to be populated in the @@ -1505,9 +1507,7 @@ var clientTests = []clientTest{ // Assert that the server has updates for the clients // most recent policy. - h.server.assertUpdatesForPolicy( - hints, h.clientCfg.Policy, - ) + h.server.assertUpdatesForPolicy(hints, h.clientPolicy) }, }, { @@ -1549,10 +1549,10 @@ var clientTests = []clientTest{ // adjusting the MaxUpdates. The client should detect // that the two policies have equivalent TxPolicies and // continue using the first. - expPolicy := h.clientCfg.Policy + expPolicy := h.clientPolicy // Restart the client with a new policy. - h.clientCfg.Policy.MaxUpdates = 20 + h.clientPolicy.MaxUpdates = 20 h.startClient() // Now, queue the second half of the retributions. diff --git a/watchtower/wtclient/manager.go b/watchtower/wtclient/manager.go new file mode 100644 index 000000000..922766d2d --- /dev/null +++ b/watchtower/wtclient/manager.go @@ -0,0 +1,156 @@ +package wtclient + +import ( + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/subscribe" + "github.com/lightningnetwork/lnd/tor" + "github.com/lightningnetwork/lnd/watchtower/blob" + "github.com/lightningnetwork/lnd/watchtower/wtpolicy" +) + +// Config provides the TowerClient with access to the resources it requires to +// perform its duty. All nillable fields must be non-nil for the tower to be +// initialized properly. +type Config struct { + // Signer provides access to the wallet so that the client can sign + // justice transactions that spend from a remote party's commitment + // transaction. + Signer input.Signer + + // SubscribeChannelEvents can be used to subscribe to channel event + // notifications. + SubscribeChannelEvents func() (subscribe.Subscription, error) + + // FetchClosedChannel can be used to fetch the info about a closed + // channel. If the channel is not found or not yet closed then + // channeldb.ErrClosedChannelNotFound will be returned. + FetchClosedChannel func(cid lnwire.ChannelID) ( + *channeldb.ChannelCloseSummary, error) + + // ChainNotifier can be used to subscribe to block notifications. + ChainNotifier chainntnfs.ChainNotifier + + // BuildBreachRetribution is a function closure that allows the client + // fetch the breach retribution info for a certain channel at a certain + // revoked commitment height. + BuildBreachRetribution BreachRetributionBuilder + + // NewAddress generates a new on-chain sweep pkscript. + NewAddress func() ([]byte, error) + + // SecretKeyRing is used to derive the session keys used to communicate + // with the tower. The client only stores the KeyLocators internally so + // that we never store private keys on disk. + SecretKeyRing ECDHKeyRing + + // Dial connects to an addr using the specified net and returns the + // connection object. + Dial tor.DialFunc + + // AuthDialer establishes a brontide connection over an onion or clear + // network. + AuthDial AuthDialer + + // DB provides access to the client's stable storage medium. + DB DB + + // ChainHash identifies the chain that the client is on and for which + // the tower must be watching to monitor for breaches. + ChainHash chainhash.Hash + + // ReadTimeout is the duration we will wait during a read before + // breaking out of a blocking read. If the value is less than or equal + // to zero, the default will be used instead. + ReadTimeout time.Duration + + // WriteTimeout is the duration we will wait during a write before + // breaking out of a blocking write. If the value is less than or equal + // to zero, the default will be used instead. + WriteTimeout time.Duration + + // MinBackoff defines the initial backoff applied to connections with + // watchtowers. Subsequent backoff durations will grow exponentially up + // until MaxBackoff. + MinBackoff time.Duration + + // MaxBackoff defines the maximum backoff applied to connections with + // watchtowers. If the exponential backoff produces a timeout greater + // than this value, the backoff will be clamped to MaxBackoff. + MaxBackoff time.Duration + + // SessionCloseRange is the range over which we will generate a random + // number of blocks to delay closing a session after its last channel + // has been closed. + SessionCloseRange uint32 + + // MaxTasksInMemQueue is the maximum number of backup tasks that should + // be kept in-memory. Any more tasks will overflow to disk. + MaxTasksInMemQueue uint64 +} + +// Manager manages the various tower clients that are active. A client is +// required for each different commitment transaction type. The Manager acts as +// a tower client multiplexer. +type Manager struct { + cfg *Config + + clients map[blob.Type]*TowerClient + clientsMu sync.Mutex +} + +// NewManager constructs a new Manager. +func NewManager(config *Config) (*Manager, error) { + // Copy the config to prevent side effects from modifying both the + // internal and external version of the Config. + cfg := *config + + // Set the read timeout to the default if none was provided. + if cfg.ReadTimeout <= 0 { + cfg.ReadTimeout = DefaultReadTimeout + } + + // Set the write timeout to the default if none was provided. + if cfg.WriteTimeout <= 0 { + cfg.WriteTimeout = DefaultWriteTimeout + } + + return &Manager{ + cfg: &cfg, + clients: make(map[blob.Type]*TowerClient), + }, nil +} + +// NewClient constructs a new TowerClient and adds it to the set of clients that +// the Manager is keeping track of. +func (m *Manager) NewClient(policy wtpolicy.Policy) (*TowerClient, error) { + m.clientsMu.Lock() + defer m.clientsMu.Unlock() + + _, ok := m.clients[policy.BlobType] + if ok { + return nil, fmt.Errorf("a client with blob type %s has "+ + "already been registered", policy.BlobType) + } + + cfg := &towerClientCfg{ + Config: m.cfg, + Policy: policy, + } + + client, err := newTowerClient(cfg) + if err != nil { + return nil, err + } + + m.clients[policy.BlobType] = client + + return client, nil +}