wtclient: let task pipeline only carry wtdb.BackupID

Since the retrubution info of a backup task is now only constructed at
the time that the task is being bound to a session, the in-memory queue
only needs to carry the BackupID of the task.
This commit is contained in:
Elle Mouton 2023-02-02 12:26:27 +02:00
parent 2371bbf09a
commit 65dc20f2cc
No known key found for this signature in database
GPG key ID: D7D916376026F177
2 changed files with 42 additions and 24 deletions

View file

@ -301,7 +301,7 @@ type TowerClient struct {
activeSessions sessionQueueSet
sessionQueue *sessionQueue
prevTask *backupTask
prevTask *wtdb.BackupID
closableSessionQueue *sessionCloseMinHeap
@ -804,10 +804,9 @@ func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {
func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
stateNum uint64) error {
// Retrieve the cached sweep pkscript used for this channel.
// Make sure that this channel is registered with the tower client.
c.backupMu.Lock()
summary, ok := c.summaries[*chanID]
if !ok {
if _, ok := c.summaries[*chanID]; !ok {
c.backupMu.Unlock()
return ErrUnregisteredChannel
@ -829,14 +828,12 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
c.chanCommitHeights[*chanID] = stateNum
c.backupMu.Unlock()
id := wtdb.BackupID{
id := &wtdb.BackupID{
ChanID: *chanID,
CommitHeight: stateNum,
}
task := newBackupTask(id, summary.SweepPkScript)
return c.pipeline.QueueBackupTask(task)
return c.pipeline.QueueBackupTask(id)
}
// nextSessionQueue attempts to fetch an active session from our set of
@ -1330,7 +1327,7 @@ func (c *TowerClient) backupDispatcher() {
return
}
c.log.Debugf("Processing %v", task.id)
c.log.Debugf("Processing %v", task)
c.stats.taskReceived()
c.processTask(task)
@ -1360,8 +1357,22 @@ func (c *TowerClient) backupDispatcher() {
// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks
// that are rejected because the active sessionQueue is full will be cached as
// the prevTask, and should be reprocessed after obtaining a new sessionQueue.
func (c *TowerClient) processTask(task *backupTask) {
status, accepted := c.sessionQueue.AcceptTask(task)
func (c *TowerClient) processTask(task *wtdb.BackupID) {
c.backupMu.Lock()
summary, ok := c.summaries[task.ChanID]
if !ok {
c.backupMu.Unlock()
log.Infof("not processing task for unregistered channel: %s",
task.ChanID)
return
}
c.backupMu.Unlock()
backupTask := newBackupTask(*task, summary.SweepPkScript)
status, accepted := c.sessionQueue.AcceptTask(backupTask)
if accepted {
c.taskAccepted(task, status)
} else {
@ -1374,9 +1385,11 @@ func (c *TowerClient) processTask(task *backupTask) {
// prevTask is always removed as a result of this call. The client's
// sessionQueue will be removed if accepting the task left the sessionQueue in
// an exhausted state.
func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) {
c.log.Infof("Queued %v successfully for session %v",
task.id, c.sessionQueue.ID())
func (c *TowerClient) taskAccepted(task *wtdb.BackupID,
newStatus reserveStatus) {
c.log.Infof("Queued %v successfully for session %v", task,
c.sessionQueue.ID())
c.stats.taskAccepted()
@ -1409,7 +1422,9 @@ func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) {
// the sessionQueue to find a new session. If the sessionQueue was not
// exhausted, the client marks the task as ineligible, as this implies we
// couldn't construct a valid justice transaction given the session's policy.
func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
func (c *TowerClient) taskRejected(task *wtdb.BackupID,
curStatus reserveStatus) {
switch curStatus {
// The sessionQueue has available capacity but the task was rejected,
@ -1417,14 +1432,14 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
case reserveAvailable:
c.stats.taskIneligible()
c.log.Infof("Ignoring ineligible %v", task.id)
c.log.Infof("Ignoring ineligible %v", task)
err := c.cfg.DB.MarkBackupIneligible(
task.id.ChanID, task.id.CommitHeight,
task.ChanID, task.CommitHeight,
)
if err != nil {
c.log.Errorf("Unable to mark %v ineligible: %v",
task.id, err)
task, err)
// It is safe to not handle this error, even if we could
// not persist the result. At worst, this task may be
@ -1444,7 +1459,7 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
c.stats.sessionExhausted()
c.log.Debugf("Session %v exhausted, %v queued for next session",
c.sessionQueue.ID(), task.id)
c.sessionQueue.ID(), task)
// Cache the task that we pulled off, so that we can process it
// once a new session queue is available.

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
)
// taskPipeline implements a reliable, in-order queue that ensures its queue
@ -25,7 +26,7 @@ type taskPipeline struct {
queueCond *sync.Cond
queue *list.List
newBackupTasks chan *backupTask
newBackupTasks chan *wtdb.BackupID
quit chan struct{}
forceQuit chan struct{}
@ -37,7 +38,7 @@ func newTaskPipeline(log btclog.Logger) *taskPipeline {
rq := &taskPipeline{
log: log,
queue: list.New(),
newBackupTasks: make(chan *backupTask),
newBackupTasks: make(chan *wtdb.BackupID),
quit: make(chan struct{}),
forceQuit: make(chan struct{}),
shutdown: make(chan struct{}),
@ -91,7 +92,7 @@ func (q *taskPipeline) ForceQuit() {
// channel will be closed after a call to Stop and all pending tasks have been
// delivered, or if a call to ForceQuit is called before the pending entries
// have been drained.
func (q *taskPipeline) NewBackupTasks() <-chan *backupTask {
func (q *taskPipeline) NewBackupTasks() <-chan *wtdb.BackupID {
return q.newBackupTasks
}
@ -99,7 +100,7 @@ func (q *taskPipeline) NewBackupTasks() <-chan *backupTask {
// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is
// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be
// delivered via NewBackupTasks unless ForceQuit is called before completion.
func (q *taskPipeline) QueueBackupTask(task *backupTask) error {
func (q *taskPipeline) QueueBackupTask(task *wtdb.BackupID) error {
q.queueCond.L.Lock()
select {
@ -164,7 +165,9 @@ func (q *taskPipeline) queueManager() {
// Pop the first element from the queue.
e := q.queue.Front()
task := q.queue.Remove(e).(*backupTask)
//nolint:forcetypeassert
task := q.queue.Remove(e).(*wtdb.BackupID)
q.queueCond.L.Unlock()
select {