lnd/discovery/message_store.go
2021-09-29 17:00:03 +02:00

300 lines
8.2 KiB
Go

package discovery
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// messageStoreBucket is a key used to create a top level bucket in the
// gossiper database, used for storing messages that are to be sent to
// peers. Upon restarts, these messages will be read and resent to their
// respective peers.
//
// maps:
// pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg
messageStoreBucket = []byte("message-store")
// ErrUnsupportedMessage is an error returned when we attempt to add a
// message to the store that is not supported.
ErrUnsupportedMessage = errors.New("unsupported message type")
// ErrCorruptedMessageStore indicates that the on-disk bucketing
// structure has altered since the gossip message store instance was
// initialized.
ErrCorruptedMessageStore = errors.New("gossip message store has been " +
"corrupted")
)
// GossipMessageStore is a store responsible for storing gossip messages which
// we should reliably send to our peers.
type GossipMessageStore interface {
// AddMessage adds a message to the store for this peer.
AddMessage(lnwire.Message, [33]byte) error
// DeleteMessage deletes a message from the store for this peer.
DeleteMessage(lnwire.Message, [33]byte) error
// Messages returns the total set of messages that exist within the
// store for all peers.
Messages() (map[[33]byte][]lnwire.Message, error)
// Peers returns the public key of all peers with messages within the
// store.
Peers() (map[[33]byte]struct{}, error)
// MessagesForPeer returns the set of messages that exists within the
// store for the given peer.
MessagesForPeer([33]byte) ([]lnwire.Message, error)
}
// MessageStore is an implementation of the GossipMessageStore interface backed
// by a channeldb instance. By design, this store will only keep the latest
// version of a message (like in the case of multiple ChannelUpdate's) for a
// channel with a peer.
type MessageStore struct {
db kvdb.Backend
}
// A compile-time assertion to ensure messageStore implements the
// GossipMessageStore interface.
var _ GossipMessageStore = (*MessageStore)(nil)
// NewMessageStore creates a new message store backed by a channeldb instance.
func NewMessageStore(db kvdb.Backend) (*MessageStore, error) {
err := kvdb.Batch(db, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(messageStoreBucket)
return err
})
if err != nil {
return nil, fmt.Errorf("unable to create required buckets: %v",
err)
}
return &MessageStore{db}, nil
}
// msgShortChanID retrieves the short channel ID of the message.
func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) {
var shortChanID lnwire.ShortChannelID
switch msg := msg.(type) {
case *lnwire.AnnounceSignatures:
shortChanID = msg.ShortChannelID
case *lnwire.ChannelUpdate:
shortChanID = msg.ShortChannelID
default:
return shortChanID, ErrUnsupportedMessage
}
return shortChanID, nil
}
// messageStoreKey constructs the database key for the message to be stored.
func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) {
shortChanID, err := msgShortChanID(msg)
if err != nil {
return nil, err
}
var k [33 + 8 + 2]byte
copy(k[:33], peerPubKey[:])
binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64())
binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType()))
return k[:], nil
}
// AddMessage adds a message to the store for this peer.
func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error {
// Construct the key for which we'll find this message with in the store.
msgKey, err := messageStoreKey(msg, peerPubKey)
if err != nil {
return err
}
// Serialize the message with its wire encoding.
var b bytes.Buffer
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
return err
}
return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
messageStore := tx.ReadWriteBucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.Put(msgKey, b.Bytes())
})
}
// DeleteMessage deletes a message from the store for this peer.
func (s *MessageStore) DeleteMessage(msg lnwire.Message,
peerPubKey [33]byte) error {
// Construct the key for which we'll find this message with in the
// store.
msgKey, err := messageStoreKey(msg, peerPubKey)
if err != nil {
return err
}
return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
messageStore := tx.ReadWriteBucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
// In the event that we're attempting to delete a ChannelUpdate
// from the store, we'll make sure that we're actually deleting
// the correct one as it can be overwritten.
if msg, ok := msg.(*lnwire.ChannelUpdate); ok {
// Deleting a value from a bucket that doesn't exist
// acts as a NOP, so we'll return if a message doesn't
// exist under this key.
v := messageStore.Get(msgKey)
if v == nil {
return nil
}
dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0)
if err != nil {
return err
}
// If the timestamps don't match, then the update stored
// should be the latest one, so we'll avoid deleting it.
if msg.Timestamp != dbMsg.(*lnwire.ChannelUpdate).Timestamp {
return nil
}
}
return messageStore.Delete(msgKey)
})
}
// readMessage reads a message from its serialized form and ensures its
// supported by the current version of the message store.
func readMessage(msgBytes []byte) (lnwire.Message, error) {
msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0)
if err != nil {
return nil, err
}
// Check if the message is supported by the store. We can reuse the
// check for ShortChannelID as its a dependency on messages stored.
if _, err := msgShortChanID(msg); err != nil {
return nil, err
}
return msg, nil
}
// Messages returns the total set of messages that exist within the store for
// all peers.
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
var msgs map[[33]byte][]lnwire.Message
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
messageStore := tx.ReadBucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.ForEach(func(k, v []byte) error {
var pubKey [33]byte
copy(pubKey[:], k[:33])
// Deserialize the message from its raw bytes and filter
// out any which are not currently supported by the
// store.
msg, err := readMessage(v)
if err == ErrUnsupportedMessage {
return nil
}
if err != nil {
return err
}
msgs[pubKey] = append(msgs[pubKey], msg)
return nil
})
}, func() {
msgs = make(map[[33]byte][]lnwire.Message)
})
if err != nil {
return nil, err
}
return msgs, nil
}
// MessagesForPeer returns the set of messages that exists within the store for
// the given peer.
func (s *MessageStore) MessagesForPeer(
peerPubKey [33]byte) ([]lnwire.Message, error) {
var msgs []lnwire.Message
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
messageStore := tx.ReadBucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
c := messageStore.ReadCursor()
k, v := c.Seek(peerPubKey[:])
for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() {
// Deserialize the message from its raw bytes and filter
// out any which are not currently supported by the
// store.
msg, err := readMessage(v)
if err == ErrUnsupportedMessage {
continue
}
if err != nil {
return err
}
msgs = append(msgs, msg)
}
return nil
}, func() {
msgs = nil
})
if err != nil {
return nil, err
}
return msgs, nil
}
// Peers returns the public key of all peers with messages within the store.
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
var peers map[[33]byte]struct{}
err := kvdb.View(s.db, func(tx kvdb.RTx) error {
messageStore := tx.ReadBucket(messageStoreBucket)
if messageStore == nil {
return ErrCorruptedMessageStore
}
return messageStore.ForEach(func(k, _ []byte) error {
var pubKey [33]byte
copy(pubKey[:], k[:33])
peers[pubKey] = struct{}{}
return nil
})
}, func() {
peers = make(map[[33]byte]struct{})
})
if err != nil {
return nil, err
}
return peers, nil
}