wtclient: replay un-acked updates onto pipeline

In this commit, the bugs demonstrated in prior commits are fixed. In the
case where an session has persisted a CommittedUpdate and the tower is
being removed, the session will now replay that update on to the main
task pipeline so that it can be backed up using a different session.
This commit is contained in:
Elle Mouton 2023-02-13 15:49:28 +02:00
parent c432899bf9
commit 3ea67983b5
No known key found for this signature in database
GPG key ID: D7D916376026F177
3 changed files with 105 additions and 46 deletions

View file

@ -701,7 +701,7 @@ func (c *TowerClient) Stop() error {
// task pipeline. // task pipeline.
c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() {
return func() { return func() {
err := s.Stop() err := s.Stop(false)
if err != nil { if err != nil {
c.log.Errorf("could not stop session "+ c.log.Errorf("could not stop session "+
"queue: %s: %v", s.ID(), err) "queue: %s: %v", s.ID(), err)
@ -1689,37 +1689,20 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
return err return err
} }
// We'll first update our in-memory state followed by our persisted // If an address was provided, then we're only meant to remove the
// state, with the stale tower. The removal of the tower address from // address from the tower.
// the in-memory state will fail if the address is currently being used if msg.addr != nil {
// for a session negotiation. return c.removeTowerAddr(dbTower, msg.addr)
err = c.candidateTowers.RemoveCandidate(dbTower.ID, msg.addr) }
// Otherwise, the tower should no longer be used for future session
// negotiations and backups. First, we'll update our in-memory state
// with the stale tower.
err = c.candidateTowers.RemoveCandidate(dbTower.ID, nil)
if err != nil { if err != nil {
return err return err
} }
if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil {
// If the persisted state update fails, re-add the address to
// our in-memory state.
tower, newTowerErr := NewTowerFromDBTower(dbTower)
if newTowerErr != nil {
log.Errorf("could not create new in-memory tower: %v",
newTowerErr)
} else {
c.candidateTowers.AddCandidate(tower)
}
return err
}
// If an address was provided, then we're only meant to remove the
// address from the tower, so there's nothing left for us to do.
if msg.addr != nil {
return nil
}
// Otherwise, the tower should no longer be used for future session
// negotiations and backups.
pubKey := msg.pubKey.SerializeCompressed() pubKey := msg.pubKey.SerializeCompressed()
sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID) sessions, err := c.cfg.DB.ListClientSessions(&dbTower.ID)
if err != nil { if err != nil {
@ -1748,6 +1731,40 @@ func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
} }
} }
// Finally, we will update our persisted state with the stale tower.
return c.cfg.DB.RemoveTower(msg.pubKey, nil)
}
// removeTowerAddr removes the given address from the tower.
func (c *TowerClient) removeTowerAddr(tower *wtdb.Tower, addr net.Addr) error {
if addr == nil {
return fmt.Errorf("an address must be provided")
}
// We'll first update our in-memory state followed by our persisted
// state with the stale tower. The removal of the tower address from
// the in-memory state will fail if the address is currently being used
// for a session negotiation.
err := c.candidateTowers.RemoveCandidate(tower.ID, addr)
if err != nil {
return err
}
err = c.cfg.DB.RemoveTower(tower.IdentityKey, addr)
if err != nil {
// If the persisted state update fails, re-add the address to
// our in-memory state.
tower, newTowerErr := NewTowerFromDBTower(tower)
if newTowerErr != nil {
log.Errorf("could not create new in-memory tower: %v",
newTowerErr)
} else {
c.candidateTowers.AddCandidate(tower)
}
return err
}
return nil return nil
} }

View file

