mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-04 17:55:36 +01:00
chanbackup/pubsub: add method ManualUpdate
This method inserts channel updates and waits for them to be processed. It will be used to update channel.backup upon LND shutdown.
This commit is contained in:
parent
df84148ed2
commit
fb397c11f1
2 changed files with 105 additions and 6 deletions
|
@ -48,6 +48,19 @@ type ChannelEvent struct {
|
||||||
NewChans []ChannelWithAddrs
|
NewChans []ChannelWithAddrs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// manualUpdate holds a group of channel state updates and an error channel
|
||||||
|
// to send back an error happened upon update processing or file updating.
|
||||||
|
type manualUpdate struct {
|
||||||
|
// singles hold channels backups. They can be either new or known
|
||||||
|
// channels in the Swapper.
|
||||||
|
singles []Single
|
||||||
|
|
||||||
|
// errChan is the channel to send an error back. If the update handling
|
||||||
|
// and the subsequent file updating succeeds, nil is sent.
|
||||||
|
// The channel must have capacity of 1 to prevent Swapper blocking.
|
||||||
|
errChan chan error
|
||||||
|
}
|
||||||
|
|
||||||
// ChannelSubscription represents an intent to be notified of any updates to
|
// ChannelSubscription represents an intent to be notified of any updates to
|
||||||
// the primary channel state.
|
// the primary channel state.
|
||||||
type ChannelSubscription struct {
|
type ChannelSubscription struct {
|
||||||
|
@ -90,6 +103,8 @@ type SubSwapper struct {
|
||||||
// over.
|
// over.
|
||||||
chanEvents *ChannelSubscription
|
chanEvents *ChannelSubscription
|
||||||
|
|
||||||
|
manualUpdates chan manualUpdate
|
||||||
|
|
||||||
// keyRing is the main key ring that will allow us to pack the new
|
// keyRing is the main key ring that will allow us to pack the new
|
||||||
// multi backup.
|
// multi backup.
|
||||||
keyRing keychain.KeyRing
|
keyRing keychain.KeyRing
|
||||||
|
@ -126,11 +141,12 @@ func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &SubSwapper{
|
return &SubSwapper{
|
||||||
backupState: backupState,
|
backupState: backupState,
|
||||||
chanEvents: chanEvents,
|
chanEvents: chanEvents,
|
||||||
keyRing: keyRing,
|
keyRing: keyRing,
|
||||||
Swapper: backupSwapper,
|
Swapper: backupSwapper,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
|
manualUpdates: make(chan manualUpdate),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,6 +184,43 @@ func (s *SubSwapper) Stop() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ManualUpdate inserts/updates channel states into the swapper. The updates
|
||||||
|
// are processed in another goroutine. The method waits for the updates to be
|
||||||
|
// fully processed and the file to be updated on-disk before returning.
|
||||||
|
func (s *SubSwapper) ManualUpdate(singles []Single) error {
|
||||||
|
// Create the channel to send an error back. If the update handling
|
||||||
|
// and the subsequent file updating succeeds, nil is sent.
|
||||||
|
// The channel must have capacity of 1 to prevent Swapper blocking.
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
|
||||||
|
// Create the update object to insert into the processing loop.
|
||||||
|
update := manualUpdate{
|
||||||
|
singles: singles,
|
||||||
|
errChan: errChan,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case s.manualUpdates <- update:
|
||||||
|
case <-s.quit:
|
||||||
|
return fmt.Errorf("swapper stopped when sending manual update")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for processing, block on errChan.
|
||||||
|
select {
|
||||||
|
case err := <-errChan:
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("processing of manual update "+
|
||||||
|
"failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-s.quit:
|
||||||
|
return fmt.Errorf("swapper stopped when waiting for outcome")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Success.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// updateBackupFile updates the backup file in place given the current state of
|
// updateBackupFile updates the backup file in place given the current state of
|
||||||
// the SubSwapper. We accept the set of channels that were closed between this
|
// the SubSwapper. We accept the set of channels that were closed between this
|
||||||
// update and the last to make sure we leave them out of our backup set union.
|
// update and the last to make sure we leave them out of our backup set union.
|
||||||
|
@ -294,13 +347,45 @@ func (s *SubSwapper) backupUpdater() {
|
||||||
"num_old_chans=%v, num_new_chans=%v",
|
"num_old_chans=%v, num_new_chans=%v",
|
||||||
oldStateSize, newStateSize)
|
oldStateSize, newStateSize)
|
||||||
|
|
||||||
// With out new state constructed, we'll, atomically
|
// Without new state constructed, we'll, atomically
|
||||||
// update the on-disk backup state.
|
// update the on-disk backup state.
|
||||||
if err := s.updateBackupFile(closedChans...); err != nil {
|
if err := s.updateBackupFile(closedChans...); err != nil {
|
||||||
log.Errorf("unable to update backup file: %v",
|
log.Errorf("unable to update backup file: %v",
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We received a manual update. Handle it and update the file.
|
||||||
|
case manualUpdate := <-s.manualUpdates:
|
||||||
|
oldStateSize := len(s.backupState)
|
||||||
|
|
||||||
|
// For all open channels, we'll create a new SCB given
|
||||||
|
// the required information.
|
||||||
|
for _, single := range manualUpdate.singles {
|
||||||
|
log.Debugf("Manual update of channel %v",
|
||||||
|
single.FundingOutpoint)
|
||||||
|
|
||||||
|
s.backupState[single.FundingOutpoint] = single
|
||||||
|
}
|
||||||
|
|
||||||
|
newStateSize := len(s.backupState)
|
||||||
|
|
||||||
|
log.Infof("Updating on-disk multi SCB backup: "+
|
||||||
|
"num_old_chans=%v, num_new_chans=%v",
|
||||||
|
oldStateSize, newStateSize)
|
||||||
|
|
||||||
|
// Without new state constructed, we'll, atomically
|
||||||
|
// update the on-disk backup state.
|
||||||
|
err := s.updateBackupFile()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("unable to update backup file: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the error (or nil) to the caller of
|
||||||
|
// ManualUpdate. The error channel must have capacity of
|
||||||
|
// 1 not to block here.
|
||||||
|
manualUpdate.errChan <- err
|
||||||
|
|
||||||
// TODO(roasbeef): refresh periodically on a time basis due to
|
// TODO(roasbeef): refresh periodically on a time basis due to
|
||||||
// possible addr changes from node
|
// possible addr changes from node
|
||||||
|
|
||||||
|
|
|
@ -277,4 +277,18 @@ func TestSubSwapperUpdater(t *testing.T) {
|
||||||
// Verify that the new set of backups, now has one less after the
|
// Verify that the new set of backups, now has one less after the
|
||||||
// sub-swapper switches the new set with the old.
|
// sub-swapper switches the new set with the old.
|
||||||
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
|
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
|
||||||
|
|
||||||
|
// Check ManualUpdate method.
|
||||||
|
channel, err := genRandomOpenChannelShell()
|
||||||
|
require.NoError(t, err)
|
||||||
|
single := NewSingle(channel, nil)
|
||||||
|
backupSet[channel.FundingOutpoint] = single
|
||||||
|
require.NoError(t, subSwapper.ManualUpdate([]Single{single}))
|
||||||
|
|
||||||
|
// Verify that the state of the backup is as expected.
|
||||||
|
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
|
||||||
|
|
||||||
|
// Check the case ManualUpdate returns an error.
|
||||||
|
swapper.fail = true
|
||||||
|
require.Error(t, subSwapper.ManualUpdate([]Single{single}))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue