diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 716db74aa..f3b8d3307 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -701,7 +701,7 @@ func (c *TowerClient) Stop() error { // task pipeline. c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { return func() { - err := s.Stop() + err := s.Stop(false) if err != nil { c.log.Errorf("could not stop session "+ "queue: %s: %v", s.ID(), err) @@ -1689,37 +1689,20 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { return err } - // We'll first update our in-memory state followed by our persisted - // state, with the stale tower. The removal of the tower address from - // the in-memory state will fail if the address is currently being used - // for a session negotiation. - err = c.candidateTowers.RemoveCandidate(dbTower.ID, msg.addr) + // If an address was provided, then we're only meant to remove the + // address from the tower. + if msg.addr != nil { + return c.removeTowerAddr(dbTower, msg.addr) + } + + // Otherwise, the tower should no longer be used for future session + // negotiations and backups. First, we'll update our in-memory state + // with the stale tower. + err = c.candidateTowers.RemoveCandidate(dbTower.ID, nil) if err != nil { return err } - if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil { - // If the persisted state update fails, re-add the address to - // our in-memory state. - tower, newTowerErr := NewTowerFromDBTower(dbTower) - if newTowerErr != nil { - log.Errorf("could not create new in-memory tower: %v", - newTowerErr) - } else { - c.candidateTowers.AddCandidate(tower) - } - - return err - } - - // If an address was provided, then we're only meant to remove the - // address from the tower, so there's nothing left for us to do. - if msg.addr != nil { - return nil - } - - // Otherwise, the tower should no longer be used for future session - // negotiations and backups. pubKey := msg.pubKey.SerializeCompressed() sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID) if err != nil { @@ -1748,6 +1731,40 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error { } } + // Finally, we will update our persisted state with the stale tower. + return c.cfg.DB.RemoveTower(msg.pubKey, nil) +} + +// removeTowerAddr removes the given address from the tower. +func (c *TowerClient) removeTowerAddr(tower *wtdb.Tower, addr net.Addr) error { + if addr == nil { + return fmt.Errorf("an address must be provided") + } + + // We'll first update our in-memory state followed by our persisted + // state with the stale tower. The removal of the tower address from + // the in-memory state will fail if the address is currently being used + // for a session negotiation. + err := c.candidateTowers.RemoveCandidate(tower.ID, addr) + if err != nil { + return err + } + + err = c.cfg.DB.RemoveTower(tower.IdentityKey, addr) + if err != nil { + // If the persisted state update fails, re-add the address to + // our in-memory state. + tower, newTowerErr := NewTowerFromDBTower(tower) + if newTowerErr != nil { + log.Errorf("could not create new in-memory tower: %v", + newTowerErr) + } else { + c.candidateTowers.AddCandidate(tower) + } + + return err + } + return nil } diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 1b54d87cc..5cb998c9a 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -2224,10 +2224,11 @@ var clientTests = []clientTest{ }, }, { - // Assert that a client is unable to remove a tower if there - // are persisted un-acked updates. This is a bug that will be - // fixed in a future commit. - name: "cant remove due to un-acked updates (no client restart)", + // Assert that a client is able to remove a tower if there are + // persisted un-acked updates. This tests the case where the + // client is not-restarted meaning that the un-acked updates + // will still be in the pending queue. + name: "can remove due to un-acked updates (no client restart)", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, @@ -2288,23 +2289,29 @@ var clientTests = []clientTest{ }, waitTime) require.NoError(h.t, err) - // Now attempt to remove the tower. This will fail due - // the tower having "un-acked" updates. This is a bug - // that will be fixed in a future commit. + // Now remove the tower. err = h.client.RemoveTower( h.server.addr.IdentityKey, nil, ) - require.ErrorContains( - h.t, err, "tower has unacked updates", + require.NoError(h.t, err) + + // Add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, ) + server2.start() + h.addTower(server2.addr) + + // Now we assert that the backups are backed up to the + // new tower. + server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, { - // Assert that a client is _unable_ to remove a tower if there - // are persisted un-acked updates _and_ the client is restarted + // Assert that a client is able to remove a tower if there are + // persisted un-acked updates _and_ the client is restarted // before the tower is removed. - name: "cant remove due to un-acked updates (with client " + - "restart)", + name: "can remove tower with un-acked updates (with restart)", cfg: harnessCfg{ localBalance: localBalance, remoteBalance: remoteBalance, @@ -2373,12 +2380,22 @@ var clientTests = []clientTest{ require.NoError(h.t, h.client.Stop()) h.startClient() - // Now try removing the tower. This will fail due to - // the persisted CommittedUpdate. + // Now remove the tower. err = h.client.RemoveTower( h.server.addr.IdentityKey, nil, ) - require.Error(h.t, err, "tower has unacked updates") + require.NoError(h.t, err) + + // Add a new tower. + server2 := newServerHarness( + h.t, h.net, towerAddr2Str, nil, + ) + server2.start() + h.addTower(server2.addr) + + // Now we assert that the backups are backed up to the + // new tower. + server2.waitForUpdates(hints[numUpdates/2:], waitTime) }, }, } diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index 1377ceb57..27c36c6fe 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -161,7 +161,10 @@ func (q *sessionQueue) Start() { // Stop idempotently stops the sessionQueue by initiating a clean shutdown that // will clear all pending tasks in the queue before returning to the caller. -func (q *sessionQueue) Stop() error { +// The final param should only be set to true if this is the last time that +// this session will be used. Otherwise, during normal shutdown, the final param +// should be false. +func (q *sessionQueue) Stop(final bool) error { var returnErr error q.stopped.Do(func() { q.log.Debugf("SessionQueue(%s) stopping ...", q.ID()) @@ -195,6 +198,28 @@ func (q *sessionQueue) Stop() error { unAckedUpdates := make(map[wtdb.BackupID]bool) for _, update := range updates { unAckedUpdates[update.BackupID] = true + + if !final { + continue + } + + err := q.cfg.TaskPipeline.QueueBackupID( + &update.BackupID, + ) + if err != nil { + log.Errorf("could not re-queue %s: %v", + update.BackupID, err) + continue + } + + err = q.cfg.DB.DeleteCommittedUpdate( + q.ID(), update.SeqNum, + ) + if err != nil { + log.Errorf("could not delete committed "+ + "update %d for session %s", + update.SeqNum, q.ID()) + } } // Push any task that was on the pending queue that there is @@ -752,7 +777,7 @@ func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error { delete(s.queues, id) - return queue.Stop() + return queue.Stop(true) } // Get fetches and returns the sessionQueue with the given ID.