mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-02-22 22:25:24 +01:00
watchtower: account for rogue updates
In this commit, we introduce the concept of a rogue update. An update is rogue if we need to ACK it but we have already deleted all the data for the associated channel due to the channel being closed. In this case, we now no longer error out and instead keep count of how many rogue updates a session has backed-up.
This commit is contained in:
parent
c33cd0ea38
commit
2a9339805e
3 changed files with 271 additions and 26 deletions
|
@ -2417,14 +2417,10 @@ var clientTests = []clientTest{
|
|||
},
|
||||
},
|
||||
{
|
||||
// This test demonstrates a bug that will be addressed in a
|
||||
// follow-up commit. It shows that if a channel is closed while
|
||||
// an update for that channel still exists in an in-memory queue
|
||||
// somewhere then it is possible that all the data for that
|
||||
// channel gets deleted from the tower client DB. This results
|
||||
// in an error being thrown in the DB AckUpdate method since it
|
||||
// will try to find the associated channel data but will not
|
||||
// find it.
|
||||
// This test shows that if a channel is closed while an update
|
||||
// for that channel still exists in an in-memory queue
|
||||
// somewhere then it is handled correctly by treating it as a
|
||||
// rogue update.
|
||||
name: "channel closed while update is un-acked",
|
||||
cfg: harnessCfg{
|
||||
localBalance: localBalance,
|
||||
|
@ -2532,7 +2528,7 @@ var clientTests = []clientTest{
|
|||
require.NoError(h.t, err)
|
||||
|
||||
// Show that the committed update for the closed channel
|
||||
// remains in the client's DB.
|
||||
// is cleared from the DB.
|
||||
err = wait.Predicate(func() bool {
|
||||
sessions, err := h.clientDB.ListClientSessions(
|
||||
nil,
|
||||
|
@ -2545,11 +2541,11 @@ var clientTests = []clientTest{
|
|||
require.NoError(h.t, err)
|
||||
|
||||
if len(updates) != 0 {
|
||||
return true
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
return true
|
||||
}, waitTime)
|
||||
require.NoError(h.t, err)
|
||||
},
|
||||
|
|
|
@ -50,6 +50,7 @@ var (
|
|||
// => cSessionDBID -> db-assigned-id
|
||||
// => cSessionCommits => seqnum -> encoded CommittedUpdate
|
||||
// => cSessionAckRangeIndex => db-chan-id => start -> end
|
||||
// => cSessionRogueUpdateCount -> count
|
||||
cSessionBkt = []byte("client-session-bucket")
|
||||
|
||||
// cSessionDBID is a key used in the cSessionBkt to store the
|
||||
|
@ -68,6 +69,12 @@ var (
|
|||
// chan-id => start -> end
|
||||
cSessionAckRangeIndex = []byte("client-session-ack-range-index")
|
||||
|
||||
// cSessionRogueUpdateCount is a key in the cSessionBkt bucket storing
|
||||
// the number of rogue updates that were backed up using the session.
|
||||
// Rogue updates are updates for channels that have been closed already
|
||||
// at the time of the back-up.
|
||||
cSessionRogueUpdateCount = []byte("client-session-rogue-update-count")
|
||||
|
||||
// cChanIDIndexBkt is a top-level bucket storing:
|
||||
// db-assigned-id -> channel-ID
|
||||
cChanIDIndexBkt = []byte("client-channel-id-index")
|
||||
|
@ -1242,10 +1249,23 @@ func (c *ClientDB) NumAckedUpdates(id *SessionID) (uint64, error) {
|
|||
}
|
||||
|
||||
sessionBkt := sessions.NestedReadBucket(id[:])
|
||||
if sessionsBkt == nil {
|
||||
if sessionBkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// First, account for any rogue updates.
|
||||
rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
|
||||
if len(rogueCountBytes) != 0 {
|
||||
rogueCount, err := readBigSize(rogueCountBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
numAcked += rogueCount
|
||||
}
|
||||
|
||||
// Then, check if the session-ack-ranges contains any entries
|
||||
// to account for.
|
||||
sessionAckRanges := sessionBkt.NestedReadBucket(
|
||||
cSessionAckRangeIndex,
|
||||
)
|
||||
|
@ -1525,14 +1545,37 @@ func (c *ClientDB) DeleteSession(id SessionID) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Get the acked updates range index for the session. This is
|
||||
// used to get the list of channels that the session has updates
|
||||
// for.
|
||||
ackRanges := sessionBkt.NestedReadBucket(cSessionAckRangeIndex)
|
||||
|
||||
// There is a small chance that the session only contains rogue
|
||||
// updates. In that case, there will be no ack-ranges index but
|
||||
// the rogue update count will be equal the MaxUpdates.
|
||||
rogueCountBytes := sessionBkt.Get(cSessionRogueUpdateCount)
|
||||
if len(rogueCountBytes) != 0 {
|
||||
rogueCount, err := readBigSize(rogueCountBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxUpdates := sess.ClientSessionBody.Policy.MaxUpdates
|
||||
if rogueCount == uint64(maxUpdates) {
|
||||
// Do a sanity check to ensure that the acked
|
||||
// ranges bucket does not exist in this case.
|
||||
if ackRanges != nil {
|
||||
return fmt.Errorf("acked updates "+
|
||||
"exist for session with a "+
|
||||
"max-updates(%d) rogue count",
|
||||
rogueCount)
|
||||
}
|
||||
|
||||
return sessionsBkt.DeleteNestedBucket(id[:])
|
||||
}
|
||||
}
|
||||
|
||||
// A session would only be considered closable if it was
|
||||
// exhausted. Meaning that it should not be the case that it has
|
||||
// no acked-updates.
|
||||
if ackRanges == nil {
|
||||
// A session would only be considered closable if it
|
||||
// was exhausted. Meaning that it should not be the
|
||||
// case that it has no acked-updates.
|
||||
return fmt.Errorf("cannot delete session %s since it "+
|
||||
"is not yet exhausted", id)
|
||||
}
|
||||
|
@ -1763,6 +1806,22 @@ func isSessionClosable(sessionsBkt, chanDetailsBkt, chanIDIndexBkt kvdb.RBucket,
|
|||
return false, nil
|
||||
}
|
||||
|
||||
// Either the acked-update bucket should exist _or_ the rogue update
|
||||
// count must be equal to the session's MaxUpdates value, otherwise
|
||||
// something is wrong because the above check ensures that the session
|
||||
// has been exhausted.
|
||||
rogueCountBytes := sessBkt.Get(cSessionRogueUpdateCount)
|
||||
if len(rogueCountBytes) != 0 {
|
||||
rogueCount, err := readBigSize(rogueCountBytes)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if rogueCount == uint64(session.Policy.MaxUpdates) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// If the session has no acked-updates, then something is wrong since
|
||||
// the above check ensures that this session has been exhausted meaning
|
||||
// that it should have MaxUpdates acked updates.
|
||||
|
@ -2005,12 +2064,83 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
|
|||
return err
|
||||
}
|
||||
|
||||
dbSessionID, dbSessIDBytes, err := getDBSessionID(sessions, *id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chanID := committedUpdate.BackupID.ChanID
|
||||
height := committedUpdate.BackupID.CommitHeight
|
||||
|
||||
// Get the DB representation of the channel-ID.
|
||||
// Get the DB representation of the channel-ID. There is a
|
||||
// chance that the channel corresponding to this update has been
|
||||
// closed and that the details for this channel no longer exist
|
||||
// in the tower client DB. In that case, we consider this a
|
||||
// rogue update and all we do is make sure to keep track of the
|
||||
// number of rogue updates for this session.
|
||||
_, dbChanIDBytes, err := getDBChanID(chanDetailsBkt, chanID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrChannelNotRegistered) {
|
||||
var (
|
||||
count uint64
|
||||
err error
|
||||
)
|
||||
|
||||
rogueCountBytes := sessionBkt.Get(
|
||||
cSessionRogueUpdateCount,
|
||||
)
|
||||
if len(rogueCountBytes) != 0 {
|
||||
count, err = readBigSize(rogueCountBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
rogueCount := count + 1
|
||||
countBytes, err := writeBigSize(rogueCount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = sessionBkt.Put(
|
||||
cSessionRogueUpdateCount, countBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// In the rare chance that this session only has rogue
|
||||
// updates, we check here if the count is equal to the
|
||||
// MaxUpdate of the session. If it is, then we mark the
|
||||
// session as closable.
|
||||
if rogueCount != uint64(session.Policy.MaxUpdates) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Before we mark the session as closable, we do a
|
||||
// sanity check to ensure that this session has no
|
||||
// acked-update index.
|
||||
sessionAckRanges := sessionBkt.NestedReadBucket(
|
||||
cSessionAckRangeIndex,
|
||||
)
|
||||
if sessionAckRanges != nil {
|
||||
return fmt.Errorf("session(%s) has an "+
|
||||
"acked ranges index but has a rogue "+
|
||||
"count indicating saturation",
|
||||
session.ID)
|
||||
}
|
||||
|
||||
closableSessBkt := tx.ReadWriteBucket(
|
||||
cClosableSessionsBkt,
|
||||
)
|
||||
if closableSessBkt == nil {
|
||||
return ErrUninitializedDB
|
||||
}
|
||||
|
||||
var height [4]byte
|
||||
byteOrder.PutUint32(height[:], 0)
|
||||
|
||||
return closableSessBkt.Put(dbSessIDBytes, height[:])
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2024,11 +2154,6 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
|
|||
return err
|
||||
}
|
||||
|
||||
dbSessionID, _, err := getDBSessionID(sessions, *id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chanDetails := chanDetailsBkt.NestedReadWriteBucket(
|
||||
committedUpdate.BackupID.ChanID[:],
|
||||
)
|
||||
|
|
|
@ -675,6 +675,98 @@ func testCommitUpdate(h *clientDBHarness) {
|
|||
h.assertUpdates(session.ID, []wtdb.CommittedUpdate{}, nil)
|
||||
}
|
||||
|
||||
// testRogueUpdates asserts that rogue updates (updates for channels that are
|
||||
// backed up after the channel has been closed and the channel details deleted
|
||||
// from the DB) are handled correctly.
|
||||
func testRogueUpdates(h *clientDBHarness) {
|
||||
const maxUpdates = 5
|
||||
|
||||
tower := h.newTower()
|
||||
|
||||
// Create and insert a new session.
|
||||
session1 := h.randSession(h.t, tower.ID, maxUpdates)
|
||||
h.insertSession(session1, nil)
|
||||
|
||||
// Create a new channel and register it.
|
||||
chanID1 := randChannelID(h.t)
|
||||
h.registerChan(chanID1, nil, nil)
|
||||
|
||||
// Num acked updates should be 0.
|
||||
require.Zero(h.t, h.numAcked(&session1.ID, nil))
|
||||
|
||||
// Commit and ACK enough updates for this channel to fill the session.
|
||||
for i := 1; i <= maxUpdates; i++ {
|
||||
update := randCommittedUpdateForChanWithHeight(
|
||||
h.t, chanID1, uint16(i), uint64(i),
|
||||
)
|
||||
lastApplied := h.commitUpdate(&session1.ID, update, nil)
|
||||
h.ackUpdate(&session1.ID, uint16(i), lastApplied, nil)
|
||||
}
|
||||
|
||||
// Num acked updates should now be 5.
|
||||
require.EqualValues(h.t, maxUpdates, h.numAcked(&session1.ID, nil))
|
||||
|
||||
// Commit one more update for the channel but this time do not ACK it.
|
||||
// This update will be put in a new session since the previous one has
|
||||
// been exhausted.
|
||||
session2 := h.randSession(h.t, tower.ID, maxUpdates)
|
||||
sess2Seq := 1
|
||||
h.insertSession(session2, nil)
|
||||
update := randCommittedUpdateForChanWithHeight(
|
||||
h.t, chanID1, uint16(sess2Seq), uint64(maxUpdates+1),
|
||||
)
|
||||
lastApplied := h.commitUpdate(&session2.ID, update, nil)
|
||||
|
||||
// Session 2 should not have any acked updates yet.
|
||||
require.Zero(h.t, h.numAcked(&session2.ID, nil))
|
||||
|
||||
// There should currently be no closable sessions.
|
||||
require.Empty(h.t, h.listClosableSessions(nil))
|
||||
|
||||
// Now mark the channel as closed.
|
||||
h.markChannelClosed(chanID1, 1, nil)
|
||||
|
||||
// Assert that session 1 is now seen as closable.
|
||||
closableSessionsMap := h.listClosableSessions(nil)
|
||||
require.Len(h.t, closableSessionsMap, 1)
|
||||
_, ok := closableSessionsMap[session1.ID]
|
||||
require.True(h.t, ok)
|
||||
|
||||
// Delete session 1.
|
||||
h.deleteSession(session1.ID, nil)
|
||||
|
||||
// Now try to ACK the update for the channel. This should succeed and
|
||||
// the update should be considered a rogue update.
|
||||
h.ackUpdate(&session2.ID, uint16(sess2Seq), lastApplied, nil)
|
||||
|
||||
// Show that the number of acked updates is now 1.
|
||||
require.EqualValues(h.t, 1, h.numAcked(&session2.ID, nil))
|
||||
|
||||
// We also want to test the extreme case where all the updates for a
|
||||
// particular session are rogue updates. In this case, the session
|
||||
// should be seen as closable if it is saturated.
|
||||
|
||||
// First show that the session is not yet considered closable.
|
||||
require.Empty(h.t, h.listClosableSessions(nil))
|
||||
|
||||
// Then, let's continue adding rogue updates for the closed channel to
|
||||
// session 2.
|
||||
for i := maxUpdates + 2; i <= maxUpdates*2; i++ {
|
||||
sess2Seq++
|
||||
|
||||
update := randCommittedUpdateForChanWithHeight(
|
||||
h.t, chanID1, uint16(sess2Seq), uint64(i),
|
||||
)
|
||||
lastApplied := h.commitUpdate(&session2.ID, update, nil)
|
||||
h.ackUpdate(&session2.ID, uint16(sess2Seq), lastApplied, nil)
|
||||
}
|
||||
|
||||
// At this point, session 2 is saturated with rogue updates. Assert that
|
||||
// it is now closable.
|
||||
closableSessionsMap = h.listClosableSessions(nil)
|
||||
require.Len(h.t, closableSessionsMap, 1)
|
||||
}
|
||||
|
||||
// testMarkChannelClosed asserts the behaviour of MarkChannelClosed.
|
||||
func testMarkChannelClosed(h *clientDBHarness) {
|
||||
tower := h.newTower()
|
||||
|
@ -762,7 +854,7 @@ func testMarkChannelClosed(h *clientDBHarness) {
|
|||
require.EqualValues(h.t, 4, lastApplied)
|
||||
h.ackUpdate(&session1.ID, 5, 5, nil)
|
||||
|
||||
// The session is no exhausted.
|
||||
// The session is now exhausted.
|
||||
// If we now close channel 5, session 1 should still not be closable
|
||||
// since it has an update for channel 6 which is still open.
|
||||
sl = h.markChannelClosed(chanID5, 1, nil)
|
||||
|
@ -1001,6 +1093,10 @@ func TestClientDB(t *testing.T) {
|
|||
name: "mark channel closed",
|
||||
run: testMarkChannelClosed,
|
||||
},
|
||||
{
|
||||
name: "rogue updates",
|
||||
run: testRogueUpdates,
|
||||
},
|
||||
}
|
||||
|
||||
for _, database := range dbs {
|
||||
|
@ -1066,6 +1162,34 @@ func randCommittedUpdateForChannel(t *testing.T, chanID lnwire.ChannelID,
|
|||
}
|
||||
}
|
||||
|
||||
// randCommittedUpdateForChanWithHeight generates a random committed update for
|
||||
// the given channel ID using the given commit height.
|
||||
func randCommittedUpdateForChanWithHeight(t *testing.T, chanID lnwire.ChannelID,
|
||||
seqNum uint16, height uint64) *wtdb.CommittedUpdate {
|
||||
|
||||
t.Helper()
|
||||
|
||||
var hint blob.BreachHint
|
||||
_, err := io.ReadFull(crand.Reader, hint[:])
|
||||
require.NoError(t, err)
|
||||
|
||||
encBlob := make([]byte, blob.Size(blob.FlagCommitOutputs.Type()))
|
||||
_, err = io.ReadFull(crand.Reader, encBlob)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &wtdb.CommittedUpdate{
|
||||
SeqNum: seqNum,
|
||||
CommittedUpdateBody: wtdb.CommittedUpdateBody{
|
||||
BackupID: wtdb.BackupID{
|
||||
ChanID: chanID,
|
||||
CommitHeight: height,
|
||||
},
|
||||
Hint: hint,
|
||||
EncryptedBlob: encBlob,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (h *clientDBHarness) randSession(t *testing.T,
|
||||
towerID wtdb.TowerID, maxUpdates uint16) *wtdb.ClientSession {
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue