lnd/htlcswitch/payment_result.go
Johan T. Halseth 1febe1a6d5
htlcswitch/payment_result: add paymentResultStore
paymentResultStore is a persistent store where we keep track of all
received payment results. This is used to ensure we don't lose results
from payment attempts on restarts.
2019-06-07 16:53:32 +02:00

261 lines
7.1 KiB
Go

package htlcswitch
import (
"bytes"
"encoding/binary"
"errors"
"io"
"sync"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
)
var (
// networkResultStoreBucketKey is used for the root level bucket that
// stores the network result for each payment ID.
networkResultStoreBucketKey = []byte("network-result-store-bucket")
// ErrPaymentIDNotFound is an error returned if the given paymentID is
// not found.
ErrPaymentIDNotFound = errors.New("paymentID not found")
// ErrPaymentIDAlreadyExists is returned if we try to write a pending
// payment whose paymentID already exists.
ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
)
// PaymentResult wraps a decoded result received from the network after a
// payment attempt was made. This is what is eventually handed to the router
// for processing.
type PaymentResult struct {
// Preimage is set by the switch in case a sent HTLC was settled.
Preimage [32]byte
// Error is non-nil in case a HTLC send failed, and the HTLC is now
// irrevocably cancelled. If the payment failed during forwarding, this
// error will be a *ForwardingError.
Error error
}
// networkResult is the raw result received from the network after a payment
// attempt has been made. Since the switch doesn't always have the necessary
// data to decode the raw message, we store it together with some meta data,
// and decode it when the router query for the final result.
type networkResult struct {
// msg is the received result. This should be of type UpdateFulfillHTLC
// or UpdateFailHTLC.
msg lnwire.Message
// unencrypted indicates whether the failure encoded in the message is
// unencrypted, and hence doesn't need to be decrypted.
unencrypted bool
// isResolution indicates whether this is a resolution message, in
// which the failure reason might not be included.
isResolution bool
}
// serializeNetworkResult serializes the networkResult.
func serializeNetworkResult(w io.Writer, n *networkResult) error {
if _, err := lnwire.WriteMessage(w, n.msg, 0); err != nil {
return err
}
return channeldb.WriteElements(w, n.unencrypted, n.isResolution)
}
// deserializeNetworkResult deserializes the networkResult.
func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
var (
err error
)
n := &networkResult{}
n.msg, err = lnwire.ReadMessage(r, 0)
if err != nil {
return nil, err
}
if err := channeldb.ReadElements(r,
&n.unencrypted, &n.isResolution,
); err != nil {
return nil, err
}
return n, nil
}
// networkResultStore is a persistent store that stores any results of HTLCs in
// flight on the network. Since payment results are inherently asynchronous, it
// is used as a common access point for senders of HTLCs, to know when a result
// is back. The Switch will checkpoint any received result to the store, and
// the store will keep results and notify the callers about them.
type networkResultStore struct {
db *channeldb.DB
// results is a map from paymentIDs to channels where subscribers to
// payment results will be notified.
results map[uint64][]chan *networkResult
resultsMtx sync.Mutex
// paymentIDMtx is a multimutex used to make sure the database and
// result subscribers map is consistent for each payment ID in case of
// concurrent callers.
paymentIDMtx *multimutex.Mutex
}
func newNetworkResultStore(db *channeldb.DB) *networkResultStore {
return &networkResultStore{
db: db,
results: make(map[uint64][]chan *networkResult),
paymentIDMtx: multimutex.NewMutex(),
}
}
// storeResult stores the networkResult for the given paymentID, and
// notifies any subscribers.
func (store *networkResultStore) storeResult(paymentID uint64,
result *networkResult) error {
// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)
// Serialize the payment result.
var b bytes.Buffer
if err := serializeNetworkResult(&b, result); err != nil {
return err
}
var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID)
err := store.db.Batch(func(tx *bbolt.Tx) error {
networkResults, err := tx.CreateBucketIfNotExists(
networkResultStoreBucketKey,
)
if err != nil {
return err
}
return networkResults.Put(paymentIDBytes[:], b.Bytes())
})
if err != nil {
return err
}
// Now that the result is stored in the database, we can notify any
// active subscribers.
store.resultsMtx.Lock()
for _, res := range store.results[paymentID] {
res <- result
}
delete(store.results, paymentID)
store.resultsMtx.Unlock()
return nil
}
// subscribeResult is used to get the payment result for the given
// payment ID. It returns a channel on which the result will be delivered when
// ready.
func (store *networkResultStore) subscribeResult(paymentID uint64) (
<-chan *networkResult, error) {
// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)
var (
result *networkResult
resultChan = make(chan *networkResult, 1)
)
err := store.db.View(func(tx *bbolt.Tx) error {
var err error
result, err = fetchResult(tx, paymentID)
switch {
// Result not yet available, we will notify once a result is
// available.
case err == ErrPaymentIDNotFound:
return nil
case err != nil:
return err
// The result was found, and will be returned immediately.
default:
return nil
}
})
if err != nil {
return nil, err
}
// If the result was found, we can send it on the result channel
// imemdiately.
if result != nil {
resultChan <- result
return resultChan, nil
}
// Otherwise we store the result channel for when the result is
// available.
store.resultsMtx.Lock()
store.results[paymentID] = append(
store.results[paymentID], resultChan,
)
store.resultsMtx.Unlock()
return resultChan, nil
}
// getResult attempts to immediately fetch the result for the given pid from
// the store. If no result is available, ErrPaymentIDNotFound is returned.
func (store *networkResultStore) getResult(pid uint64) (
*networkResult, error) {
var result *networkResult
err := store.db.View(func(tx *bbolt.Tx) error {
var err error
result, err = fetchResult(tx, pid)
return err
})
if err != nil {
return nil, err
}
return result, nil
}
func fetchResult(tx *bbolt.Tx, pid uint64) (*networkResult, error) {
var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], pid)
networkResults := tx.Bucket(networkResultStoreBucketKey)
if networkResults == nil {
return nil, ErrPaymentIDNotFound
}
// Check whether a result is already available.
resultBytes := networkResults.Get(paymentIDBytes[:])
if resultBytes == nil {
return nil, ErrPaymentIDNotFound
}
// Decode the result we found.
r := bytes.NewReader(resultBytes)
return deserializeNetworkResult(r)
}