watchtower: update DeleteCommittedUpdate to delete all

This commit updates the DeleteCommittedUpdate DB method to delete all of
a given session's committed updates instead of just one at a time. The
reason for this is that in an upcoming commit, we will introduce a
"Terminal" session state - once we have deleted a committed update for a
session it should be considered "Terminal" and there is never a case
where we would only want to delete one committed update and not the
rest. So we want these two actions (deleting committed updates of a
session and setting it's status to terminal) to be atomic.
This commit is contained in:
Elle Mouton 2023-11-30 11:31:04 +02:00
parent a3bf2c70e5
commit 1ae802812c
No known key found for this signature in database
GPG Key ID: D7D916376026F177
4 changed files with 55 additions and 51 deletions

View File

@ -136,9 +136,9 @@ type DB interface {
// space.
GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID]
// DeleteCommittedUpdate deletes the committed update belonging to the
// given session and with the given sequence number from the db.
DeleteCommittedUpdate(id *wtdb.SessionID, seqNum uint16) error
// DeleteCommittedUpdates deletes all the committed updates belonging to
// the given session from the db.
DeleteCommittedUpdates(id *wtdb.SessionID) error
}
// AuthDialer connects to a remote node using an authenticated transport, such

View File

@ -211,14 +211,13 @@ func (q *sessionQueue) Stop(final bool) error {
update.BackupID, err)
continue
}
}
err = q.cfg.DB.DeleteCommittedUpdate(
q.ID(), update.SeqNum,
)
if final {
err = q.cfg.DB.DeleteCommittedUpdates(q.ID())
if err != nil {
log.Errorf("could not delete committed "+
"update %d for session %s",
update.SeqNum, q.ID())
"updates for session %s", q.ID())
}
}

View File

@ -196,9 +196,9 @@ var (
// session has updates for channels that are still open.
errSessionHasOpenChannels = errors.New("session has open channels")
// errSessionHasUnackedUpdates is an error used to indicate that a
// ErrSessionHasUnackedUpdates is an error used to indicate that a
// session has un-acked updates.
errSessionHasUnackedUpdates = errors.New("session has un-acked updates")
ErrSessionHasUnackedUpdates = errors.New("session has un-acked updates")
// errChannelHasMoreSessions is an error used to indicate that a channel
// has updates in other non-closed sessions.
@ -1798,22 +1798,21 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
return false, ErrSessionNotFound
}
// Since the DeleteCommittedUpdates method deletes the cSessionCommits
// bucket in one go, it is possible for the session to be closable even
// if this bucket no longer exists.
commitsBkt := sessBkt.NestedReadBucket(cSessionCommits)
if commitsBkt == nil {
// If the session has no cSessionCommits bucket then we can be
// sure that no updates have ever been committed to the session
// and so it is not yet exhausted.
return false, nil
}
// If the session has any un-acked updates, then it is not yet closable.
err := commitsBkt.ForEach(func(_, _ []byte) error {
return errSessionHasUnackedUpdates
})
if errors.Is(err, errSessionHasUnackedUpdates) {
return false, nil
} else if err != nil {
return false, err
if commitsBkt != nil {
// If the session has any un-acked updates, then it is not yet
// closable.
err := commitsBkt.ForEach(func(_, _ []byte) error {
return ErrSessionHasUnackedUpdates
})
if errors.Is(err, ErrSessionHasUnackedUpdates) {
return false, nil
} else if err != nil {
return false, err
}
}
session, err := getClientSessionBody(sessionsBkt, id[:])
@ -2215,9 +2214,9 @@ func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] {
)
}
// DeleteCommittedUpdate deletes the committed update with the given sequence
// number from the given session.
func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error {
// DeleteCommittedUpdates deletes all the committed updates for the given
// session.
func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error {
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
sessions := tx.ReadWriteBucket(cSessionBkt)
if sessions == nil {
@ -2231,23 +2230,40 @@ func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error {
}
// If the commits sub-bucket doesn't exist, there can't possibly
// be a corresponding update to remove.
// be corresponding updates to remove.
sessionCommits := sessionBkt.NestedReadWriteBucket(
cSessionCommits,
)
if sessionCommits == nil {
return ErrCommittedUpdateNotFound
return nil
}
var seqNumBuf [2]byte
byteOrder.PutUint16(seqNumBuf[:], seqNum)
// errFoundUpdates is an error we will use to exit early from
// the ForEach loop. The return of this error means that at
// least one committed update exists.
var errFoundUpdates = fmt.Errorf("found committed updates")
err := sessionCommits.ForEach(func(k, v []byte) error {
return errFoundUpdates
})
switch {
// If the errFoundUpdates signal error was returned then there
// are some updates that need to be deleted.
case errors.Is(err, errFoundUpdates):
if sessionCommits.Get(seqNumBuf[:]) == nil {
return ErrCommittedUpdateNotFound
// If no error is returned then the ForEach call back was never
// entered meaning that there are no un-acked committed updates.
// So we can exit now as there is nothing left to do.
case err == nil:
return nil
// If an expected error is returned, return that error.
default:
return err
}
// Remove the corresponding committed update.
return sessionCommits.Delete(seqNumBuf[:])
// Delete all the committed updates in one go by deleting the
// session commits bucket.
return sessionBkt.DeleteNestedBucket(cSessionCommits)
}, func() {})
}

View File

@ -194,12 +194,12 @@ func (h *clientDBHarness) ackUpdate(id *wtdb.SessionID, seqNum uint16,
require.ErrorIs(h.t, err, expErr)
}
func (h *clientDBHarness) deleteCommittedUpdate(id *wtdb.SessionID,
seqNum uint16, expErr error) {
func (h *clientDBHarness) deleteCommittedUpdates(id *wtdb.SessionID,
expErr error) {
h.t.Helper()
err := h.db.DeleteCommittedUpdate(id, seqNum)
err := h.db.DeleteCommittedUpdates(id)
require.ErrorIs(h.t, err, expErr)
}
@ -660,18 +660,7 @@ func testCommitUpdate(h *clientDBHarness) {
// We will now also test that the DeleteCommittedUpdates method also
// works.
// First, try to delete a committed update that does not exist.
h.deleteCommittedUpdate(
&session.ID, update4.SeqNum, wtdb.ErrCommittedUpdateNotFound,
)
// Now delete an existing committed update and ensure that it succeeds.
h.deleteCommittedUpdate(&session.ID, update1.SeqNum, nil)
h.assertUpdates(session.ID, []wtdb.CommittedUpdate{
*update2,
}, nil)
h.deleteCommittedUpdate(&session.ID, update2.SeqNum, nil)
h.deleteCommittedUpdates(&session.ID, nil)
h.assertUpdates(session.ID, []wtdb.CommittedUpdate{}, nil)
}