package routing

import (
	"bytes"
	"container/list"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/tlv"
)

var (
	// resultsKey is the fixed key under which the attempt results are
	// stored.
	resultsKey = []byte("missioncontrol-results")

	// Big endian is the preferred byte order, due to cursor scans over
	// integer keys iterating in order.
	byteOrder = binary.BigEndian
)

// missionControlDB is an interface that defines the database methods that a
// single missionControlStore has access to. It allows the missionControlStore
// to be unaware of the overall DB structure and restricts its access to the DB
// by only providing it the bucket that it needs to care about.
type missionControlDB interface {
	// update can be used to perform reads and writes on the given bucket.
	update(f func(bkt kvdb.RwBucket) error, reset func()) error

	// view can be used to perform reads on the given bucket.
	view(f func(bkt kvdb.RBucket) error, reset func()) error

	// purge will delete all the contents in this store.
	purge() error
}
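
// A concrete missionControlDB typically scopes the update/view callbacks to a
// single results bucket so that this store never touches the rest of the DB.
// A minimal sketch of such an implementation (hypothetical, for illustration
// only; lnd's concrete implementation is defined elsewhere and may differ):
//
//	type mcDB struct {
//		backend kvdb.Backend
//		bucket  []byte
//	}
//
//	func (d *mcDB) update(f func(bkt kvdb.RwBucket) error,
//		reset func()) error {
//
//		return kvdb.Update(d.backend, func(tx kvdb.RwTx) error {
//			bkt, err := tx.CreateTopLevelBucket(d.bucket)
//			if err != nil {
//				return err
//			}
//
//			return f(bkt)
//		}, reset)
//	}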

// missionControlStore is a bolt db based implementation of a mission control
// store. It stores the raw payment attempt data from which the internal
// mission control state can be rederived on startup. This allows the mission
// control internal data structure to be changed without requiring a database
// migration. Also, changes to mission control parameters can be applied to
// historical data. Finally, it enables importing raw data from an external
// source.
type missionControlStore struct {
	done chan struct{}
	wg   sync.WaitGroup
	db   missionControlDB

	// TODO(yy): Remove the usage of sync.Cond - we are better off using
	// channels than a Cond as suggested in the official godoc.
	//
	// queueCond is signalled when items are put into the queue.
	queueCond *sync.Cond

	// queue stores all pending payment results not yet added to the store.
	// Access is protected by the queueCond.L mutex.
	queue *list.List

	// keys holds the stored MC store item keys in the order of storage.
	// We use this list when adding/deleting items from the database to
	// avoid cursor use which may be slow in the remote DB case.
	keys *list.List

	// keysMap holds the stored MC store item keys. We use this map to
	// check if a new payment result has already been stored.
	keysMap map[string]struct{}

	// maxRecords is the maximum amount of records we will store in the db.
	maxRecords int

	// flushInterval is the configured interval we use to store new results
	// and delete outdated ones from the db.
	flushInterval time.Duration
}

// newMissionControlStore creates a new mission control store backed by the
// given database. It loads the keys of all results that are already stored so
// that the in-memory key cache reflects the current DB state.
func newMissionControlStore(db missionControlDB, maxRecords int,
	flushInterval time.Duration) (*missionControlStore, error) {

	var (
		keys    *list.List
		keysMap map[string]struct{}
	)

	// Create buckets if not yet existing.
	err := db.update(func(resultsBucket kvdb.RwBucket) error {
		// Collect all keys to be able to quickly calculate the
		// difference when updating the DB state.
		c := resultsBucket.ReadCursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			keys.PushBack(string(k))
			keysMap[string(k)] = struct{}{}
		}

		return nil
	}, func() {
		keys = list.New()
		keysMap = make(map[string]struct{})
	})
	if err != nil {
		return nil, err
	}

	log.Infof("Loaded %d mission control entries", len(keysMap))

	return &missionControlStore{
		done:          make(chan struct{}),
		db:            db,
		queueCond:     sync.NewCond(&sync.Mutex{}),
		queue:         list.New(),
		keys:          keys,
		keysMap:       keysMap,
		maxRecords:    maxRecords,
		flushInterval: flushInterval,
	}, nil
}
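
// The store is driven by a small producer/consumer loop: callers hand results
// to AddResult, and the run goroutine flushes them to disk in batches, at most
// once per flushInterval. A typical lifecycle, sketched for illustration
// (error handling omitted, surrounding variable names assumed):
//
//	store, err := newMissionControlStore(db, maxRecords, flushInterval)
//	if err != nil { ... }
//
//	store.run()
//	store.AddResult(result)
//	...
//	store.stop()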

// clear removes all results from the db.
func (b *missionControlStore) clear() error {
	b.queueCond.L.Lock()
	defer b.queueCond.L.Unlock()

	if err := b.db.purge(); err != nil {
		return err
	}

	b.queue = list.New()

	return nil
}

// fetchAll returns all results currently stored in the database.
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
	var results []*paymentResult

	err := b.db.view(func(resultBucket kvdb.RBucket) error {
		results = make([]*paymentResult, 0)

		return resultBucket.ForEach(func(k, v []byte) error {
			result, err := deserializeResult(k, v)
			if err != nil {
				return err
			}

			results = append(results, result)

			return nil
		})
	}, func() {
		results = nil
	})
	if err != nil {
		return nil, err
	}

	return results, nil
}
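
// Note that fetchAll only reflects results that have already been flushed to
// the database by storeResults; results still sitting in the in-memory queue
// are not included.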

// serializeResult serializes a payment result and returns a key and value byte
// slice to insert into the bucket.
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
	recordProducers := []tlv.RecordProducer{
		&rp.timeFwd,
		&rp.timeReply,
		&rp.route,
	}

	rp.failure.WhenSome(
		func(failure tlv.RecordT[tlv.TlvType3, paymentFailure]) {
			recordProducers = append(recordProducers, &failure)
		},
	)

	// Compose key that identifies this result.
	key := getResultKey(rp)

	var buff bytes.Buffer
	err := lnwire.EncodeRecordsTo(
		&buff, lnwire.ProduceRecordsSorted(recordProducers...),
	)
	if err != nil {
		return nil, nil, err
	}

	return key, buff.Bytes(), err
}

// deserializeResult deserializes a payment result.
func deserializeResult(k, v []byte) (*paymentResult, error) {
	// Parse payment id.
	result := paymentResult{
		id: byteOrder.Uint64(k[8:]),
	}

	failure := tlv.ZeroRecordT[tlv.TlvType3, paymentFailure]()
	recordProducers := []tlv.RecordProducer{
		&result.timeFwd,
		&result.timeReply,
		&result.route,
		&failure,
	}

	r := bytes.NewReader(v)
	typeMap, err := lnwire.DecodeRecords(
		r, lnwire.ProduceRecordsSorted(recordProducers...)...,
	)
	if err != nil {
		return nil, err
	}

	if _, ok := typeMap[result.failure.TlvType()]; ok {
		result.failure = tlv.SomeRecordT(failure)
	}

	return &result, nil
}
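
// serializeResult and deserializeResult are inverses of each other: the TLV
// encoded value carries the timestamps, route and optional failure, while the
// payment id is recovered from bytes [8:16] of the key produced by
// getResultKey. A round trip therefore looks like (sketch, error handling
// omitted):
//
//	k, v, _ := serializeResult(rp)
//	rp2, _ := deserializeResult(k, v)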

// serializeRoute serializes a mcRoute and writes the resulting bytes to the
// given io.Writer.
func serializeRoute(w io.Writer, r *mcRoute) error {
	records := lnwire.ProduceRecordsSorted(
		&r.sourcePubKey,
		&r.totalAmount,
		&r.hops,
	)

	return lnwire.EncodeRecordsTo(w, records)
}

// deserializeRoute deserializes the mcRoute from the given io.Reader.
func deserializeRoute(r io.Reader) (*mcRoute, error) {
	var rt mcRoute
	records := lnwire.ProduceRecordsSorted(
		&rt.sourcePubKey,
		&rt.totalAmount,
		&rt.hops,
	)

	_, err := lnwire.DecodeRecords(r, records...)
	if err != nil {
		return nil, err
	}

	return &rt, nil
}

// deserializeHop deserializes the mcHop from the given io.Reader.
func deserializeHop(r io.Reader) (*mcHop, error) {
	var (
		h        mcHop
		blinding = tlv.ZeroRecordT[tlv.TlvType3, lnwire.TrueBoolean]()
		custom   = tlv.ZeroRecordT[tlv.TlvType4, lnwire.TrueBoolean]()
	)
	records := lnwire.ProduceRecordsSorted(
		&h.channelID,
		&h.pubKeyBytes,
		&h.amtToFwd,
		&blinding,
		&custom,
	)

	typeMap, err := lnwire.DecodeRecords(r, records...)
	if err != nil {
		return nil, err
	}

	if _, ok := typeMap[h.hasBlindingPoint.TlvType()]; ok {
		h.hasBlindingPoint = tlv.SomeRecordT(blinding)
	}

	if _, ok := typeMap[h.hasCustomRecords.TlvType()]; ok {
		h.hasCustomRecords = tlv.SomeRecordT(custom)
	}

	return &h, nil
}

// serializeHop serializes a mcHop and writes the resulting bytes to the given
// io.Writer.
func serializeHop(w io.Writer, h *mcHop) error {
	recordProducers := []tlv.RecordProducer{
		&h.channelID,
		&h.pubKeyBytes,
		&h.amtToFwd,
	}

	h.hasBlindingPoint.WhenSome(func(
		hasBlinding tlv.RecordT[tlv.TlvType3, lnwire.TrueBoolean]) {

		recordProducers = append(recordProducers, &hasBlinding)
	})

	h.hasCustomRecords.WhenSome(func(
		hasCustom tlv.RecordT[tlv.TlvType4, lnwire.TrueBoolean]) {

		recordProducers = append(recordProducers, &hasCustom)
	})

	return lnwire.EncodeRecordsTo(
		w, lnwire.ProduceRecordsSorted(recordProducers...),
	)
}

// AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) {
	b.queueCond.L.Lock()
	b.queue.PushBack(rp)
	b.queueCond.L.Unlock()

	b.queueCond.Signal()
}

// stop stops the store ticker goroutine.
func (b *missionControlStore) stop() {
	close(b.done)

	b.queueCond.Signal()

	b.wg.Wait()
}

// run runs the MC store ticker goroutine.
func (b *missionControlStore) run() {
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		timer := time.NewTimer(b.flushInterval)

		// Immediately stop the timer. It will be started once new
		// items are added to the store. As the doc for time.Timer
		// states, every call to Stop() on a timer that is not known
		// to have fired needs to be checked, and the timer's channel
		// needs to be drained appropriately. This could happen if the
		// flushInterval is very small (e.g. 1 nanosecond).
		if !timer.Stop() {
			select {
			case <-timer.C:
			case <-b.done:
				log.Debugf("Stopping mission control store")
			}
		}

		for {
			// Wait for the queue to not be empty.
			b.queueCond.L.Lock()
			for b.queue.Front() == nil {
				// To make sure we can properly stop, we must
				// read the `done` channel first before
				// attempting to call `Wait()`. This is due to
				// the fact that when `Signal` is called before
				// the `Wait` call, the `Wait` call will block
				// indefinitely.
				//
				// TODO(yy): replace this with channels.
				select {
				case <-b.done:
					b.queueCond.L.Unlock()

					return
				default:
				}

				b.queueCond.Wait()
			}
			b.queueCond.L.Unlock()

			// Restart the timer.
			timer.Reset(b.flushInterval)

			select {
			case <-timer.C:
				if err := b.storeResults(); err != nil {
					log.Errorf("Failed to update mission "+
						"control store: %v", err)
				}

			case <-b.done:
				// Release the timer's resources.
				if !timer.Stop() {
					select {
					case <-timer.C:
					case <-b.done:
						log.Debugf("Mission control " +
							"store stopped")
					}
				}
				return
			}
		}
	}()
}

// storeResults stores all accumulated results.
func (b *missionControlStore) storeResults() error {
	// We copy a reference to the queue and clear the original queue to be
	// able to release the lock.
	b.queueCond.L.Lock()
	l := b.queue

	if l.Len() == 0 {
		b.queueCond.L.Unlock()

		return nil
	}
	b.queue = list.New()
	b.queueCond.L.Unlock()

	var (
		newKeys    map[string]struct{}
		delKeys    []string
		storeCount int
		pruneCount int
	)

	// Create a deduped list of new entries.
	newKeys = make(map[string]struct{}, l.Len())
	for e := l.Front(); e != nil; e = e.Next() {
		pr, ok := e.Value.(*paymentResult)
		if !ok {
			return fmt.Errorf("wrong type %T (not *paymentResult)",
				e.Value)
		}
		key := string(getResultKey(pr))
		if _, ok := b.keysMap[key]; ok {
			l.Remove(e)
			continue
		}
		if _, ok := newKeys[key]; ok {
			l.Remove(e)
			continue
		}
		newKeys[key] = struct{}{}
	}

	// Create a list of entries to delete.
	toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
	if b.maxRecords > 0 && toDelete > 0 {
		delKeys = make([]string, 0, toDelete)

		// Delete as many as needed from old keys.
		for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
			key, ok := e.Value.(string)
			if !ok {
				return fmt.Errorf("wrong type %T (not string)",
					e.Value)
			}
			delKeys = append(delKeys, key)
			e = e.Next()
		}

		// If more deletions are needed, simply do not add from the
		// list of new keys.
		for e := l.Front(); len(delKeys) < toDelete && e != nil; {
			toDelete--
			pr, ok := e.Value.(*paymentResult)
			if !ok {
				return fmt.Errorf("wrong type %T (not "+
					"*paymentResult)", e.Value)
			}
			key := string(getResultKey(pr))
			delete(newKeys, key)
			l.Remove(e)
			e = l.Front()
		}
	}

	err := b.db.update(func(bucket kvdb.RwBucket) error {
		for e := l.Front(); e != nil; e = e.Next() {
			pr, ok := e.Value.(*paymentResult)
			if !ok {
				return fmt.Errorf("wrong type %T (not "+
					"*paymentResult)", e.Value)
			}

			// Serialize result into key and value byte slices.
			k, v, err := serializeResult(pr)
			if err != nil {
				return err
			}

			// Put into results bucket.
			if err := bucket.Put(k, v); err != nil {
				return err
			}

			storeCount++
		}

		// Prune oldest entries.
		for _, key := range delKeys {
			if err := bucket.Delete([]byte(key)); err != nil {
				return err
			}
			pruneCount++
		}

		return nil
	}, func() {
		storeCount, pruneCount = 0, 0
	})

	if err != nil {
		return err
	}

	log.Debugf("Stored mission control results: %d added, %d deleted",
		storeCount, pruneCount)

	// DB Update was successful, update the in-memory cache.
	for _, key := range delKeys {
		delete(b.keysMap, key)
		b.keys.Remove(b.keys.Front())
	}
	for e := l.Front(); e != nil; e = e.Next() {
		pr, ok := e.Value.(*paymentResult)
		if !ok {
			return fmt.Errorf("wrong type %T (not *paymentResult)",
				e.Value)
		}
		key := string(getResultKey(pr))
		b.keys.PushBack(key)
	}

	return nil
}
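
// As a concrete illustration of the pruning math above: with maxRecords set to
// 1000, 995 keys already stored and 20 deduped new results, toDelete is
// 995 + 20 - 1000 = 15, so the 15 oldest stored keys are removed and all 20
// new results are written, leaving exactly 1000 records.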

// getResultKey returns a byte slice representing a unique key for this payment
// result.
func getResultKey(rp *paymentResult) []byte {
	var keyBytes [8 + 8 + 33]byte

	// Identify records by a combination of time, payment id and sender pub
	// key. This allows importing mission control data from an external
	// source without key collisions and keeps the records sorted
	// chronologically.
	byteOrder.PutUint64(keyBytes[:], rp.timeReply.Val)
	byteOrder.PutUint64(keyBytes[8:], rp.id)
	copy(keyBytes[16:], rp.route.Val.sourcePubKey.Val[:])

	return keyBytes[:]
}
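
// The resulting 49-byte key is laid out as follows, which is also why
// byteOrder is big endian: cursor scans then iterate keys in chronological
// order.
//
//	[0:8]   timeReply (big endian uint64)
//	[8:16]  payment id (big endian uint64)
//	[16:49] sender public key (33 bytes)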

// failureMessage wraps the lnwire.FailureMessage interface such that we can
// apply a Record method and use the failureMessage in a TLV encoded type.
type failureMessage struct {
	lnwire.FailureMessage
}

// Record returns a TLV record that can be used to encode/decode a list of
// failureMessage to/from a TLV stream.
func (r *failureMessage) Record() tlv.Record {
	recordSize := func() uint64 {
		var (
			b   bytes.Buffer
			buf [8]byte
		)
		if err := encodeFailureMessage(&b, r, &buf); err != nil {
			panic(err)
		}

		return uint64(len(b.Bytes()))
	}

	return tlv.MakeDynamicRecord(
		0, r, recordSize, encodeFailureMessage, decodeFailureMessage,
	)
}

// encodeFailureMessage is a custom TLV encoder for a failureMessage.
func encodeFailureMessage(w io.Writer, val interface{}, _ *[8]byte) error {
	if v, ok := val.(*failureMessage); ok {
		var b bytes.Buffer
		err := lnwire.EncodeFailureMessage(&b, v.FailureMessage, 0)
		if err != nil {
			return err
		}

		_, err = w.Write(b.Bytes())

		return err
	}

	return tlv.NewTypeForEncodingErr(val, "routing.failureMessage")
}

// decodeFailureMessage is a custom TLV decoder for a failureMessage.
func decodeFailureMessage(r io.Reader, val interface{}, _ *[8]byte,
	l uint64) error {

	if v, ok := val.(*failureMessage); ok {
		msg, err := lnwire.DecodeFailureMessage(r, 0)
		if err != nil {
			return err
		}

		*v = failureMessage{
			FailureMessage: msg,
		}

		return nil
	}

	return tlv.NewTypeForDecodingErr(val, "routing.failureMessage", l, l)
}