multi: make decayed log DB remote compatible

Even though the sphinx router's persistent replay log is not crucial to
the operation of lnd, since its state can be re-created by establishing
a new brontide connection, we want to make lnd fully stateless and
therefore need the option of not storing any state on disk.
Oliver Gugger 2021-08-03 09:57:32 +02:00
parent 6043113857
commit 1e27f491c7
6 changed files with 91 additions and 56 deletions


@@ -203,8 +203,6 @@ var (
 	bitcoindEstimateModes = [2]string{"ECONOMICAL", defaultBitcoindEstimateMode}
 	defaultPrunedNodeMaxPeers = 4
-
-	defaultSphinxDbName = "sphinxreplay.db"
 )
 
 // Config defines the configuration options for lnd.


@@ -41,6 +41,35 @@ var (
 	ErrDecayedLogCorrupted = errors.New("decayed log structure corrupted")
 )
 
+// NewBoltBackendCreator returns a function that creates a new bbolt backend for
+// the decayed logs database.
+func NewBoltBackendCreator(dbPath,
+	dbFileName string) func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
+
+	return func(boltCfg *kvdb.BoltConfig) (kvdb.Backend, error) {
+		cfg := &kvdb.BoltBackendConfig{
+			DBPath:            dbPath,
+			DBFileName:        dbFileName,
+			NoFreelistSync:    !boltCfg.SyncFreelist,
+			AutoCompact:       boltCfg.AutoCompact,
+			AutoCompactMinAge: boltCfg.AutoCompactMinAge,
+			DBTimeout:         boltCfg.DBTimeout,
+		}
+
+		// Use default path for log database.
+		if dbPath == "" {
+			cfg.DBPath = defaultDbDirectory
+		}
+
+		db, err := kvdb.GetBoltBackend(cfg)
+		if err != nil {
+			return nil, fmt.Errorf("could not open boltdb: %v", err)
+		}
+
+		return db, nil
+	}
+}
+
 // DecayedLog implements the PersistLog interface. It stores the first
 // HashPrefixSize bytes of a sha256-hashed shared secret along with a node's
 // CLTV value. It is a decaying log meaning there will be a garbage collector

@@ -51,8 +80,6 @@ type DecayedLog struct {
 	started int32 // To be used atomically.
 	stopped int32 // To be used atomically.
 
-	cfg *kvdb.BoltBackendConfig
-
 	db kvdb.Backend
 
 	notifier chainntnfs.ChainNotifier

@@ -64,25 +91,11 @@ type DecayedLog struct {
 // NewDecayedLog creates a new DecayedLog, which caches recently seen hash
 // shared secrets. Entries are evicted as their cltv expires using block epochs
 // from the given notifier.
-func NewDecayedLog(dbPath, dbFileName string, boltCfg *kvdb.BoltConfig,
+func NewDecayedLog(db kvdb.Backend,
 	notifier chainntnfs.ChainNotifier) *DecayedLog {
 
-	cfg := &kvdb.BoltBackendConfig{
-		DBPath:            dbPath,
-		DBFileName:        dbFileName,
-		NoFreelistSync:    true,
-		AutoCompact:       boltCfg.AutoCompact,
-		AutoCompactMinAge: boltCfg.AutoCompactMinAge,
-		DBTimeout:         boltCfg.DBTimeout,
-	}
-
-	// Use default path for log database
-	if dbPath == "" {
-		cfg.DBPath = defaultDbDirectory
-	}
-
 	return &DecayedLog{
-		cfg:      cfg,
+		db:       db,
 		notifier: notifier,
 		quit:     make(chan struct{}),
 	}

@@ -96,13 +109,6 @@ func (d *DecayedLog) Start() error {
 		return nil
 	}
 
-	// Open the boltdb for use.
-	var err error
-	d.db, err = kvdb.GetBoltBackend(d.cfg)
-	if err != nil {
-		return fmt.Errorf("could not open boltdb: %v", err)
-	}
-
 	// Initialize the primary buckets used by the decayed log.
 	if err := d.initBuckets(); err != nil {
 		return err
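
With this split, callers first build (or receive) a kvdb.Backend and only then construct the decayed log, so the same constructor works for a local bbolt file and for a remote database. A minimal sketch of the local path, assuming dbPath and notifier are supplied by the caller and with import blocks elided; the BoltConfig values mirror the test setup further down:

	// Hypothetical wiring, not part of this commit: open a local bbolt
	// backend for the replay log and hand it to the decayed log.
	backend, err := htlcswitch.NewBoltBackendCreator(dbPath, "sphinxreplay.db")(
		&kvdb.BoltConfig{DBTimeout: time.Second},
	)
	if err != nil {
		return err
	}

	// The decayed log now just wraps whatever backend it was given.
	replayLog := htlcswitch.NewDecayedLog(backend, notifier)
	if err := replayLog.Start(); err != nil {
		return err
	}
	defer replayLog.Stop()

A remote deployment instead passes in the shared etcd-backed kvdb.Backend (see the database config changes below), and the decayed log never touches the local file system.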


@@ -2,6 +2,7 @@ package htlcswitch
 
 import (
 	"crypto/rand"
+	"fmt"
 	"io/ioutil"
 	"os"
 	"testing"

@@ -18,20 +19,29 @@ const (
 )
 
 // tempDecayedLogPath creates a new temporary database path to back a single
-// deccayed log instance.
-func tempDecayedLogPath(t *testing.T) (string, string) {
+// decayed log instance.
+func tempDecayedLogPath(t *testing.T) string {
 	dir, err := ioutil.TempDir("", "decayedlog")
 	if err != nil {
 		t.Fatalf("unable to create temporary decayed log dir: %v", err)
 	}
 
-	return dir, "sphinxreplay.db"
+	return dir
 }
 
 // startup sets up the DecayedLog and possibly the garbage collector.
-func startup(dbPath, dbFileName string, notifier bool) (sphinx.ReplayLog,
+func startup(dbPath string, notifier bool) (sphinx.ReplayLog,
 	*mock.ChainNotifier, *sphinx.HashPrefix, error) {
 
+	cfg := &kvdb.BoltConfig{
+		DBTimeout: time.Second,
+	}
+	backend, err := NewBoltBackendCreator(dbPath, "sphinxreplay.db")(cfg)
+	if err != nil {
+		return nil, nil, nil, fmt.Errorf("unable to create temporary "+
+			"decayed log db: %v", err)
+	}
+
 	var log sphinx.ReplayLog
 	var chainNotifier *mock.ChainNotifier
 	if notifier {

@@ -44,16 +54,14 @@ func startup(dbPath, dbFileName string, notifier bool) (sphinx.ReplayLog,
 		}
 
 		// Initialize the DecayedLog object
-		log = NewDecayedLog(
-			dbPath, dbFileName, &kvdb.BoltConfig{}, chainNotifier,
-		)
+		log = NewDecayedLog(backend, chainNotifier)
 	} else {
 		// Initialize the DecayedLog object
-		log = NewDecayedLog(dbPath, dbFileName, &kvdb.BoltConfig{}, nil)
+		log = NewDecayedLog(backend, nil)
 	}
 
 	// Open the channeldb (start the garbage collector)
-	err := log.Start()
+	err = log.Start()
 	if err != nil {
 		return nil, nil, nil, err
 	}

@@ -83,9 +91,9 @@ func shutdown(dir string, d sphinx.ReplayLog) {
 func TestDecayedLogGarbageCollector(t *testing.T) {
 	t.Parallel()
 
-	dbPath, dbFileName := tempDecayedLogPath(t)
+	dbPath := tempDecayedLogPath(t)
 
-	d, notifier, hashedSecret, err := startup(dbPath, dbFileName, true)
+	d, notifier, hashedSecret, err := startup(dbPath, true)
 	if err != nil {
 		t.Fatalf("Unable to start up DecayedLog: %v", err)
 	}

@@ -144,9 +152,9 @@ func TestDecayedLogGarbageCollector(t *testing.T) {
 func TestDecayedLogPersistentGarbageCollector(t *testing.T) {
 	t.Parallel()
 
-	dbPath, dbFileName := tempDecayedLogPath(t)
+	dbPath := tempDecayedLogPath(t)
 
-	d, _, hashedSecret, err := startup(dbPath, dbFileName, true)
+	d, _, hashedSecret, err := startup(dbPath, true)
 	if err != nil {
 		t.Fatalf("Unable to start up DecayedLog: %v", err)
 	}

@@ -166,7 +174,7 @@ func TestDecayedLogPersistentGarbageCollector(t *testing.T) {
 	// Shut down DecayedLog and the garbage collector along with it.
 	d.Stop()
 
-	d2, notifier2, _, err := startup(dbPath, dbFileName, true)
+	d2, notifier2, _, err := startup(dbPath, true)
 	if err != nil {
 		t.Fatalf("Unable to restart DecayedLog: %v", err)
 	}

@@ -200,9 +208,9 @@ func TestDecayedLogPersistentGarbageCollector(t *testing.T) {
 func TestDecayedLogInsertionAndDeletion(t *testing.T) {
 	t.Parallel()
 
-	dbPath, dbFileName := tempDecayedLogPath(t)
+	dbPath := tempDecayedLogPath(t)
 
-	d, _, hashedSecret, err := startup(dbPath, dbFileName, false)
+	d, _, hashedSecret, err := startup(dbPath, false)
 	if err != nil {
 		t.Fatalf("Unable to start up DecayedLog: %v", err)
 	}

@@ -238,9 +246,9 @@ func TestDecayedLogInsertionAndDeletion(t *testing.T) {
 func TestDecayedLogStartAndStop(t *testing.T) {
 	t.Parallel()
 
-	dbPath, dbFileName := tempDecayedLogPath(t)
+	dbPath := tempDecayedLogPath(t)
 
-	d, _, hashedSecret, err := startup(dbPath, dbFileName, false)
+	d, _, hashedSecret, err := startup(dbPath, false)
 	if err != nil {
 		t.Fatalf("Unable to start up DecayedLog: %v", err)
 	}

@@ -255,7 +263,7 @@ func TestDecayedLogStartAndStop(t *testing.T) {
 	// Shutdown the DecayedLog's channeldb
 	d.Stop()
 
-	d2, _, hashedSecret2, err := startup(dbPath, dbFileName, false)
+	d2, _, hashedSecret2, err := startup(dbPath, false)
 	if err != nil {
 		t.Fatalf("Unable to restart DecayedLog: %v", err)
 	}

@@ -282,7 +290,7 @@ func TestDecayedLogStartAndStop(t *testing.T) {
 	// Shutdown the DecayedLog's channeldb
 	d2.Stop()
 
-	d3, _, hashedSecret3, err := startup(dbPath, dbFileName, false)
+	d3, _, hashedSecret3, err := startup(dbPath, false)
 	if err != nil {
 		t.Fatalf("Unable to restart DecayedLog: %v", err)
 	}

@@ -304,9 +312,9 @@ func TestDecayedLogStartAndStop(t *testing.T) {
 func TestDecayedLogStorageAndRetrieval(t *testing.T) {
 	t.Parallel()
 
-	dbPath, dbFileName := tempDecayedLogPath(t)
+	dbPath := tempDecayedLogPath(t)
 
-	d, _, hashedSecret, err := startup(dbPath, dbFileName, false)
+	d, _, hashedSecret, err := startup(dbPath, false)
 	if err != nil {
 		t.Fatalf("Unable to start up DecayedLog: %v", err)
 	}


@@ -10,8 +10,9 @@ import (
 )
 
 const (
 	channelDBName  = "channel.db"
 	macaroonDBName = "macaroons.db"
+	decayedLogDbName = "sphinxreplay.db"
 
 	BoltBackend = "bolt"
 	EtcdBackend = "etcd"

@@ -23,6 +24,10 @@ const (
 	// NSMacaroonDB is the namespace name that we use for the macaroon DB.
 	NSMacaroonDB = "macaroondb"
+
+	// NSDecayedLogDB is the namespace name that we use for the sphinx
+	// replay a.k.a. decayed log DB.
+	NSDecayedLogDB = "decayedlogdb"
 )
 
 // DB holds database configuration for LND.

@@ -108,6 +113,10 @@ type DatabaseBackends struct {
 	// keys.
 	MacaroonDB kvdb.Backend
 
+	// DecayedLogDB points to a database backend that stores the decayed log
+	// data.
+	DecayedLogDB kvdb.Backend
+
 	// Remote indicates whether the database backends are remote, possibly
 	// replicated instances or local bbolt backed databases.
 	Remote bool

@@ -154,6 +163,7 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
 			ChanStateDB:  etcdBackend,
 			HeightHintDB: etcdBackend,
 			MacaroonDB:   etcdBackend,
+			DecayedLogDB: etcdBackend,
 			Remote:       true,
 			CloseFuncs:   closeFuncs,
 		}, nil

@@ -186,12 +196,26 @@ func (db *DB) GetBackends(ctx context.Context, chanDBPath,
 	}
 	closeFuncs[NSMacaroonDB] = macaroonBackend.Close
 
+	decayedLogBackend, err := kvdb.GetBoltBackend(&kvdb.BoltBackendConfig{
+		DBPath:            chanDBPath,
+		DBFileName:        decayedLogDbName,
+		DBTimeout:         db.Bolt.DBTimeout,
+		NoFreelistSync:    !db.Bolt.SyncFreelist,
+		AutoCompact:       db.Bolt.AutoCompact,
+		AutoCompactMinAge: db.Bolt.AutoCompactMinAge,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error opening decayed log DB: %v", err)
+	}
+	closeFuncs[NSDecayedLogDB] = decayedLogBackend.Close
+
 	returnEarly = false
 	return &DatabaseBackends{
 		GraphDB:      boltBackend,
 		ChanStateDB:  boltBackend,
 		HeightHintDB: boltBackend,
 		MacaroonDB:   macaroonBackend,
+		DecayedLogDB: decayedLogBackend,
 		CloseFuncs:   closeFuncs,
 	}, nil
 }

lnd.go

@@ -1616,6 +1616,7 @@ type databaseInstances struct {
 	chanStateDB  *channeldb.DB
 	heightHintDB kvdb.Backend
 	macaroonDB   kvdb.Backend
+	decayedLogDB kvdb.Backend
 }
 
 // initializeDatabases extracts the current databases that we'll use for normal

@@ -1649,6 +1650,7 @@ func initializeDatabases(ctx context.Context,
 	dbs := &databaseInstances{
 		heightHintDB: databaseBackends.HeightHintDB,
 		macaroonDB:   databaseBackends.MacaroonDB,
+		decayedLogDB: databaseBackends.DecayedLogDB,
 	}
 	cleanUp := func() {
 		// We can just close the returned close functions directly. Even


@@ -383,12 +383,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	var serializedPubKey [33]byte
 	copy(serializedPubKey[:], nodeKeyECDH.PubKey().SerializeCompressed())
 
-	// Initialize the sphinx router, placing it's persistent replay log in
-	// the same directory as the channel graph database. We don't need to
-	// replicate this data, so we'll store it locally.
+	// Initialize the sphinx router.
 	replayLog := htlcswitch.NewDecayedLog(
-		cfg.graphDatabaseDir(), defaultSphinxDbName, cfg.DB.Bolt,
-		cc.ChainNotifier,
+		dbs.decayedLogDB, cc.ChainNotifier,
 	)
 	sphinxRouter := sphinx.NewRouter(
 		nodeKeyECDH, cfg.ActiveNetParams.Params, replayLog,