lnd/watchtower/wtclient/session_queue.go
2019-06-28 16:10:41 -07:00

667 lines
20 KiB
Go

package wtclient
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtserver"
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)
// retryInterval is the default duration we will wait between attempting to
// connect back out to a tower if the prior state update failed.
const retryInterval = 2 * time.Second
// reserveStatus is an enum that signals how full a particular session is.
type reserveStatus uint8
const (
// reserveAvailable indicates that the session has space for at least
// one more backup.
reserveAvailable reserveStatus = iota
// reserveExhausted indicates that all slots in the session have been
// allocated.
reserveExhausted
)
// sessionQueueConfig bundles the resources required by the sessionQueue to
// perform its duties. All entries MUST be non-nil.
type sessionQueueConfig struct {
// ClientSession provides access to the negotiated session parameters
// and updating its persistent storage.
ClientSession *wtdb.ClientSession
// ChainHash identifies the chain for which the session's justice
// transactions are targeted.
ChainHash chainhash.Hash
// Dial allows the client to dial the tower using it's public key and
// net address.
Dial func(*btcec.PrivateKey,
*lnwire.NetAddress) (wtserver.Peer, error)
// SendMessage encodes, encrypts, and writes a message to the given peer.
SendMessage func(wtserver.Peer, wtwire.Message) error
// ReadMessage receives, decypts, and decodes a message from the given
// peer.
ReadMessage func(wtserver.Peer) (wtwire.Message, error)
// Signer facilitates signing of inputs, used to construct the witnesses
// for justice transaction inputs.
Signer input.Signer
// DB provides access to the client's stable storage.
DB DB
// MinBackoff defines the initial backoff applied by the session
// queue before reconnecting to the tower after a failed or partially
// successful batch is sent. Subsequent backoff durations will grow
// exponentially up until MaxBackoff.
MinBackoff time.Duration
// MaxBackoff defines the maximum backoff applied by the session
// queue before reconnecting to the tower after a failed or partially
// successful batch is sent. If the exponential backoff produces a
// timeout greater than this value, the backoff duration will be clamped
// to MaxBackoff.
MaxBackoff time.Duration
}
// sessionQueue implements a reliable queue that will encrypt and send accepted
// backups to the watchtower specified in the config's ClientSession. Calling
// Quit will attempt to perform a clean shutdown by receiving an ACK from the
// tower for all pending backups before exiting. The clean shutdown can be
// aborted by using ForceQuit, which will attempt to shutdown the queue
// immediately.
type sessionQueue struct {
started sync.Once
stopped sync.Once
forced sync.Once
cfg *sessionQueueConfig
commitQueue *list.List
pendingQueue *list.List
queueMtx sync.Mutex
queueCond *sync.Cond
localInit *wtwire.Init
towerAddr *lnwire.NetAddress
seqNum uint16
retryBackoff time.Duration
quit chan struct{}
forceQuit chan struct{}
shutdown chan struct{}
}
// newSessionQueue intiializes a fresh sessionQueue.
func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
localInit := wtwire.NewInitMessage(
lnwire.NewRawFeatureVector(wtwire.AltruistSessionsRequired),
cfg.ChainHash,
)
towerAddr := &lnwire.NetAddress{
IdentityKey: cfg.ClientSession.Tower.IdentityKey,
Address: cfg.ClientSession.Tower.Addresses[0],
}
sq := &sessionQueue{
cfg: cfg,
commitQueue: list.New(),
pendingQueue: list.New(),
localInit: localInit,
towerAddr: towerAddr,
seqNum: cfg.ClientSession.SeqNum,
retryBackoff: cfg.MinBackoff,
quit: make(chan struct{}),
forceQuit: make(chan struct{}),
shutdown: make(chan struct{}),
}
sq.queueCond = sync.NewCond(&sq.queueMtx)
// The database should return them in sorted order, and session queue's
// sequence number will be equal to that of the last committed update.
for _, update := range sq.cfg.ClientSession.CommittedUpdates {
sq.commitQueue.PushBack(update)
}
return sq
}
// Start idempotently starts the sessionQueue so that it can begin accepting
// backups.
func (q *sessionQueue) Start() {
q.started.Do(func() {
// TODO(conner): load prior committed state updates from disk an
// populate in queue.
go q.sessionManager()
})
}
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
// will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() {
q.stopped.Do(func() {
log.Debugf("SessionQueue(%s) stopping ...", q.ID())
close(q.quit)
q.signalUntilShutdown()
// Skip log if we also force quit.
select {
case <-q.forceQuit:
return
default:
}
log.Debugf("SessionQueue(%s) stopped", q.ID())
})
}
// ForceQuit idempotently aborts any clean shutdown in progress and returns to
// he caller after all lingering goroutines have spun down.
func (q *sessionQueue) ForceQuit() {
q.forced.Do(func() {
log.Infof("SessionQueue(%s) force quitting...", q.ID())
close(q.forceQuit)
q.signalUntilShutdown()
log.Infof("SessionQueue(%s) force quit", q.ID())
})
}
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
// identify this a particular queue.
func (q *sessionQueue) ID() *wtdb.SessionID {
return &q.cfg.ClientSession.ID
}
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
// tower. The session will only be accepted if the queue is not already
// exhausted and the task is successfully bound to the ClientSession.
func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
q.queueCond.L.Lock()
numPending := uint32(q.pendingQueue.Len())
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
"pending=%d max-updates=%d",
q.ID(), task.id, q.seqNum, numPending, maxUpdates)
// Examine the current reserve status of the session queue.
curStatus := q.reserveStatus()
switch curStatus {
// The session queue is exhausted, and cannot accept the task because it
// is full. Reject the task such that it can be tried against a
// different session.
case reserveExhausted:
q.queueCond.L.Unlock()
return curStatus, false
// The session queue is not exhausted. Compute the sweep and reward
// outputs as a function of the session parameters. If the outputs are
// dusty or uneconomical to backup, the task is rejected and will not be
// tried again.
//
// TODO(conner): queue backups and retry with different session params.
case reserveAvailable:
err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody)
if err != nil {
q.queueCond.L.Unlock()
log.Debugf("SessionQueue(%s) rejected %v: %v ",
q.ID(), task.id, err)
return curStatus, false
}
}
// The sweep and reward outputs satisfy the session's policy, queue the
// task for final signing and delivery.
q.pendingQueue.PushBack(task)
// Finally, compute the session's *new* reserve status. This will be
// used by the client to determine if it can continue using this session
// queue, or if it should negotiate a new one.
newStatus := q.reserveStatus()
q.queueCond.L.Unlock()
q.queueCond.Signal()
return newStatus, true
}
// sessionManager is the primary event loop for the sessionQueue, and is
// responsible for encrypting and sending accepted tasks to the tower.
func (q *sessionQueue) sessionManager() {
defer close(q.shutdown)
for {
q.queueCond.L.Lock()
for q.commitQueue.Len() == 0 &&
q.pendingQueue.Len() == 0 {
q.queueCond.Wait()
select {
case <-q.quit:
if q.commitQueue.Len() == 0 &&
q.pendingQueue.Len() == 0 {
q.queueCond.L.Unlock()
return
}
case <-q.forceQuit:
q.queueCond.L.Unlock()
return
default:
}
}
q.queueCond.L.Unlock()
// Exit immediately if a force quit has been requested. If the
// either of the queues still has state updates to send to the
// tower, we may never exit in the above case if we are unable
// to reach the tower for some reason.
select {
case <-q.forceQuit:
return
default:
}
// Initiate a new connection to the watchtower and attempt to
// drain all pending tasks.
q.drainBackups()
}
}
// drainBackups attempts to send all pending updates in the queue to the tower.
func (q *sessionQueue) drainBackups() {
// First, check that we are able to dial this session's tower.
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionPrivKey, q.towerAddr)
if err != nil {
log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
q.ID(), q.towerAddr, err)
q.increaseBackoff()
select {
case <-time.After(q.retryBackoff):
case <-q.forceQuit:
}
return
}
defer conn.Close()
// Begin draining the queue of pending state updates. Before the first
// update is sent, we will precede it with an Init message. If the first
// is successful, subsequent updates can be streamed without sending an
// Init.
for sendInit := true; ; sendInit = false {
// Generate the next state update to upload to the tower. This
// method will first proceed in dequeueing committed updates
// before attempting to dequeue any pending updates.
stateUpdate, isPending, backupID, err := q.nextStateUpdate()
if err != nil {
log.Errorf("SessionQueue(%v) unable to get next state "+
"update: %v", q.ID(), err)
return
}
// Now, send the state update to the tower and wait for a reply.
err = q.sendStateUpdate(
conn, stateUpdate, q.localInit, sendInit, isPending,
)
if err != nil {
log.Errorf("SessionQueue(%s) unable to send state "+
"update: %v", q.ID(), err)
q.increaseBackoff()
select {
case <-time.After(q.retryBackoff):
case <-q.forceQuit:
}
return
}
log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
q.ID(), backupID, stateUpdate.SeqNum)
// If the last task was backed up successfully, we'll exit and
// continue once more tasks are added to the queue. We'll also
// clear any accumulated backoff as this batch was able to be
// sent reliably.
if stateUpdate.IsComplete == 1 {
q.resetBackoff()
return
}
// Always apply a small delay between sends, which makes the
// unit tests more reliable. If we were requested to back off,
// when we will do so.
select {
case <-time.After(time.Millisecond):
case <-q.forceQuit:
return
}
}
}
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
// If any committed updates are present, this method will reconstruct the state
// update from the committed update using the current last applied value found
// in the database. Otherwise, it will select the next pending update, craft the
// payload, and commit an update before returning the state update to send. The
// boolean value in the response is true if the state update is taken from the
// pending queue, allowing the caller to remove the update from either the
// commit or pending queue if the update is successfully acked.
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
wtdb.BackupID, error) {
var (
seqNum uint16
update wtdb.CommittedUpdate
isLast bool
isPending bool
)
q.queueCond.L.Lock()
switch {
// If the commit queue is non-empty, parse the next committed update.
case q.commitQueue.Len() > 0:
next := q.commitQueue.Front()
update = next.Value.(wtdb.CommittedUpdate)
seqNum = update.SeqNum
// If this is the last item in the commit queue and no items
// exist in the pending queue, we will use the IsComplete flag
// in the StateUpdate to signal that the tower can release the
// connection after replying to free up resources.
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
q.queueCond.L.Unlock()
log.Debugf("SessionQueue(%s) reprocessing committed state "+
"update for %v seqnum=%d",
q.ID(), update.BackupID, seqNum)
// Otherwise, craft and commit the next update from the pending queue.
default:
isPending = true
// Determine the current sequence number to apply for this
// pending update.
seqNum = q.seqNum + 1
// Obtain the next task from the queue.
next := q.pendingQueue.Front()
task := next.Value.(*backupTask)
// If this is the last item in the pending queue, we will use
// the IsComplete flag in the StateUpdate to signal that the
// tower can release the connection after replying to free up
// resources.
isLast = q.pendingQueue.Len() == 1
q.queueCond.L.Unlock()
hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
if err != nil {
// TODO(conner): mark will not send
err := fmt.Errorf("unable to craft session payload: %v",
err)
return nil, false, wtdb.BackupID{}, err
}
// TODO(conner): special case other obscure errors
update = wtdb.CommittedUpdate{
SeqNum: seqNum,
CommittedUpdateBody: wtdb.CommittedUpdateBody{
BackupID: task.id,
Hint: hint,
EncryptedBlob: encBlob,
},
}
log.Debugf("SessionQueue(%s) committing state update "+
"%v seqnum=%d", q.ID(), update.BackupID, seqNum)
}
// Before sending the task to the tower, commit the state update
// to disk using the assigned sequence number. If this task has already
// been committed, the call will succeed and only be used for the
// purpose of obtaining the last applied value to send to the tower.
//
// This step ensures that if we crash before receiving an ack that we
// will retransmit the same update. If the tower successfully received
// the update from before, it will reply with an ACK regardless of what
// we send the next time. This step ensures that if we reliably send the
// same update for a given sequence number, to prevent us from thinking
// we backed up a state when we instead backed up another.
lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
if err != nil {
// TODO(conner): mark failed/reschedule
err := fmt.Errorf("unable to commit state update for "+
"%v seqnum=%d: %v", update.BackupID, seqNum, err)
return nil, false, wtdb.BackupID{}, err
}
stateUpdate := &wtwire.StateUpdate{
SeqNum: update.SeqNum,
LastApplied: lastApplied,
Hint: update.Hint,
EncryptedBlob: update.EncryptedBlob,
}
// Set the IsComplete flag if this is the last queued item.
if isLast {
stateUpdate.IsComplete = 1
}
return stateUpdate, isPending, update.BackupID, nil
}
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
// the ACK before returning. If sendInit is true, this method will first send
// the localInit message and verify that the tower supports our required feature
// bits. And error is returned if any part of the send fails. The boolean return
// variable indicates whether or not we should back off before attempting to
// send the next state update.
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
stateUpdate *wtwire.StateUpdate, localInit *wtwire.Init,
sendInit, isPending bool) error {
// If this is the first message being sent to the tower, we must send an
// Init message to establish that server supports the features we
// require.
if sendInit {
// Send Init to tower.
err := q.cfg.SendMessage(conn, q.localInit)
if err != nil {
return err
}
// Receive Init from tower.
remoteMsg, err := q.cfg.ReadMessage(conn)
if err != nil {
return err
}
remoteInit, ok := remoteMsg.(*wtwire.Init)
if !ok {
return fmt.Errorf("watchtower %s responded with %T "+
"to Init", q.towerAddr, remoteMsg)
}
// Validate Init.
err = q.localInit.CheckRemoteInit(
remoteInit, wtwire.FeatureNames,
)
if err != nil {
return err
}
}
// Send StateUpdate to tower.
err := q.cfg.SendMessage(conn, stateUpdate)
if err != nil {
return err
}
// Receive StateUpdate from tower.
remoteMsg, err := q.cfg.ReadMessage(conn)
if err != nil {
return err
}
stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
if !ok {
return fmt.Errorf("watchtower %s responded with %T to "+
"StateUpdate", q.towerAddr, remoteMsg)
}
// Process the reply from the tower.
switch stateUpdateReply.Code {
// The tower reported a successful update, validate the response and
// record the last applied returned.
case wtwire.CodeOK:
// TODO(conner): handle other error cases properly, ban towers, etc.
default:
err := fmt.Errorf("received error code %v in "+
"StateUpdateReply for seqnum=%d",
stateUpdateReply.Code, stateUpdate.SeqNum)
log.Warnf("SessionQueue(%s) unable to upload state update to "+
"tower=%s: %v", q.ID(), q.towerAddr, err)
return err
}
lastApplied := stateUpdateReply.LastApplied
err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
switch {
case err == wtdb.ErrUnallocatedLastApplied:
// TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
return err
case err == wtdb.ErrLastAppliedReversion:
// TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err
case err != nil:
err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err
}
q.queueCond.L.Lock()
if isPending {
// If a pending update was successfully sent, increment the
// sequence number and remove the item from the queue. This
// ensures the total number of backups in the session remains
// unchanged, which maintains the external view of the session's
// reserve status.
q.seqNum++
q.pendingQueue.Remove(q.pendingQueue.Front())
} else {
// Otherwise, simply remove the update from the committed queue.
// This has no effect on the queues reserve status since the
// update had already been committed.
q.commitQueue.Remove(q.commitQueue.Front())
}
q.queueCond.L.Unlock()
return nil
}
// reserveStatus returns a reserveStatus indicating whether or not the
// sessionQueue can accept another task. reserveAvailable is returned when a
// task can be accepted, and reserveExhausted is returned if the all slots in
// the session have been allocated.
//
// NOTE: This method MUST be called with queueCond's exclusive lock held.
func (q *sessionQueue) reserveStatus() reserveStatus {
numPending := uint32(q.pendingQueue.Len())
maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
if uint32(q.seqNum)+numPending < maxUpdates {
return reserveAvailable
}
return reserveExhausted
}
// resetBackoff returns the connection backoff the minimum configured backoff.
func (q *sessionQueue) resetBackoff() {
q.retryBackoff = q.cfg.MinBackoff
}
// increaseBackoff doubles the current connection backoff, clamping to the
// configured maximum backoff if it would exceed the limit.
func (q *sessionQueue) increaseBackoff() {
q.retryBackoff *= 2
if q.retryBackoff > q.cfg.MaxBackoff {
q.retryBackoff = q.cfg.MaxBackoff
}
}
// signalUntilShutdown strobes the sessionQueue's condition variable until the
// main event loop exits.
func (q *sessionQueue) signalUntilShutdown() {
for {
select {
case <-time.After(time.Millisecond):
q.queueCond.Signal()
case <-q.shutdown:
return
}
}
}
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
// sessionQueue.
type sessionQueueSet map[wtdb.SessionID]*sessionQueue
// Add inserts a sessionQueue into the sessionQueueSet.
func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) {
(*s)[*sessionQueue.ID()] = sessionQueue
}
// ApplyAndWait executes the nil-adic function returned from getApply for each
// sessionQueue in the set in parallel, then waits for all of them to finish
// before returning to the caller.
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
var wg sync.WaitGroup
for _, sessionq := range *s {
wg.Add(1)
go func(sq *sessionQueue) {
defer wg.Done()
getApply(sq)()
}(sessionq)
}
wg.Wait()
}