watchtower: add FetchSessionCommittedUpdates func to DB

In this commit, a new tower client db function is added that can be used
to fetch all the committed updates for a given session ID. This is done
in preparation for an upcoming commit where the CommittedUpdates will be
removed from the ClientSession struct.
This commit is contained in:
Elle Mouton 2022-09-30 11:47:54 +02:00
parent 15858cae1c
commit fe3d9174ea
No known key found for this signature in database
GPG Key ID: D7D916376026F177
4 changed files with 72 additions and 4 deletions

View File

@ -65,6 +65,11 @@ type DB interface {
ListClientSessions(*wtdb.TowerID, ...wtdb.ClientSessionListOption) (
map[wtdb.SessionID]*wtdb.ClientSession, error)
// FetchSessionCommittedUpdates retrieves the current set of un-acked
// updates of the given session.
FetchSessionCommittedUpdates(id *wtdb.SessionID) (
[]wtdb.CommittedUpdate, error)
// FetchChanSummaries loads a mapping from all registered channels to
// their channel summaries.
FetchChanSummaries() (wtdb.ChannelSummaries, error)

View File

@ -842,6 +842,36 @@ func listTowerSessions(id TowerID, sessionsBkt, towersBkt,
return clientSessions, nil
}
// FetchSessionCommittedUpdates retrieves the current set of un-acked updates
// of the given session.
func (c *ClientDB) FetchSessionCommittedUpdates(id *SessionID) (
[]CommittedUpdate, error) {
var committedUpdates []CommittedUpdate
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
sessions := tx.ReadBucket(cSessionBkt)
if sessions == nil {
return ErrUninitializedDB
}
sessionBkt := sessions.NestedReadBucket(id[:])
if sessionBkt == nil {
return ErrClientSessionNotFound
}
var err error
committedUpdates, err = getClientSessionCommits(
sessionBkt, nil, nil,
)
return err
}, func() {})
if err != nil {
return nil, err
}
return committedUpdates, nil
}
// FetchChanSummaries loads a mapping from all registered channels to their
// channel summaries.
func (c *ClientDB) FetchChanSummaries() (ChannelSummaries, error) {

View File

@ -207,6 +207,20 @@ func (h *clientDBHarness) newTower() *wtdb.Tower {
}, nil)
}
func (h *clientDBHarness) fetchSessionCommittedUpdates(id *wtdb.SessionID,
expErr error) []wtdb.CommittedUpdate {
h.t.Helper()
updates, err := h.db.FetchSessionCommittedUpdates(id)
if err != expErr {
h.t.Fatalf("expected fetch session committed updates error: "+
"%v, got: %v", expErr, err)
}
return updates
}
// testCreateClientSession asserts various conditions regarding the creation of
// a new ClientSession. The test asserts:
// - client sessions can only be created if a session key index is reserved.
@ -506,6 +520,9 @@ func testCommitUpdate(h *clientDBHarness) {
// session, which should fail.
update1 := randCommittedUpdate(h.t, 1)
h.commitUpdate(&session.ID, update1, wtdb.ErrClientSessionNotFound)
h.fetchSessionCommittedUpdates(
&session.ID, wtdb.ErrClientSessionNotFound,
)
// Reserve a session key index and insert the session.
session.KeyIndex = h.nextKeyIndex(session.TowerID, blobType)
@ -665,14 +682,14 @@ func (h *clientDBHarness) assertUpdates(id wtdb.SessionID,
_ = h.listSessions(
nil, wtdb.WithPerAckedUpdate(perAckedUpdate(ackedUpdates)),
)
dbSession := h.listSessions(nil)[id]
checkCommittedUpdates(h.t, dbSession, expectedPending)
committedUpates := h.fetchSessionCommittedUpdates(&id, nil)
checkCommittedUpdates(h.t, committedUpates, expectedPending)
checkAckedUpdates(h.t, ackedUpdates, expectedAcked)
}
// checkCommittedUpdates asserts that the CommittedUpdates on session match the
// expUpdates provided.
func checkCommittedUpdates(t *testing.T, session *wtdb.ClientSession,
func checkCommittedUpdates(t *testing.T, actualUpdates,
expUpdates []wtdb.CommittedUpdate) {
t.Helper()
@ -684,7 +701,7 @@ func checkCommittedUpdates(t *testing.T, session *wtdb.ClientSession,
expUpdates = make([]wtdb.CommittedUpdate, 0)
}
require.Equal(t, expUpdates, session.CommittedUpdates)
require.Equal(t, expUpdates, actualUpdates)
}
// checkAckedUpdates asserts that the AckedUpdates on a session match the

View File

@ -242,6 +242,22 @@ func (m *ClientDB) listClientSessions(tower *wtdb.TowerID,
return sessions, nil
}
// FetchSessionCommittedUpdates retrieves the current set of un-acked updates
// of the given session.
func (m *ClientDB) FetchSessionCommittedUpdates(id *wtdb.SessionID) (
[]wtdb.CommittedUpdate, error) {
m.mu.Lock()
defer m.mu.Unlock()
sess, ok := m.activeSessions[*id]
if !ok {
return nil, wtdb.ErrClientSessionNotFound
}
return sess.CommittedUpdates, nil
}
// CreateClientSession records a newly negotiated client session in the set of
// active sessions. The session can be identified by its SessionID.
func (m *ClientDB) CreateClientSession(session *wtdb.ClientSession) error {