From 65dc20f2ccf6b6014a509d304b6ad55603b409d7 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 2 Feb 2023 12:26:27 +0200 Subject: [PATCH] wtclient: let task pipeline only carry wtdb.BackupID Since the retrubution info of a backup task is now only constructed at the time that the task is being bound to a session, the in-memory queue only needs to carry the BackupID of the task. --- watchtower/wtclient/client.go | 53 ++++++++++++++++++---------- watchtower/wtclient/task_pipeline.go | 13 ++++--- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 5a59cbcd9..fe4b18648 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -301,7 +301,7 @@ type TowerClient struct { activeSessions sessionQueueSet sessionQueue *sessionQueue - prevTask *backupTask + prevTask *wtdb.BackupID closableSessionQueue *sessionCloseMinHeap @@ -804,10 +804,9 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, stateNum uint64) error { - // Retrieve the cached sweep pkscript used for this channel. + // Make sure that this channel is registered with the tower client. c.backupMu.Lock() - summary, ok := c.summaries[*chanID] - if !ok { + if _, ok := c.summaries[*chanID]; !ok { c.backupMu.Unlock() return ErrUnregisteredChannel @@ -829,14 +828,12 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, c.chanCommitHeights[*chanID] = stateNum c.backupMu.Unlock() - id := wtdb.BackupID{ + id := &wtdb.BackupID{ ChanID: *chanID, CommitHeight: stateNum, } - task := newBackupTask(id, summary.SweepPkScript) - - return c.pipeline.QueueBackupTask(task) + return c.pipeline.QueueBackupTask(id) } // nextSessionQueue attempts to fetch an active session from our set of @@ -1330,7 +1327,7 @@ func (c *TowerClient) backupDispatcher() { return } - c.log.Debugf("Processing %v", task.id) + c.log.Debugf("Processing %v", task) c.stats.taskReceived() c.processTask(task) @@ -1360,8 +1357,22 @@ func (c *TowerClient) backupDispatcher() { // sessionQueue hasn't been exhausted before proceeding to the next task. Tasks // that are rejected because the active sessionQueue is full will be cached as // the prevTask, and should be reprocessed after obtaining a new sessionQueue. -func (c *TowerClient) processTask(task *backupTask) { - status, accepted := c.sessionQueue.AcceptTask(task) +func (c *TowerClient) processTask(task *wtdb.BackupID) { + c.backupMu.Lock() + summary, ok := c.summaries[task.ChanID] + if !ok { + c.backupMu.Unlock() + + log.Infof("not processing task for unregistered channel: %s", + task.ChanID) + + return + } + c.backupMu.Unlock() + + backupTask := newBackupTask(*task, summary.SweepPkScript) + + status, accepted := c.sessionQueue.AcceptTask(backupTask) if accepted { c.taskAccepted(task, status) } else { @@ -1374,9 +1385,11 @@ func (c *TowerClient) processTask(task *backupTask) { // prevTask is always removed as a result of this call. The client's // sessionQueue will be removed if accepting the task left the sessionQueue in // an exhausted state. -func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { - c.log.Infof("Queued %v successfully for session %v", - task.id, c.sessionQueue.ID()) +func (c *TowerClient) taskAccepted(task *wtdb.BackupID, + newStatus reserveStatus) { + + c.log.Infof("Queued %v successfully for session %v", task, + c.sessionQueue.ID()) c.stats.taskAccepted() @@ -1409,7 +1422,9 @@ func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { // the sessionQueue to find a new session. If the sessionQueue was not // exhausted, the client marks the task as ineligible, as this implies we // couldn't construct a valid justice transaction given the session's policy. -func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { +func (c *TowerClient) taskRejected(task *wtdb.BackupID, + curStatus reserveStatus) { + switch curStatus { // The sessionQueue has available capacity but the task was rejected, @@ -1417,14 +1432,14 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { case reserveAvailable: c.stats.taskIneligible() - c.log.Infof("Ignoring ineligible %v", task.id) + c.log.Infof("Ignoring ineligible %v", task) err := c.cfg.DB.MarkBackupIneligible( - task.id.ChanID, task.id.CommitHeight, + task.ChanID, task.CommitHeight, ) if err != nil { c.log.Errorf("Unable to mark %v ineligible: %v", - task.id, err) + task, err) // It is safe to not handle this error, even if we could // not persist the result. At worst, this task may be @@ -1444,7 +1459,7 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { c.stats.sessionExhausted() c.log.Debugf("Session %v exhausted, %v queued for next session", - c.sessionQueue.ID(), task.id) + c.sessionQueue.ID(), task) // Cache the task that we pulled off, so that we can process it // once a new session queue is available. diff --git a/watchtower/wtclient/task_pipeline.go b/watchtower/wtclient/task_pipeline.go index 385f477af..9415e1d5b 100644 --- a/watchtower/wtclient/task_pipeline.go +++ b/watchtower/wtclient/task_pipeline.go @@ -6,6 +6,7 @@ import ( "time" "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/watchtower/wtdb" ) // taskPipeline implements a reliable, in-order queue that ensures its queue @@ -25,7 +26,7 @@ type taskPipeline struct { queueCond *sync.Cond queue *list.List - newBackupTasks chan *backupTask + newBackupTasks chan *wtdb.BackupID quit chan struct{} forceQuit chan struct{} @@ -37,7 +38,7 @@ func newTaskPipeline(log btclog.Logger) *taskPipeline { rq := &taskPipeline{ log: log, queue: list.New(), - newBackupTasks: make(chan *backupTask), + newBackupTasks: make(chan *wtdb.BackupID), quit: make(chan struct{}), forceQuit: make(chan struct{}), shutdown: make(chan struct{}), @@ -91,7 +92,7 @@ func (q *taskPipeline) ForceQuit() { // channel will be closed after a call to Stop and all pending tasks have been // delivered, or if a call to ForceQuit is called before the pending entries // have been drained. -func (q *taskPipeline) NewBackupTasks() <-chan *backupTask { +func (q *taskPipeline) NewBackupTasks() <-chan *wtdb.BackupID { return q.newBackupTasks } @@ -99,7 +100,7 @@ func (q *taskPipeline) NewBackupTasks() <-chan *backupTask { // of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is // returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be // delivered via NewBackupTasks unless ForceQuit is called before completion. -func (q *taskPipeline) QueueBackupTask(task *backupTask) error { +func (q *taskPipeline) QueueBackupTask(task *wtdb.BackupID) error { q.queueCond.L.Lock() select { @@ -164,7 +165,9 @@ func (q *taskPipeline) queueManager() { // Pop the first element from the queue. e := q.queue.Front() - task := q.queue.Remove(e).(*backupTask) + + //nolint:forcetypeassert + task := q.queue.Remove(e).(*wtdb.BackupID) q.queueCond.L.Unlock() select {