lnd/htlcswitch/circuit_map.go
eugene f7b3da4bb2
htlcswitch+lntest: modify Switch to persist resolution messages
Include unit tests for the Switch, and integration tests that
exercise the persistence logic.
2022-04-29 16:40:19 -04:00

1213 lines
36 KiB
Go

package htlcswitch
import (
"bytes"
"fmt"
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrCorruptedCircuitMap indicates that the on-disk bucketing structure
// has altered since the circuit map instance was initialized.
ErrCorruptedCircuitMap = errors.New("circuit map has been corrupted")
// ErrCircuitNotInHashIndex indicates that a particular circuit did not
// appear in the in-memory hash index.
ErrCircuitNotInHashIndex = errors.New("payment circuit not found in " +
"hash index")
// ErrUnknownCircuit signals that circuit could not be removed from the
// map because it was not found.
ErrUnknownCircuit = errors.New("unknown payment circuit")
// ErrCircuitClosing signals that an htlc has already closed this
// circuit in-memory.
ErrCircuitClosing = errors.New("circuit has already been closed")
// ErrDuplicateCircuit signals that this circuit was previously
// added.
ErrDuplicateCircuit = errors.New("duplicate circuit add")
// ErrUnknownKeystone signals that no circuit was found using the
// outgoing circuit key.
ErrUnknownKeystone = errors.New("unknown circuit keystone")
// ErrDuplicateKeystone signals that this circuit was previously
// assigned a keystone.
ErrDuplicateKeystone = errors.New("cannot add duplicate keystone")
)
// CircuitModifier is a common interface used by channel links to modify the
// contents of the circuit map maintained by the switch.
type CircuitModifier interface {
// OpenCircuits preemptively records a batch keystones that will mark
// currently pending circuits as open. These changes can be rolled back
// on restart if the outgoing Adds do not make it into a commitment
// txn.
OpenCircuits(...Keystone) error
// TrimOpenCircuits removes a channel's open channels with htlc indexes
// above `start`.
TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error
// DeleteCircuits removes the incoming circuit key to remove all
// persistent references to a circuit. Returns a ErrUnknownCircuit if
// any of the incoming keys are not known.
DeleteCircuits(inKeys ...CircuitKey) error
}
// CircuitLookup is a common interface used to lookup information that is stored
// in the circuit map.
type CircuitLookup interface {
// LookupCircuit queries the circuit map for the circuit identified by
// inKey.
LookupCircuit(inKey CircuitKey) *PaymentCircuit
// LookupOpenCircuit queries the circuit map for a circuit identified
// by its outgoing circuit key.
LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
}
// CircuitFwdActions represents the forwarding decision made by the circuit
// map, and is returned from CommitCircuits. The sequence of circuits provided
// to CommitCircuits is split into three sub-sequences, allowing the caller to
// do an in-order scan, comparing the head of each subsequence, to determine
// the decision made by the circuit map.
type CircuitFwdActions struct {
// Adds is the subsequence of circuits that were successfully committed
// in the circuit map.
Adds []*PaymentCircuit
// Drops is the subsequence of circuits for which no action should be
// done.
Drops []*PaymentCircuit
// Fails is the subsequence of circuits that should be failed back by
// the calling link.
Fails []*PaymentCircuit
}
// CircuitMap is an interface for managing the construction and teardown of
// payment circuits used by the switch.
type CircuitMap interface {
CircuitModifier
CircuitLookup
// CommitCircuits attempts to add the given circuits to the circuit
// map. The list of circuits is split into three distinct
// sub-sequences, corresponding to adds, drops, and fails. Adds should
// be forwarded to the switch, while fails should be failed back
// locally within the calling link.
CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)
// CloseCircuit marks the circuit identified by `outKey` as closing
// in-memory, which prevents duplicate settles/fails from completing an
// open circuit twice.
CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error)
// FailCircuit is used by locally failed HTLCs to mark the circuit
// identified by `inKey` as closing in-memory, which prevents duplicate
// settles/fails from being accepted for the same circuit.
FailCircuit(inKey CircuitKey) (*PaymentCircuit, error)
// LookupByPaymentHash queries the circuit map and returns all open
// circuits that use the given payment hash.
LookupByPaymentHash(hash [32]byte) []*PaymentCircuit
// NumPending returns the total number of active circuits added by
// CommitCircuits.
NumPending() int
// NumOpen returns the number of circuits with HTLCs that have been
// forwarded via an outgoing link.
NumOpen() int
}
var (
// circuitAddKey is the key used to retrieve the bucket containing
// payment circuits. A circuit records information about how to return
// a packet to the source link, potentially including an error
// encrypter for applying this hop's encryption to the payload in the
// reverse direction.
//
// Bucket hierarchy:
//
// circuitAddKey(root-bucket)
// |
// |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
// |-- <incoming-circuit-key>: <encoded bytes of PaymentCircuit>
// |
// ...
//
circuitAddKey = []byte("circuit-adds")
// circuitKeystoneKey is used to retrieve the bucket containing circuit
// keystones, which are set in place once a forwarded packet is
// assigned an index on an outgoing commitment txn.
//
// Bucket hierarchy:
//
// circuitKeystoneKey(root-bucket)
// |
// |-- <outgoing-circuit-key>: <incoming-circuit-key>
// |-- <outgoing-circuit-key>: <incoming-circuit-key>
// |
// ...
//
circuitKeystoneKey = []byte("circuit-keystones")
)
// circuitMap is a data structure that implements thread safe, persistent
// storage of circuit routing information. The switch consults a circuit map to
// determine where to forward returning HTLC update messages. Circuits are
// always identifiable by their incoming CircuitKey, in addition to their
// outgoing CircuitKey if the circuit is fully-opened.
type circuitMap struct {
cfg *CircuitMapConfig
mtx sync.RWMutex
// pending is an in-memory mapping of all half payment circuits, and is
// kept in sync with the on-disk contents of the circuit map.
pending map[CircuitKey]*PaymentCircuit
// opened is an in-memory mapping of all full payment circuits, which
// is also synchronized with the persistent state of the circuit map.
opened map[CircuitKey]*PaymentCircuit
// closed is an in-memory set of circuits for which the switch has
// received a settle or fail. This precedes the actual deletion of a
// circuit from disk.
closed map[CircuitKey]struct{}
// hashIndex is a volatile index that facilitates fast queries by
// payment hash against the contents of circuits. This index can be
// reconstructed entirely from the set of persisted full circuits on
// startup.
hashIndex map[[32]byte]map[CircuitKey]struct{}
}
// CircuitMapConfig houses the critical interfaces and references necessary to
// parameterize an instance of circuitMap.
type CircuitMapConfig struct {
// DB provides the persistent storage engine for the circuit map.
DB kvdb.Backend
// FetchAllOpenChannels is a function that fetches all currently open
// channels from the channel database.
FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
// FetchClosedChannels is a function that fetches all closed channels
// from the channel database.
FetchClosedChannels func(
pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
// ExtractErrorEncrypter derives the shared secret used to encrypt
// errors from the obfuscator's ephemeral public key.
ExtractErrorEncrypter hop.ErrorEncrypterExtracter
// CheckResolutionMsg checks whether a given resolution message exists
// for the passed CircuitKey.
CheckResolutionMsg func(outKey *CircuitKey) error
}
// NewCircuitMap creates a new instance of the circuitMap.
func NewCircuitMap(cfg *CircuitMapConfig) (CircuitMap, error) {
cm := &circuitMap{
cfg: cfg,
}
// Initialize the on-disk buckets used by the circuit map.
if err := cm.initBuckets(); err != nil {
return nil, err
}
// Delete old circuits and keystones of closed channels.
if err := cm.cleanClosedChannels(); err != nil {
return nil, err
}
// Load any previously persisted circuit into back into memory.
if err := cm.restoreMemState(); err != nil {
return nil, err
}
// Trim any keystones that were not committed in an outgoing commit txn.
//
// NOTE: This operation will be applied to the persistent state of all
// active channels. Therefore, it must be called before any links are
// created to avoid interfering with normal operation.
if err := cm.trimAllOpenCircuits(); err != nil {
return nil, err
}
return cm, nil
}
// initBuckets ensures that the primary buckets used by the circuit are
// initialized so that we can assume their existence after startup.
func (cm *circuitMap) initBuckets() error {
return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(circuitKeystoneKey)
if err != nil {
return err
}
_, err = tx.CreateTopLevelBucket(circuitAddKey)
return err
}, func() {})
}
// cleanClosedChannels deletes all circuits and keystones related to closed
// channels. It first reads all the closed channels and caches the ShortChanIDs
// into a map for fast lookup. Then it iterates the circuit bucket and keystone
// bucket and deletes items whose ChanID matches the ShortChanID.
//
// NOTE: this operation can also be built into restoreMemState since the latter
// already opens and iterates the two root buckets, circuitAddKey and
// circuitKeystoneKey. Depending on the size of the buckets, this marginal gain
// may be worth investigating. Atm, for clarity, this operation is wrapped into
// its own function.
func (cm *circuitMap) cleanClosedChannels() error {
log.Infof("Cleaning circuits from disk for closed channels")
// closedChanIDSet stores the short channel IDs for closed channels.
closedChanIDSet := make(map[lnwire.ShortChannelID]struct{})
// circuitKeySet stores the incoming circuit keys of the payment
// circuits that need to be deleted.
circuitKeySet := make(map[CircuitKey]struct{})
// keystoneKeySet stores the outgoing keys of the keystones that need
// to be deleted.
keystoneKeySet := make(map[CircuitKey]struct{})
// isClosedChannel is a helper closure that returns a bool indicating
// the chanID belongs to a closed channel.
isClosedChannel := func(chanID lnwire.ShortChannelID) bool {
// Skip if the channel ID is zero value. This has the effect
// that a zero value incoming or outgoing key will never be
// matched and its corresponding circuits or keystones are not
// deleted.
if chanID.ToUint64() == 0 {
return false
}
_, ok := closedChanIDSet[chanID]
return ok
}
// Find closed channels and cache their ShortChannelIDs into a map.
// This map will be used for looking up relative circuits and keystones.
closedChannels, err := cm.cfg.FetchClosedChannels(false)
if err != nil {
return err
}
for _, closedChannel := range closedChannels {
// Skip if the channel close is pending.
if closedChannel.IsPending {
continue
}
closedChanIDSet[closedChannel.ShortChanID] = struct{}{}
}
log.Debugf("Found %v closed channels", len(closedChanIDSet))
// Exit early if there are no closed channels.
if len(closedChanIDSet) == 0 {
log.Infof("Finished cleaning: no closed channels found, " +
"no actions taken.",
)
return nil
}
// Find the payment circuits and keystones that need to be deleted.
if err := kvdb.View(cm.cfg.DB, func(tx kvdb.RTx) error {
circuitBkt := tx.ReadBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
keystoneBkt := tx.ReadBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
// If a circuit's incoming/outgoing key prefix matches the
// ShortChanID, it will be deleted. However, if the ShortChanID
// of the incoming key is zero, the circuit will be kept as it
// indicates a locally initiated payment.
if err := circuitBkt.ForEach(func(_, v []byte) error {
circuit, err := cm.decodeCircuit(v)
if err != nil {
return err
}
// Check if the incoming channel ID can be found in the
// closed channel ID map.
if !isClosedChannel(circuit.Incoming.ChanID) {
return nil
}
circuitKeySet[circuit.Incoming] = struct{}{}
return nil
}); err != nil {
return err
}
// If a keystone's InKey or OutKey matches the short channel id
// in the closed channel ID map, it will be deleted.
err := keystoneBkt.ForEach(func(k, v []byte) error {
var (
inKey CircuitKey
outKey CircuitKey
)
// Decode the incoming and outgoing circuit keys.
if err := inKey.SetBytes(v); err != nil {
return err
}
if err := outKey.SetBytes(k); err != nil {
return err
}
// Check if the incoming channel ID can be found in the
// closed channel ID map.
if isClosedChannel(inKey.ChanID) {
// If the incoming channel is closed, we can
// skip checking on outgoing channel ID because
// this keystone will be deleted.
keystoneKeySet[outKey] = struct{}{}
// Technically the incoming keys found in
// keystone bucket should be a subset of
// circuit bucket. So a previous loop should
// have this inKey put inside circuitAddKey map
// already. We do this again to be sure the
// circuits are properly cleaned. Even this
// inKey doesn't exist in circuit bucket, we
// are fine as db deletion is a noop.
circuitKeySet[inKey] = struct{}{}
return nil
}
// Check if the outgoing channel ID can be found in the
// closed channel ID map. Notice that we need to store
// the outgoing key because it's used for db query.
//
// NOTE: We skip this if a resolution message can be
// found under the outKey. This means that there is an
// existing resolution message(s) that need to get to
// the incoming links.
if isClosedChannel(outKey.ChanID) {
// Check the resolution message store. A return
// value of nil means we need to skip deleting
// these circuits.
if cm.cfg.CheckResolutionMsg(&outKey) == nil {
return nil
}
keystoneKeySet[outKey] = struct{}{}
// Also update circuitKeySet to mark the
// payment circuit needs to be deleted.
circuitKeySet[inKey] = struct{}{}
}
return nil
})
return err
}, func() {
// Reset the sets.
circuitKeySet = make(map[CircuitKey]struct{})
keystoneKeySet = make(map[CircuitKey]struct{})
}); err != nil {
return err
}
log.Debugf("To be deleted: num_circuits=%v, num_keystones=%v",
len(circuitKeySet), len(keystoneKeySet),
)
numCircuitsDeleted := 0
numKeystonesDeleted := 0
// Delete all the circuits and keystones for closed channels.
if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
circuitBkt := tx.ReadWriteBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
// Delete the circuit.
for inKey := range circuitKeySet {
if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
return err
}
numCircuitsDeleted++
}
// Delete the keystone using the outgoing key.
for outKey := range keystoneKeySet {
err := keystoneBkt.Delete(outKey.Bytes())
if err != nil {
return err
}
numKeystonesDeleted++
}
return nil
}, func() {}); err != nil {
numCircuitsDeleted = 0
numKeystonesDeleted = 0
return err
}
log.Infof("Finished cleaning: num_closed_channel=%v, "+
"num_circuits=%v, num_keystone=%v",
len(closedChannels), numCircuitsDeleted, numKeystonesDeleted,
)
return nil
}
// restoreMemState loads the contents of the half circuit and full circuit
// buckets from disk and reconstructs the in-memory representation of the
// circuit map. Afterwards, the state of the hash index is reconstructed using
// the recovered set of full circuits. This method will also remove any stray
// keystones, which are those that appear fully-opened, but have no pending
// circuit related to the intended incoming link.
func (cm *circuitMap) restoreMemState() error {
log.Infof("Restoring in-memory circuit state from disk")
var (
opened map[CircuitKey]*PaymentCircuit
pending map[CircuitKey]*PaymentCircuit
)
if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
// Restore any of the circuits persisted in the circuit bucket
// back into memory.
circuitBkt := tx.ReadWriteBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
if err := circuitBkt.ForEach(func(_, v []byte) error {
circuit, err := cm.decodeCircuit(v)
if err != nil {
return err
}
circuit.LoadedFromDisk = true
pending[circuit.Incoming] = circuit
return nil
}); err != nil {
return err
}
// Furthermore, load the keystone bucket and resurrect the
// keystones used in any open circuits.
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
var strayKeystones []Keystone
if err := keystoneBkt.ForEach(func(k, v []byte) error {
var (
inKey CircuitKey
outKey = &CircuitKey{}
)
// Decode the incoming and outgoing circuit keys.
if err := inKey.SetBytes(v); err != nil {
return err
}
if err := outKey.SetBytes(k); err != nil {
return err
}
// Retrieve the pending circuit, set its keystone, then
// add it to the opened map.
circuit, ok := pending[inKey]
if ok {
circuit.Outgoing = outKey
opened[*outKey] = circuit
} else {
strayKeystones = append(strayKeystones, Keystone{
InKey: inKey,
OutKey: *outKey,
})
}
return nil
}); err != nil {
return err
}
// If any stray keystones were found, we'll proceed to prune
// them from the circuit map's persistent storage. This may
// manifest on older nodes that had updated channels before
// their short channel id was set properly. We believe this
// issue has been fixed, though this will allow older nodes to
// recover without additional intervention.
for _, strayKeystone := range strayKeystones {
// As a precaution, we will only cleanup keystones
// related to locally-initiated payments. If a
// documented case of stray keystones emerges for
// forwarded payments, this check should be removed, but
// with extreme caution.
if strayKeystone.OutKey.ChanID != hop.Source {
continue
}
log.Infof("Removing stray keystone: %v", strayKeystone)
err := keystoneBkt.Delete(strayKeystone.OutKey.Bytes())
if err != nil {
return err
}
}
return nil
}, func() {
opened = make(map[CircuitKey]*PaymentCircuit)
pending = make(map[CircuitKey]*PaymentCircuit)
}); err != nil {
return err
}
cm.pending = pending
cm.opened = opened
cm.closed = make(map[CircuitKey]struct{})
log.Infof("Payment circuits loaded: num_pending=%v, num_open=%v",
len(pending), len(opened))
// Finally, reconstruct the hash index by running through our set of
// open circuits.
cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
for _, circuit := range opened {
cm.addCircuitToHashIndex(circuit)
}
return nil
}
// decodeCircuit reconstructs an in-memory payment circuit from a byte slice.
// The byte slice is assumed to have been generated by the circuit's Encode
// method. If the decoding is successful, the onion obfuscator will be
// reextracted, since it is not stored in plaintext on disk.
func (cm *circuitMap) decodeCircuit(v []byte) (*PaymentCircuit, error) {
var circuit = &PaymentCircuit{}
circuitReader := bytes.NewReader(v)
if err := circuit.Decode(circuitReader); err != nil {
return nil, err
}
// If the error encrypter is nil, this is locally-source payment so
// there is no encrypter.
if circuit.ErrorEncrypter == nil {
return circuit, nil
}
// Otherwise, we need to reextract the encrypter, so that the shared
// secret is rederived from what was decoded.
err := circuit.ErrorEncrypter.Reextract(
cm.cfg.ExtractErrorEncrypter,
)
if err != nil {
return nil, err
}
return circuit, nil
}
// trimAllOpenCircuits reads the set of active channels from disk and trims
// keystones for any non-pending channels using the next unallocated htlc index.
// This method is intended to be called on startup. Each link will also trim
// it's own circuits upon startup.
//
// NOTE: This operation will be applied to the persistent state of all active
// channels. Therefore, it must be called before any links are created to avoid
// interfering with normal operation.
func (cm *circuitMap) trimAllOpenCircuits() error {
activeChannels, err := cm.cfg.FetchAllOpenChannels()
if err != nil {
return err
}
for _, activeChannel := range activeChannels {
if activeChannel.IsPending {
continue
}
// First, skip any channels that have not been assigned their
// final channel identifier, otherwise we would try to trim
// htlcs belonging to the all-zero, hop.Source ID.
chanID := activeChannel.ShortChanID()
if chanID == hop.Source {
continue
}
// Next, retrieve the next unallocated htlc index, which bounds
// the cutoff of confirmed htlc indexes.
start, err := activeChannel.NextLocalHtlcIndex()
if err != nil {
return err
}
// Finally, remove all pending circuits above at or above the
// next unallocated local htlc indexes. This has the effect of
// reverting any circuits that have either not been locked in,
// or had not been included in a pending commitment.
err = cm.TrimOpenCircuits(chanID, start)
if err != nil {
return err
}
}
return nil
}
// TrimOpenCircuits removes a channel's keystones above the short chan id's
// highest committed htlc index. This has the effect of returning those
// circuits to a half-open state. Since opening of circuits is done in advance
// of actually committing the Add htlcs into a commitment txn, this allows
// circuits to be opened preemptively, since we can roll them back after any
// failures.
func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
start uint64) error {
log.Infof("Trimming open circuits for chan_id=%v, start_htlc_id=%v",
chanID, start)
var trimmedOutKeys []CircuitKey
// Scan forward from the last unacked htlc id, stopping as soon as we
// don't find any more. Outgoing htlc id's must be assigned in order,
// so there should never be disjoint segments of keystones to trim.
cm.mtx.Lock()
for i := start; ; i++ {
outKey := CircuitKey{
ChanID: chanID,
HtlcID: i,
}
circuit, ok := cm.opened[outKey]
if !ok {
break
}
circuit.Outgoing = nil
delete(cm.opened, outKey)
trimmedOutKeys = append(trimmedOutKeys, outKey)
cm.removeCircuitFromHashIndex(circuit)
}
cm.mtx.Unlock()
if len(trimmedOutKeys) == 0 {
return nil
}
return kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
for _, outKey := range trimmedOutKeys {
err := keystoneBkt.Delete(outKey.Bytes())
if err != nil {
return err
}
}
return nil
}, func() {})
}
// LookupCircuit queries the circuit map for the circuit identified by its
// incoming circuit key. Returns nil if there is no such circuit.
func (cm *circuitMap) LookupCircuit(inKey CircuitKey) *PaymentCircuit {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
return cm.pending[inKey]
}
// LookupOpenCircuit searches for the circuit identified by its outgoing circuit
// key.
func (cm *circuitMap) LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
return cm.opened[outKey]
}
// LookupByPaymentHash looks up and returns any payment circuits with a given
// payment hash.
func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
var circuits []*PaymentCircuit
if circuitSet, ok := cm.hashIndex[hash]; ok {
// Iterate over the outgoing circuit keys found with this hash,
// and retrieve the circuit from the opened map.
circuits = make([]*PaymentCircuit, 0, len(circuitSet))
for key := range circuitSet {
if circuit, ok := cm.opened[key]; ok {
circuits = append(circuits, circuit)
}
}
}
return circuits
}
// CommitCircuits accepts any number of circuits and persistently adds them to
// the switch's circuit map. The method returns a list of circuits that had not
// been seen prior by the switch. A link should only forward HTLCs corresponding
// to the returned circuits to the switch.
//
// NOTE: This method uses batched writes to improve performance, gains will only
// be realized if it is called concurrently from separate goroutines.
func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
*CircuitFwdActions, error) {
inKeys := make([]CircuitKey, 0, len(circuits))
for _, circuit := range circuits {
inKeys = append(inKeys, circuit.Incoming)
}
log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string {
return spew.Sdump(inKeys)
}))
actions := &CircuitFwdActions{}
// If an empty list was passed, return early to avoid grabbing the lock.
if len(circuits) == 0 {
return actions, nil
}
// First, we reconcile the provided circuits with our set of pending
// circuits to construct a set of new circuits that need to be written
// to disk. The circuit's pointer is stored so that we only permit this
// exact circuit to be forwarded through the switch. If a circuit is
// already pending, the htlc will be reforwarded by the switch.
//
// NOTE: We track an additional addFails subsequence, which permits us
// to fail back all packets that weren't dropped if we encounter an
// error when committing the circuits.
cm.mtx.Lock()
var adds, drops, fails, addFails []*PaymentCircuit
for _, circuit := range circuits {
inKey := circuit.InKey()
if foundCircuit, ok := cm.pending[inKey]; ok {
switch {
// This circuit has a keystone, it's waiting for a
// response from the remote peer on the outgoing link.
// Drop it like it's hot, ensure duplicates get caught.
case foundCircuit.HasKeystone():
drops = append(drops, circuit)
// If no keystone is set and the switch has not been
// restarted, the corresponding packet should still be
// in the outgoing link's mailbox. It will be delivered
// if it comes online before the switch goes down.
//
// NOTE: Dropping here prevents a flapping, incoming
// link from failing a duplicate add while it is still
// in the server's memory mailboxes.
case !foundCircuit.LoadedFromDisk:
drops = append(drops, circuit)
// Otherwise, the in-mem packet has been lost due to a
// restart. It is now safe to send back a failure along
// the incoming link. The incoming link should be able
// detect and ignore duplicate packets of this type.
default:
fails = append(fails, circuit)
addFails = append(addFails, circuit)
}
continue
}
cm.pending[inKey] = circuit
adds = append(adds, circuit)
addFails = append(addFails, circuit)
}
cm.mtx.Unlock()
// If all circuits are dropped or failed, we are done.
if len(adds) == 0 {
actions.Drops = drops
actions.Fails = fails
return actions, nil
}
// Now, optimistically serialize the circuits to add.
var bs = make([]bytes.Buffer, len(adds))
for i, circuit := range adds {
if err := circuit.Encode(&bs[i]); err != nil {
actions.Drops = drops
actions.Fails = addFails
return actions, err
}
}
// Write the entire batch of circuits to the persistent circuit bucket
// using bolt's Batch write. This method must be called from multiple,
// distinct goroutines to have any impact on performance.
err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
circuitBkt := tx.ReadWriteBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
for i, circuit := range adds {
inKeyBytes := circuit.InKey().Bytes()
circuitBytes := bs[i].Bytes()
err := circuitBkt.Put(inKeyBytes, circuitBytes)
if err != nil {
return err
}
}
return nil
})
// Return if the write succeeded.
if err == nil {
actions.Adds = adds
actions.Drops = drops
actions.Fails = fails
return actions, nil
}
// Otherwise, rollback the circuits added to the pending set if the
// write failed.
cm.mtx.Lock()
for _, circuit := range adds {
delete(cm.pending, circuit.InKey())
}
cm.mtx.Unlock()
// Since our write failed, we will return the dropped packets and mark
// all other circuits as failed.
actions.Drops = drops
actions.Fails = addFails
return actions, err
}
// Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
// are preemptively written by an outgoing link before signing a new commitment
// state, and cements which HTLCs we are awaiting a response from a remote
// peer.
type Keystone struct {
InKey CircuitKey
OutKey CircuitKey
}
// String returns a human readable description of the Keystone.
func (k *Keystone) String() string {
return fmt.Sprintf("%s --> %s", k.InKey, k.OutKey)
}
// OpenCircuits sets the outgoing circuit key for the circuit identified by
// inKey, persistently marking the circuit as opened. After the changes have
// been persisted, the circuit map's in-memory indexes are updated so that this
// circuit can be queried using LookupByKeystone or LookupByPaymentHash.
func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
if len(keystones) == 0 {
return nil
}
log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string {
return spew.Sdump(keystones)
}))
// Check that all keystones correspond to committed-but-unopened
// circuits.
cm.mtx.RLock()
openedCircuits := make([]*PaymentCircuit, 0, len(keystones))
for _, ks := range keystones {
if _, ok := cm.opened[ks.OutKey]; ok {
cm.mtx.RUnlock()
return ErrDuplicateKeystone
}
circuit, ok := cm.pending[ks.InKey]
if !ok {
cm.mtx.RUnlock()
return ErrUnknownCircuit
}
openedCircuits = append(openedCircuits, circuit)
}
cm.mtx.RUnlock()
err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
// Now, load the circuit bucket to which we will write the
// already serialized circuit.
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
for _, ks := range keystones {
outBytes := ks.OutKey.Bytes()
inBytes := ks.InKey.Bytes()
err := keystoneBkt.Put(outBytes, inBytes)
if err != nil {
return err
}
}
return nil
}, func() {})
if err != nil {
return err
}
cm.mtx.Lock()
for i, circuit := range openedCircuits {
ks := keystones[i]
// Since our persistent operation was successful, we can now
// modify the in memory representations. Set the outgoing
// circuit key on our pending circuit, add the same circuit to
// set of opened circuits, and add this circuit to the hash
// index.
circuit.Outgoing = &CircuitKey{}
*circuit.Outgoing = ks.OutKey
cm.opened[ks.OutKey] = circuit
cm.addCircuitToHashIndex(circuit)
}
cm.mtx.Unlock()
return nil
}
// addCirciutToHashIndex inserts a circuit into the circuit map's hash index, so
// that it can be queried using LookupByPaymentHash.
func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) {
if _, ok := cm.hashIndex[c.PaymentHash]; !ok {
cm.hashIndex[c.PaymentHash] = make(map[CircuitKey]struct{})
}
cm.hashIndex[c.PaymentHash][c.OutKey()] = struct{}{}
}
// FailCircuit marks the circuit identified by `inKey` as closing in-memory,
// which prevents duplicate settles/fails from completing an open circuit twice.
func (cm *circuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, error) {
cm.mtx.Lock()
defer cm.mtx.Unlock()
circuit, ok := cm.pending[inKey]
if !ok {
return nil, ErrUnknownCircuit
}
_, ok = cm.closed[inKey]
if ok {
return nil, ErrCircuitClosing
}
cm.closed[inKey] = struct{}{}
return circuit, nil
}
// CloseCircuit marks the circuit identified by `outKey` as closing in-memory,
// which prevents duplicate settles/fails from completing an open
// circuit twice.
func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) {
cm.mtx.Lock()
defer cm.mtx.Unlock()
circuit, ok := cm.opened[outKey]
if !ok {
return nil, ErrUnknownCircuit
}
_, ok = cm.closed[circuit.Incoming]
if ok {
return nil, ErrCircuitClosing
}
cm.closed[circuit.Incoming] = struct{}{}
return circuit, nil
}
// DeleteCircuits destroys the target circuits by removing them from the circuit
// map, additionally removing the circuits' keystones if any HTLCs were
// forwarded through an outgoing link. The circuits should be identified by its
// incoming circuit key. If a given circuit is not found in the circuit map, it
// will be ignored from the query. This would typically indicate that the
// circuit was already cleaned up at a different point in time.
func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string {
return spew.Sdump(inKeys)
}))
var (
closingCircuits = make(map[CircuitKey]struct{})
removedCircuits = make(map[CircuitKey]*PaymentCircuit)
)
cm.mtx.Lock()
// Remove any references to the circuits from memory, keeping track of
// which circuits were removed, and which ones had been marked closed.
// This can be used to restore these entries later if the persistent
// removal fails.
for _, inKey := range inKeys {
circuit, ok := cm.pending[inKey]
if !ok {
continue
}
delete(cm.pending, inKey)
if _, ok := cm.closed[inKey]; ok {
closingCircuits[inKey] = struct{}{}
delete(cm.closed, inKey)
}
if circuit.HasKeystone() {
delete(cm.opened, circuit.OutKey())
cm.removeCircuitFromHashIndex(circuit)
}
removedCircuits[inKey] = circuit
}
cm.mtx.Unlock()
err := kvdb.Batch(cm.cfg.DB, func(tx kvdb.RwTx) error {
for _, circuit := range removedCircuits {
// If this htlc made it to an outgoing link, load the
// keystone bucket from which we will remove the
// outgoing circuit key.
if circuit.HasKeystone() {
keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey)
if keystoneBkt == nil {
return ErrCorruptedCircuitMap
}
outKey := circuit.OutKey()
err := keystoneBkt.Delete(outKey.Bytes())
if err != nil {
return err
}
}
// Remove the circuit itself based on the incoming
// circuit key.
circuitBkt := tx.ReadWriteBucket(circuitAddKey)
if circuitBkt == nil {
return ErrCorruptedCircuitMap
}
inKey := circuit.InKey()
if err := circuitBkt.Delete(inKey.Bytes()); err != nil {
return err
}
}
return nil
})
// Return if the write succeeded.
if err == nil {
return nil
}
// If the persistent changes failed, restore the circuit map to it's
// previous state.
cm.mtx.Lock()
for inKey, circuit := range removedCircuits {
cm.pending[inKey] = circuit
if _, ok := closingCircuits[inKey]; ok {
cm.closed[inKey] = struct{}{}
}
if circuit.HasKeystone() {
cm.opened[circuit.OutKey()] = circuit
cm.addCircuitToHashIndex(circuit)
}
}
cm.mtx.Unlock()
return err
}
// removeCircuitFromHashIndex removes the given circuit from the hash index,
// pruning any unnecessary memory optimistically.
func (cm *circuitMap) removeCircuitFromHashIndex(c *PaymentCircuit) {
// Locate bucket containing this circuit's payment hashes.
circuitsWithHash, ok := cm.hashIndex[c.PaymentHash]
if !ok {
return
}
outKey := c.OutKey()
// Remove this circuit from the set of circuitsWithHash.
delete(circuitsWithHash, outKey)
// Prune the payment hash bucket if no other entries remain.
if len(circuitsWithHash) == 0 {
delete(cm.hashIndex, c.PaymentHash)
}
}
// NumPending returns the number of active circuits added to the circuit map.
func (cm *circuitMap) NumPending() int {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
return len(cm.pending)
}
// NumOpen returns the number of circuits that have been opened by way of
// setting their keystones. This is the number of HTLCs that are waiting for a
// settle/fail response from a remote peer.
func (cm *circuitMap) NumOpen() int {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
return len(cm.opened)
}