@ -2224,10 +2224,11 @@ var clientTests = []clientTest{
}, },
}, },
{ {
// Assert that a client is unable to remove a tower if there // Assert that a client is able to remove a tower if there are
// are persisted un-acked updates. This is a bug that will be // persisted un-acked updates. This tests the case where the
// fixed in a future commit. // client is not-restarted meaning that the un-acked updates
name: "cant remove due to un-acked updates (no client restart)", // will still be in the pending queue.
name: "can remove due to un-acked updates (no client restart)",
cfg: harnessCfg{ cfg: harnessCfg{
localBalance: localBalance, localBalance: localBalance,
remoteBalance: remoteBalance, remoteBalance: remoteBalance,
@ -2288,23 +2289,29 @@ var clientTests = []clientTest{
}, waitTime) }, waitTime)
require.NoError(h.t, err) require.NoError(h.t, err)
// Now attempt to remove the tower. This will fail due // Now remove the tower.
// the tower having "un-acked" updates. This is a bug
// that will be fixed in a future commit.
err = h.client.RemoveTower( err = h.client.RemoveTower(
h.server.addr.IdentityKey, nil, h.server.addr.IdentityKey, nil,
) )
require.ErrorContains( require.NoError(h.t, err)
h.t, err, "tower has unacked updates",
// Add a new tower.
server2 := newServerHarness(
h.t, h.net, towerAddr2Str, nil,
) )
server2.start()
h.addTower(server2.addr)
// Now we assert that the backups are backed up to the
// new tower.
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
}, },
}, },
{ {
// Assert that a client is _unable_ to remove a tower if there // Assert that a client is able to remove a tower if there are
// are persisted un-acked updates _and_ the client is restarted // persisted un-acked updates _and_ the client is restarted
// before the tower is removed. // before the tower is removed.
name: "cant remove due to un-acked updates (with client " + name: "can remove tower with un-acked updates (with restart)",
"restart)",
cfg: harnessCfg{ cfg: harnessCfg{
localBalance: localBalance, localBalance: localBalance,
remoteBalance: remoteBalance, remoteBalance: remoteBalance,
@ -2373,12 +2380,22 @@ var clientTests = []clientTest{
require.NoError(h.t, h.client.Stop()) require.NoError(h.t, h.client.Stop())
h.startClient() h.startClient()
// Now try removing the tower. This will fail due to // Now remove the tower.
// the persisted CommittedUpdate.
err = h.client.RemoveTower( err = h.client.RemoveTower(
h.server.addr.IdentityKey, nil, h.server.addr.IdentityKey, nil,
) )
require.Error(h.t, err, "tower has unacked updates") require.NoError(h.t, err)
// Add a new tower.
server2 := newServerHarness(
h.t, h.net, towerAddr2Str, nil,
)
server2.start()
h.addTower(server2.addr)
// Now we assert that the backups are backed up to the
// new tower.
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
}, },
}, },
} }

View file

@ -161,7 +161,10 @@ func (q *sessionQueue) Start() {
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that // Stop idempotently stops the sessionQueue by initiating a clean shutdown that
// will clear all pending tasks in the queue before returning to the caller. // will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() error { // The final param should only be set to true if this is the last time that
// this session will be used. Otherwise, during normal shutdown, the final param
// should be false.
func (q *sessionQueue) Stop(final bool) error {
var returnErr error var returnErr error
q.stopped.Do(func() { q.stopped.Do(func() {
q.log.Debugf("SessionQueue(%s) stopping ...", q.ID()) q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
@ -195,6 +198,28 @@ func (q *sessionQueue) Stop() error {
unAckedUpdates := make(map[wtdb.BackupID]bool) unAckedUpdates := make(map[wtdb.BackupID]bool)
for _, update := range updates { for _, update := range updates {
unAckedUpdates[update.BackupID] = true unAckedUpdates[update.BackupID] = true
if !final {
continue
}
err := q.cfg.TaskPipeline.QueueBackupID(
&update.BackupID,
)
if err != nil {
log.Errorf("could not re-queue %s: %v",
update.BackupID, err)
continue
}
err = q.cfg.DB.DeleteCommittedUpdate(
q.ID(), update.SeqNum,
)
if err != nil {
log.Errorf("could not delete committed "+
"update %d for session %s",
update.SeqNum, q.ID())
}
} }
// Push any task that was on the pending queue that there is // Push any task that was on the pending queue that there is
@ -752,7 +777,7 @@ func (s *sessionQueueSet) StopAndRemove(id wtdb.SessionID) error {
delete(s.queues, id) delete(s.queues, id)
return queue.Stop() return queue.Stop(true)
} }
// Get fetches and returns the sessionQueue with the given ID. // Get fetches and returns the sessionQueue with the given ID.