From 1ae802812cd7a4968eacc9d52e3a4fedf31d1350 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 30 Nov 2023 11:31:04 +0200 Subject: [PATCH] watchtower: update DeleteCommittedUpdate to delete all This commit updates the DeleteCommittedUpdate DB method to delete all of a given session's committed updates instead of just one at a time. The reason for this is that in an upcoming commit, we will introduce a "Terminal" session state - once we have deleted a committed update for a session it should be considered "Terminal" and there is never a case where we would only want to delete one committed update and not the rest. So we want these two actions (deleting committed updates of a session and setting it's status to terminal) to be atomic. --- watchtower/wtclient/interface.go | 6 +-- watchtower/wtclient/session_queue.go | 9 ++-- watchtower/wtdb/client_db.go | 72 +++++++++++++++++----------- watchtower/wtdb/client_db_test.go | 19 ++------ 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/watchtower/wtclient/interface.go b/watchtower/wtclient/interface.go index e69155288..c1d71c0e7 100644 --- a/watchtower/wtclient/interface.go +++ b/watchtower/wtclient/interface.go @@ -136,9 +136,9 @@ type DB interface { // space. GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID] - // DeleteCommittedUpdate deletes the committed update belonging to the - // given session and with the given sequence number from the db. - DeleteCommittedUpdate(id *wtdb.SessionID, seqNum uint16) error + // DeleteCommittedUpdates deletes all the committed updates belonging to + // the given session from the db. + DeleteCommittedUpdates(id *wtdb.SessionID) error } // AuthDialer connects to a remote node using an authenticated transport, such diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index 1bd7a4ddb..3c1126a89 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -211,14 +211,13 @@ func (q *sessionQueue) Stop(final bool) error { update.BackupID, err) continue } + } - err = q.cfg.DB.DeleteCommittedUpdate( - q.ID(), update.SeqNum, - ) + if final { + err = q.cfg.DB.DeleteCommittedUpdates(q.ID()) if err != nil { log.Errorf("could not delete committed "+ - "update %d for session %s", - update.SeqNum, q.ID()) + "updates for session %s", q.ID()) } } diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index 3a8eb6725..46b3e73f5 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -196,9 +196,9 @@ var ( // session has updates for channels that are still open. errSessionHasOpenChannels = errors.New("session has open channels") - // errSessionHasUnackedUpdates is an error used to indicate that a + // ErrSessionHasUnackedUpdates is an error used to indicate that a // session has un-acked updates. - errSessionHasUnackedUpdates = errors.New("session has un-acked updates") + ErrSessionHasUnackedUpdates = errors.New("session has un-acked updates") // errChannelHasMoreSessions is an error used to indicate that a channel // has updates in other non-closed sessions. @@ -1798,22 +1798,21 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket, return false, ErrSessionNotFound } + // Since the DeleteCommittedUpdates method deletes the cSessionCommits + // bucket in one go, it is possible for the session to be closable even + // if this bucket no longer exists. commitsBkt := sessBkt.NestedReadBucket(cSessionCommits) - if commitsBkt == nil { - // If the session has no cSessionCommits bucket then we can be - // sure that no updates have ever been committed to the session - // and so it is not yet exhausted. - return false, nil - } - - // If the session has any un-acked updates, then it is not yet closable. - err := commitsBkt.ForEach(func(_, _ []byte) error { - return errSessionHasUnackedUpdates - }) - if errors.Is(err, errSessionHasUnackedUpdates) { - return false, nil - } else if err != nil { - return false, err + if commitsBkt != nil { + // If the session has any un-acked updates, then it is not yet + // closable. + err := commitsBkt.ForEach(func(_, _ []byte) error { + return ErrSessionHasUnackedUpdates + }) + if errors.Is(err, ErrSessionHasUnackedUpdates) { + return false, nil + } else if err != nil { + return false, err + } } session, err := getClientSessionBody(sessionsBkt, id[:]) @@ -2215,9 +2214,9 @@ func (c *ClientDB) GetDBQueue(namespace []byte) Queue[*BackupID] { ) } -// DeleteCommittedUpdate deletes the committed update with the given sequence -// number from the given session. -func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error { +// DeleteCommittedUpdates deletes all the committed updates for the given +// session. +func (c *ClientDB) DeleteCommittedUpdates(id *SessionID) error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { sessions := tx.ReadWriteBucket(cSessionBkt) if sessions == nil { @@ -2231,23 +2230,40 @@ func (c *ClientDB) DeleteCommittedUpdate(id *SessionID, seqNum uint16) error { } // If the commits sub-bucket doesn't exist, there can't possibly - // be a corresponding update to remove. + // be corresponding updates to remove. sessionCommits := sessionBkt.NestedReadWriteBucket( cSessionCommits, ) if sessionCommits == nil { - return ErrCommittedUpdateNotFound + return nil } - var seqNumBuf [2]byte - byteOrder.PutUint16(seqNumBuf[:], seqNum) + // errFoundUpdates is an error we will use to exit early from + // the ForEach loop. The return of this error means that at + // least one committed update exists. + var errFoundUpdates = fmt.Errorf("found committed updates") + err := sessionCommits.ForEach(func(k, v []byte) error { + return errFoundUpdates + }) + switch { + // If the errFoundUpdates signal error was returned then there + // are some updates that need to be deleted. + case errors.Is(err, errFoundUpdates): - if sessionCommits.Get(seqNumBuf[:]) == nil { - return ErrCommittedUpdateNotFound + // If no error is returned then the ForEach call back was never + // entered meaning that there are no un-acked committed updates. + // So we can exit now as there is nothing left to do. + case err == nil: + return nil + + // If an expected error is returned, return that error. + default: + return err } - // Remove the corresponding committed update. - return sessionCommits.Delete(seqNumBuf[:]) + // Delete all the committed updates in one go by deleting the + // session commits bucket. + return sessionBkt.DeleteNestedBucket(cSessionCommits) }, func() {}) } diff --git a/watchtower/wtdb/client_db_test.go b/watchtower/wtdb/client_db_test.go index 9348b845f..475b72837 100644 --- a/watchtower/wtdb/client_db_test.go +++ b/watchtower/wtdb/client_db_test.go @@ -194,12 +194,12 @@ func (h *clientDBHarness) ackUpdate(id *wtdb.SessionID, seqNum uint16, require.ErrorIs(h.t, err, expErr) } -func (h *clientDBHarness) deleteCommittedUpdate(id *wtdb.SessionID, - seqNum uint16, expErr error) { +func (h *clientDBHarness) deleteCommittedUpdates(id *wtdb.SessionID, + expErr error) { h.t.Helper() - err := h.db.DeleteCommittedUpdate(id, seqNum) + err := h.db.DeleteCommittedUpdates(id) require.ErrorIs(h.t, err, expErr) } @@ -660,18 +660,7 @@ func testCommitUpdate(h *clientDBHarness) { // We will now also test that the DeleteCommittedUpdates method also // works. - // First, try to delete a committed update that does not exist. - h.deleteCommittedUpdate( - &session.ID, update4.SeqNum, wtdb.ErrCommittedUpdateNotFound, - ) - - // Now delete an existing committed update and ensure that it succeeds. - h.deleteCommittedUpdate(&session.ID, update1.SeqNum, nil) - h.assertUpdates(session.ID, []wtdb.CommittedUpdate{ - *update2, - }, nil) - - h.deleteCommittedUpdate(&session.ID, update2.SeqNum, nil) + h.deleteCommittedUpdates(&session.ID, nil) h.assertUpdates(session.ID, []wtdb.CommittedUpdate{}, nil) }