mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 21:35:24 +01:00
multi: add reset closure to kvdb.Update
Similarly as with kvdb.View this commits adds a reset closure to the kvdb.Update call in order to be able to reset external state if the underlying db backend needs to retry the transaction.
This commit is contained in:
parent
2a358327f4
commit
d89f51d1d0
@ -1265,7 +1265,7 @@ func (rs *retributionStore) Add(ret *retributionInfo) error {
|
||||
}
|
||||
|
||||
return retBucket.Put(outBuf.Bytes(), retBuf.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// Finalize writes a signed justice transaction to the retribution store. This
|
||||
@ -1290,7 +1290,7 @@ func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint,
|
||||
}
|
||||
|
||||
return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// GetFinalizedTxn loads the finalized justice transaction for the provided
|
||||
@ -1396,7 +1396,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
|
||||
}
|
||||
|
||||
return justiceBkt.Delete(chanBytes)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ForAll iterates through all stored retributions and executes the passed
|
||||
|
@ -893,7 +893,7 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error {
|
||||
channel.ShortChannelID = openLoc
|
||||
|
||||
return putOpenChannel(chanBucket.(kvdb.RwBucket), channel)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1219,7 +1219,7 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus,
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1248,7 +1248,7 @@ func (c *OpenChannel) clearChanStatus(status ChannelStatus) error {
|
||||
channel.chanStatus = status
|
||||
|
||||
return putOpenChannel(chanBucket, channel)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1356,7 +1356,7 @@ func (c *OpenChannel) SyncPending(addr net.Addr, pendingHeight uint32) error {
|
||||
|
||||
return kvdb.Update(c.Db, func(tx kvdb.RwTx) error {
|
||||
return syncNewChannel(tx, c, []net.Addr{addr})
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// syncNewChannel will write the passed channel to disk, and also create a
|
||||
@ -1490,7 +1490,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment,
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2030,7 +2030,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
|
||||
return err
|
||||
}
|
||||
return chanBucket.Put(commitDiffKey, b.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// RemoteCommitChainTip returns the "tip" of the current remote commitment
|
||||
@ -2167,7 +2167,7 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error {
|
||||
}
|
||||
|
||||
return putChanRevocationState(chanBucket, c)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2327,6 +2327,8 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg,
|
||||
newRemoteCommit = &newCommit.Commitment
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
newRemoteCommit = nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -2393,7 +2395,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error {
|
||||
|
||||
return kvdb.Update(c.Db, func(tx kvdb.RwTx) error {
|
||||
return c.Packager.AckAddHtlcs(tx, addRefs...)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// AckSettleFails updates the SettleFailFilter containing any of the provided
|
||||
@ -2406,7 +2408,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error {
|
||||
|
||||
return kvdb.Update(c.Db, func(tx kvdb.RwTx) error {
|
||||
return c.Packager.AckSettleFails(tx, settleFailRefs...)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// SetFwdFilter atomically sets the forwarding filter for the forwarding package
|
||||
@ -2417,7 +2419,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error {
|
||||
|
||||
return kvdb.Update(c.Db, func(tx kvdb.RwTx) error {
|
||||
return c.Packager.SetFwdFilter(tx, height, fwdFilter)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// RemoveFwdPkgs atomically removes forwarding packages specified by the remote
|
||||
@ -2438,7 +2440,7 @@ func (c *OpenChannel) RemoveFwdPkgs(heights ...uint64) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// RevocationLogTail returns the "tail", or the end of the current revocation
|
||||
@ -2799,7 +2801,7 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
|
||||
return putChannelCloseSummary(
|
||||
tx, chanPointBuf.Bytes(), summary, chanState,
|
||||
)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ChannelSnapshot is a frozen snapshot of the current channel state. A
|
||||
|
@ -1447,7 +1447,7 @@ func TestBalanceAtHeight(t *testing.T) {
|
||||
commit.RemoteBalance = remote
|
||||
|
||||
return appendChannelLogEntry(logBucket, &commit)
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -191,11 +191,16 @@ type DB struct {
|
||||
|
||||
// Update is a wrapper around walletdb.Update which calls into the extended
|
||||
// backend when available. This call is needed to be able to cast DB to
|
||||
// ExtendedBackend.
|
||||
func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error) error {
|
||||
// ExtendedBackend. The passed reset function is called before the start of the
|
||||
// transaction and can be used to reset intermediate state. As callers may
|
||||
// expect retries of the f closure (depending on the database backend used), the
|
||||
// reset function will be called before each retry respectively.
|
||||
func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
|
||||
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
|
||||
return v.Update(f)
|
||||
return v.Update(f, reset)
|
||||
}
|
||||
|
||||
reset()
|
||||
return walletdb.Update(db, f)
|
||||
}
|
||||
|
||||
@ -310,7 +315,7 @@ func (d *DB) Wipe() error {
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// createChannelDB creates and initializes a fresh version of channeldb. In
|
||||
@ -364,7 +369,7 @@ func initChannelDB(db kvdb.Backend) error {
|
||||
|
||||
meta.DbVersionNumber = getLatestDBVersion(dbVersions)
|
||||
return putMeta(meta, tx)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new channeldb: %v", err)
|
||||
}
|
||||
@ -939,7 +944,7 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
|
||||
// garbage collect it to ensure we don't establish persistent
|
||||
// connections to peers without open channels.
|
||||
return d.pruneLinkNode(tx, chanSummary.RemotePub)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// pruneLinkNode determines whether we should garbage collect a link node from
|
||||
@ -979,7 +984,7 @@ func (d *DB) PruneLinkNodes() error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ChannelShell is a shell of a channel that is meant to be used for channel
|
||||
@ -1026,7 +1031,7 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1210,7 +1215,7 @@ func (d *DB) syncVersions(versions []version) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ChannelGraph returns a new instance of the directed channel graph.
|
||||
|
@ -209,7 +209,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AddFwdPkg(tx, fwdPkg)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to add fwd pkg: %v", err)
|
||||
}
|
||||
|
||||
@ -228,7 +228,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) {
|
||||
// fwd filter.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to set fwdfiter: %v", err)
|
||||
}
|
||||
|
||||
@ -246,7 +246,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) {
|
||||
// Lastly, remove the completed forwarding package from disk.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.RemovePkg(tx, fwdPkg.Height)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove fwdpkg: %v", err)
|
||||
}
|
||||
|
||||
@ -281,7 +281,7 @@ func TestPackagerOnlyAdds(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AddFwdPkg(tx, fwdPkg)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to add fwd pkg: %v", err)
|
||||
}
|
||||
|
||||
@ -302,7 +302,7 @@ func TestPackagerOnlyAdds(t *testing.T) {
|
||||
// was failed locally.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to set fwdfiter: %v", err)
|
||||
}
|
||||
|
||||
@ -326,7 +326,7 @@ func TestPackagerOnlyAdds(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckAddHtlcs(tx, addRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to ack add htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -345,7 +345,7 @@ func TestPackagerOnlyAdds(t *testing.T) {
|
||||
// Lastly, remove the completed forwarding package from disk.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.RemovePkg(tx, fwdPkg.Height)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove fwdpkg: %v", err)
|
||||
}
|
||||
|
||||
@ -383,7 +383,7 @@ func TestPackagerOnlySettleFails(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AddFwdPkg(tx, fwdPkg)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to add fwd pkg: %v", err)
|
||||
}
|
||||
|
||||
@ -404,7 +404,7 @@ func TestPackagerOnlySettleFails(t *testing.T) {
|
||||
// was failed locally.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to set fwdfiter: %v", err)
|
||||
}
|
||||
|
||||
@ -430,7 +430,7 @@ func TestPackagerOnlySettleFails(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckSettleFails(tx, failSettleRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to ack add htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -450,7 +450,7 @@ func TestPackagerOnlySettleFails(t *testing.T) {
|
||||
// Lastly, remove the completed forwarding package from disk.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.RemovePkg(tx, fwdPkg.Height)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove fwdpkg: %v", err)
|
||||
}
|
||||
|
||||
@ -488,7 +488,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AddFwdPkg(tx, fwdPkg)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to add fwd pkg: %v", err)
|
||||
}
|
||||
|
||||
@ -509,7 +509,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) {
|
||||
// was failed locally.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to set fwdfiter: %v", err)
|
||||
}
|
||||
|
||||
@ -534,7 +534,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckAddHtlcs(tx, addRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to ack add htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -561,7 +561,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckSettleFails(tx, failSettleRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove settle/fail htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -581,7 +581,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) {
|
||||
// Lastly, remove the completed forwarding package from disk.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.RemovePkg(tx, fwdPkg.Height)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove fwdpkg: %v", err)
|
||||
}
|
||||
|
||||
@ -621,7 +621,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AddFwdPkg(tx, fwdPkg)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to add fwd pkg: %v", err)
|
||||
}
|
||||
|
||||
@ -642,7 +642,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
|
||||
// was failed locally.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to set fwdfiter: %v", err)
|
||||
}
|
||||
|
||||
@ -671,7 +671,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckSettleFails(tx, failSettleRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove settle/fail htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -698,7 +698,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
|
||||
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.AckAddHtlcs(tx, addRef)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to ack add htlc: %v", err)
|
||||
}
|
||||
}
|
||||
@ -718,7 +718,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) {
|
||||
// Lastly, remove the completed forwarding package from disk.
|
||||
if err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return packager.RemovePkg(tx, fwdPkg.Height)
|
||||
}); err != nil {
|
||||
}, func() {}); err != nil {
|
||||
t.Fatalf("unable to remove fwdpkg: %v", err)
|
||||
}
|
||||
|
||||
|
@ -434,7 +434,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
|
||||
// Finally, we commit the information of the lightning node
|
||||
// itself.
|
||||
return addLightningNode(tx, node)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// AddLightningNode adds a vertex/node to the graph database. If the node is not
|
||||
@ -448,7 +448,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
|
||||
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
|
||||
return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
|
||||
return addLightningNode(tx, node)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
|
||||
@ -519,7 +519,7 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
|
||||
}
|
||||
|
||||
return c.deleteLightningNode(nodes, nodePub[:])
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// deleteLightningNode uses an existing database transaction to remove a
|
||||
@ -579,7 +579,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
|
||||
|
||||
err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
|
||||
return c.addChannelEdge(tx, edge)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -820,7 +820,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
|
||||
}
|
||||
|
||||
return putChanEdgeInfo(edgeIndex, edge, chanKey)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
const (
|
||||
@ -943,6 +943,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
|
||||
// prune any nodes that have had a channel closed within the
|
||||
// latest block.
|
||||
return c.pruneGraphNodes(nodes, edgeIndex)
|
||||
}, func() {
|
||||
chansClosed = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -976,7 +978,7 @@ func (c *ChannelGraph) PruneGraphNodes() error {
|
||||
}
|
||||
|
||||
return c.pruneGraphNodes(nodes, edgeIndex)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
|
||||
@ -1195,6 +1197,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
removedChans = nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1297,7 +1301,7 @@ func (c *ChannelGraph) DeleteChannelEdges(chanIDs ...uint64) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1936,6 +1940,8 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
|
||||
var err error
|
||||
isUpdate1, err = updateEdgePolicy(tx, edge)
|
||||
return err
|
||||
}, func() {
|
||||
isUpdate1 = false
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -3268,7 +3274,7 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
|
||||
var k [8]byte
|
||||
byteOrder.PutUint64(k[:], chanID)
|
||||
return zombieIndex.Delete(k[:])
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2894,7 +2894,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) {
|
||||
}
|
||||
|
||||
return edges.Put(edgeKey[:], stripped)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("error writing db: %v", err)
|
||||
}
|
||||
|
@ -562,6 +562,8 @@ func (d *DB) AddInvoice(newInvoice *Invoice, paymentHash lntypes.Hash) (
|
||||
|
||||
invoiceAddIndex = newIndex
|
||||
return nil
|
||||
}, func() {
|
||||
invoiceAddIndex = 0
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -950,6 +952,8 @@ func (d *DB) UpdateInvoice(ref InvoiceRef,
|
||||
)
|
||||
|
||||
return err
|
||||
}, func() {
|
||||
updatedInvoice = nil
|
||||
})
|
||||
|
||||
return updatedInvoice, err
|
||||
@ -1866,7 +1870,7 @@ func (d *DB) DeleteInvoice(invoicesToDelete []InvoiceDeleteRef) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -234,13 +234,15 @@ func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
|
||||
}
|
||||
|
||||
// Update opens a database read/write transaction and executes the function f
|
||||
// with the transaction passed as a parameter. After f exits, if f did not
|
||||
// error, the transaction is committed. Otherwise, if f did error, the
|
||||
// transaction is rolled back. If the rollback fails, the original error
|
||||
// returned by f is still returned. If the commit fails, the commit error is
|
||||
// returned.
|
||||
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
|
||||
// with the transaction passed as a parameter. After f exits, if f did not
|
||||
// error, the transaction is committed. Otherwise, if f did error, the
|
||||
// transaction is rolled back. If the rollback fails, the original error
|
||||
// returned by f is still returned. If the commit fails, the commit error is
|
||||
// returned. As callers may expect retries of the f closure, the reset function
|
||||
// will be called before each retry respectively.
|
||||
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
|
||||
apply := func(stm STM) error {
|
||||
reset()
|
||||
return f(newReadWriteTx(stm, db.config.Prefix))
|
||||
}
|
||||
|
||||
@ -304,5 +306,5 @@ func (db *db) Close() error {
|
||||
//
|
||||
// Batch is only useful when there are multiple goroutines calling it.
|
||||
func (db *db) Batch(apply func(tx walletdb.ReadWriteTx) error) error {
|
||||
return db.Update(apply)
|
||||
return db.Update(apply, func() {})
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func TestCopy(t *testing.T) {
|
||||
|
||||
require.NoError(t, apple.Put([]byte("key"), []byte("val")))
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
// Expect non-zero copy.
|
||||
var buf bytes.Buffer
|
||||
@ -66,7 +66,7 @@ func TestAbortContext(t *testing.T) {
|
||||
require.Error(t, err, "context canceled")
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Error(t, err, "context canceled")
|
||||
|
||||
|
@ -79,7 +79,7 @@ func TestBucketCreation(t *testing.T) {
|
||||
require.NotNil(t, apple.NestedReadWriteBucket([]byte("banana")))
|
||||
require.NotNil(t, apple.NestedReadBucket([]byte("banana")))
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -189,7 +189,7 @@ func TestBucketDeletion(t *testing.T) {
|
||||
// "aple/banana" exists
|
||||
require.NotNil(t, apple.NestedReadWriteBucket([]byte("banana")))
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -261,7 +261,7 @@ func TestBucketForEach(t *testing.T) {
|
||||
require.Equal(t, expected, got)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -354,7 +354,7 @@ func TestBucketForEachWithError(t *testing.T) {
|
||||
require.Equal(t, expected, got)
|
||||
require.Error(t, err)
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -399,7 +399,7 @@ func TestBucketSequence(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
}
|
||||
@ -431,7 +431,7 @@ func TestKeyClash(t *testing.T) {
|
||||
require.NotNil(t, banana)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -457,7 +457,7 @@ func TestKeyClash(t *testing.T) {
|
||||
require.Error(t, walletdb.ErrIncompatibleValue, b)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
|
||||
@ -494,7 +494,7 @@ func TestBucketCreateDelete(t *testing.T) {
|
||||
require.NotNil(t, banana)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||
@ -503,7 +503,7 @@ func TestBucketCreateDelete(t *testing.T) {
|
||||
require.NoError(t, apple.DeleteNestedBucket([]byte("banana")))
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||
@ -512,7 +512,7 @@ func TestBucketCreateDelete(t *testing.T) {
|
||||
require.NoError(t, apple.Put([]byte("banana"), []byte("value")))
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := map[string]string{
|
||||
|
@ -24,7 +24,7 @@ func TestReadCursorEmptyInterval(t *testing.T) {
|
||||
require.NotNil(t, b)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.View(func(tx walletdb.ReadTx) error {
|
||||
@ -78,7 +78,7 @@ func TestReadCursorNonEmptyInterval(t *testing.T) {
|
||||
require.NoError(t, b.Put([]byte(kv.key), []byte(kv.val)))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -162,7 +162,7 @@ func TestReadWriteCursor(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
}, func() {}))
|
||||
|
||||
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||
b := tx.ReadWriteBucket([]byte("apple"))
|
||||
@ -276,7 +276,7 @@ func TestReadWriteCursor(t *testing.T) {
|
||||
require.Equal(t, reverseKVs(expected), kvs)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -320,7 +320,7 @@ func TestReadWriteCursorWithBucketAndValue(t *testing.T) {
|
||||
require.NotNil(t, b2)
|
||||
|
||||
return nil
|
||||
}))
|
||||
}, func() {}))
|
||||
|
||||
err = db.View(func(tx walletdb.ReadTx) error {
|
||||
b := tx.ReadBucket([]byte("apple"))
|
||||
|
@ -142,7 +142,7 @@ func TestChangeDuringUpdate(t *testing.T) {
|
||||
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, count, 2)
|
||||
|
@ -10,11 +10,15 @@ import (
|
||||
// error, the transaction is committed. Otherwise, if f did error, the
|
||||
// transaction is rolled back. If the rollback fails, the original error
|
||||
// returned by f is still returned. If the commit fails, the commit error is
|
||||
// returned.
|
||||
func Update(db Backend, f func(tx RwTx) error) error {
|
||||
// returned. As callers may expect retries of the f closure (depending on the
|
||||
// database backend used), the reset function will be called before each retry
|
||||
// respectively.
|
||||
func Update(db Backend, f func(tx RwTx) error, reset func()) error {
|
||||
if extendedDB, ok := db.(ExtendedBackend); ok {
|
||||
return extendedDB.Update(f)
|
||||
return extendedDB.Update(f, reset)
|
||||
}
|
||||
|
||||
reset()
|
||||
return walletdb.Update(db, f)
|
||||
}
|
||||
|
||||
@ -72,13 +76,15 @@ type ExtendedBackend interface {
|
||||
//called before each retry respectively.
|
||||
View(f func(tx walletdb.ReadTx) error, reset func()) error
|
||||
|
||||
// Update opens a database read/write transaction and executes the function
|
||||
// f with the transaction passed as a parameter. After f exits, if f did not
|
||||
// error, the transaction is committed. Otherwise, if f did error, the
|
||||
// transaction is rolled back. If the rollback fails, the original error
|
||||
// returned by f is still returned. If the commit fails, the commit error is
|
||||
// returned.
|
||||
Update(f func(tx walletdb.ReadWriteTx) error) error
|
||||
// Update opens a database read/write transaction and executes the
|
||||
// function f with the transaction passed as a parameter. After f exits,
|
||||
// if f did not error, the transaction is committed. Otherwise, if f did
|
||||
// error, the transaction is rolled back. If the rollback fails, the
|
||||
// original error returned by f is still returned. If the commit fails,
|
||||
// the commit error is returned. As callers may expect retries of the f
|
||||
// closure (depending on the database backend used), the reset function
|
||||
// will be called before each retry respectively.
|
||||
Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error
|
||||
}
|
||||
|
||||
// Open opens an existing database for the specified type. The arguments are
|
||||
|
@ -60,7 +60,7 @@ func fetchMeta(meta *Meta, tx kvdb.RTx) error {
|
||||
func (d *DB) PutMeta(meta *Meta) error {
|
||||
return kvdb.Update(d, func(tx kvdb.RwTx) error {
|
||||
return putMeta(meta, tx)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// putMeta is an internal helper function used in order to allow callers to
|
||||
|
@ -209,7 +209,7 @@ func TestMigrationWithPanic(t *testing.T) {
|
||||
}
|
||||
|
||||
return bucket.Put(keyPrefix, beforeMigration)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to insert: %v", err)
|
||||
}
|
||||
@ -251,7 +251,7 @@ func TestMigrationWithPanic(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -283,7 +283,7 @@ func TestMigrationWithFatal(t *testing.T) {
|
||||
}
|
||||
|
||||
return bucket.Put(keyPrefix, beforeMigration)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to insert pre migration key: %v", err)
|
||||
}
|
||||
@ -326,7 +326,7 @@ func TestMigrationWithFatal(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -359,7 +359,7 @@ func TestMigrationWithoutErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
return bucket.Put(keyPrefix, beforeMigration)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to update db pre migration: %v", err)
|
||||
}
|
||||
@ -401,7 +401,7 @@ func TestMigrationWithoutErrors(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -448,7 +448,7 @@ func TestMigrationReversion(t *testing.T) {
|
||||
}
|
||||
|
||||
return putMeta(newMeta, tx)
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
// Close the database. Even if we succeeded, our next step is to reopen.
|
||||
cdb.Close()
|
||||
|
@ -152,7 +152,7 @@ func createChannelDB(dbPath string) error {
|
||||
DbVersionNumber: 0,
|
||||
}
|
||||
return putMeta(meta, tx)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new channeldb")
|
||||
}
|
||||
|
@ -244,7 +244,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
|
||||
// Finally, we commit the information of the lightning node
|
||||
// itself.
|
||||
return addLightningNode(tx, node)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
func addLightningNode(tx kvdb.RwTx, node *LightningNode) error {
|
||||
|
@ -51,7 +51,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
|
||||
// Apply migration.
|
||||
err = kvdb.Update(cdb, func(tx kvdb.RwTx) error {
|
||||
return migrationFunc(tx)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ func (db *DB) addPayment(payment *outgoingPayment) error {
|
||||
binary.BigEndian.PutUint64(paymentIDBytes, paymentID)
|
||||
|
||||
return payments.Put(paymentIDBytes, paymentBytes)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// fetchAllPayments returns all outgoing payments in DB.
|
||||
|
@ -55,7 +55,7 @@ func beforeMigrationFuncV11(t *testing.T, d *DB, invoices []Invoice) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ func TestPaymentStatusesMigration(t *testing.T) {
|
||||
}
|
||||
|
||||
return circuits.Put(inFlightKey, inFlightCircuit)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add circuit map entry: %v", err)
|
||||
}
|
||||
@ -385,7 +385,7 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
|
||||
return err
|
||||
}
|
||||
return closedChanBucket.Put(chanID, old)
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add old serialization: %v",
|
||||
err)
|
||||
@ -493,7 +493,7 @@ func TestMigrateGossipMessageStoreKeys(t *testing.T) {
|
||||
}
|
||||
|
||||
return messageStore.Put(oldMsgKey[:], b.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -683,7 +683,7 @@ func TestOutgoingPaymentsMigration(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -859,6 +859,8 @@ func TestPaymentRouteSerialization(t *testing.T) {
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
oldPayments = nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create test payments: %v", err)
|
||||
|
@ -47,7 +47,7 @@ func ApplyMigration(t *testing.T,
|
||||
|
||||
// beforeMigration usually used for populating the database
|
||||
// with test data.
|
||||
err = kvdb.Update(cdb, beforeMigration)
|
||||
err = kvdb.Update(cdb, beforeMigration, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -65,14 +65,14 @@ func ApplyMigration(t *testing.T,
|
||||
|
||||
// afterMigration usually used for checking the database state and
|
||||
// throwing the error if something went wrong.
|
||||
err = kvdb.Update(cdb, afterMigration)
|
||||
err = kvdb.Update(cdb, afterMigration, func() {})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Apply migration.
|
||||
err = kvdb.Update(cdb, migrationFunc)
|
||||
err = kvdb.Update(cdb, migrationFunc, func() {})
|
||||
if err != nil {
|
||||
t.Logf("migration error: %v", err)
|
||||
}
|
||||
|
@ -108,7 +108,7 @@ func (l *LinkNode) Sync() error {
|
||||
}
|
||||
|
||||
return putLinkNode(nodeMetaBucket, l)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// putLinkNode serializes then writes the encoded version of the passed link
|
||||
@ -132,7 +132,7 @@ func putLinkNode(nodeMetaBucket kvdb.RwBucket, l *LinkNode) error {
|
||||
func (db *DB) DeleteLinkNode(identity *btcec.PublicKey) error {
|
||||
return kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
return db.deleteLinkNode(tx, identity)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
|
||||
|
@ -749,7 +749,7 @@ func (db *DB) DeletePayments() error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// fetchSequenceNumbers fetches all the sequence numbers associated with a
|
||||
|
@ -183,7 +183,7 @@ func deletePayment(t *testing.T, db *DB, paymentHash lntypes.Hash, seqNr uint64)
|
||||
// Delete the index that references this payment.
|
||||
indexes := tx.ReadWriteBucket(paymentsIndexBucket)
|
||||
return indexes.Delete(key)
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("could not delete "+
|
||||
@ -622,7 +622,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) {
|
||||
tx, test.paymentHash, seqNrBytes[:],
|
||||
)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
require.Equal(t, test.expectedErr, err)
|
||||
})
|
||||
}
|
||||
@ -666,7 +666,7 @@ func appendDuplicatePayment(t *testing.T, db *DB, paymentHash lntypes.Hash,
|
||||
require.NoError(t, err)
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("could not create payment: %v", err)
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ func (d *DB) WriteFlapCounts(flapCounts map[route.Vertex]*FlapCount) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ReadFlapCount attempts to read the flap count for a peer, failing if the
|
||||
|
@ -130,7 +130,7 @@ func (d *DB) PutResolverReport(tx kvdb.RwTx, chainHash chainhash.Hash,
|
||||
|
||||
// If the transaction is nil, we'll create a new one.
|
||||
if tx == nil {
|
||||
return kvdb.Update(d, putReportFunc)
|
||||
return kvdb.Update(d, putReportFunc, func() {})
|
||||
}
|
||||
|
||||
// Otherwise, we can write the report to disk using the existing
|
||||
|
@ -202,7 +202,7 @@ func TestFetchChannelWriteBucket(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
// Update our db to the starting state we expect.
|
||||
err = kvdb.Update(db, test.setup)
|
||||
err = kvdb.Update(db, test.setup, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Try to get our report bucket.
|
||||
@ -211,7 +211,7 @@ func TestFetchChannelWriteBucket(t *testing.T) {
|
||||
tx, testChainHash, &testChanPoint1,
|
||||
)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ func (s *WaitingProofStore) Add(proof *WaitingProof) error {
|
||||
key := proof.Key()
|
||||
|
||||
return bucket.Put(key[:], b.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -109,7 +109,7 @@ func (s *WaitingProofStore) Remove(key WaitingProofKey) error {
|
||||
}
|
||||
|
||||
return bucket.Delete(key[:])
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -924,7 +924,7 @@ func (b *boltArbitratorLog) WipeHistory() error {
|
||||
|
||||
// Finally, we'll delete the enclosing bucket itself.
|
||||
return tx.DeleteTopLevelBucket(b.scopeKey[:])
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// checkpointContract is a private method that will be fed into
|
||||
@ -951,7 +951,7 @@ func (b *boltArbitratorLog) checkpointContract(c ContractResolver,
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
func encodeIncomingResolution(w io.Writer, i *lnwallet.IncomingHtlcResolution) error {
|
||||
|
@ -239,7 +239,7 @@ func TestMessageStoreUnsupportedMessage(t *testing.T) {
|
||||
err = kvdb.Update(msgStore.db, func(tx kvdb.RwTx) error {
|
||||
messageStore := tx.ReadWriteBucket(messageStoreBucket)
|
||||
return messageStore.Put(msgKey, rawMsg.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to add unsupported message to store: %v", err)
|
||||
}
|
||||
|
@ -3505,7 +3505,7 @@ func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
|
||||
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
|
||||
|
||||
return bucket.Put(outpointBytes.Bytes(), scratch)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// getChannelOpeningState fetches the channelOpeningState for the provided
|
||||
@ -3560,5 +3560,5 @@ func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) err
|
||||
}
|
||||
|
||||
return bucket.Delete(outpointBytes.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ func (cm *circuitMap) initBuckets() error {
|
||||
|
||||
_, err = tx.CreateTopLevelBucket(circuitAddKey)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// restoreMemState loads the contents of the half circuit and full circuit
|
||||
@ -240,8 +240,8 @@ func (cm *circuitMap) restoreMemState() error {
|
||||
log.Infof("Restoring in-memory circuit state from disk")
|
||||
|
||||
var (
|
||||
opened = make(map[CircuitKey]*PaymentCircuit)
|
||||
pending = make(map[CircuitKey]*PaymentCircuit)
|
||||
opened map[CircuitKey]*PaymentCircuit
|
||||
pending map[CircuitKey]*PaymentCircuit
|
||||
)
|
||||
|
||||
if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error {
|
||||
@ -331,6 +331,9 @@ func (cm *circuitMap) restoreMemState() error {
|
||||
|
||||
return nil
|
||||
|
||||
}, func() {
|
||||
opened = make(map[CircuitKey]*PaymentCircuit)
|
||||
pending = make(map[CircuitKey]*PaymentCircuit)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -483,7 +486,7 @@ func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// LookupByHTLC looks up the payment circuit by the outgoing channel and HTLC
|
||||
@ -730,7 +733,7 @@ func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -131,7 +131,7 @@ func (d *DecayedLog) initBuckets() error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// Stop halts the garbage collector and closes boltdb.
|
||||
|
@ -305,5 +305,5 @@ func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
@ -100,6 +100,8 @@ func (s *persistentSequencer) NextID() (uint64, error) {
|
||||
nextIDBkt.SetSequence(nextHorizonID)
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
nextHorizonID = 0
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -124,5 +126,5 @@ func (s *persistentSequencer) initDB() error {
|
||||
return kvdb.Update(s.db, func(tx kvdb.RwTx) error {
|
||||
_, err := tx.CreateTopLevelBucket(nextPaymentIDKey)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ func NewRootKeyStorage(db kvdb.Backend) (*RootKeyStorage, error) {
|
||||
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
_, err := tx.CreateTopLevelBucket(rootKeyBucketName)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -123,7 +123,7 @@ func (r *RootKeyStorage) CreateUnlock(password *[]byte) error {
|
||||
|
||||
r.encKey = encKey
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// Get implements the Get method for the bakery.RootKeyStorage interface.
|
||||
@ -211,6 +211,8 @@ func (r *RootKeyStorage) RootKey(ctx context.Context) ([]byte, []byte, error) {
|
||||
return err
|
||||
}
|
||||
return ns.Put(id, encKey)
|
||||
}, func() {
|
||||
rootKey = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -310,6 +312,8 @@ func (r *RootKeyStorage) DeleteMacaroonID(
|
||||
rootKeyIDDeleted = rootKeyID
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
rootKeyIDDeleted = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -283,7 +283,7 @@ func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// CribToKinder atomically moves a babyOutput in the crib bucket to the
|
||||
@ -365,7 +365,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
|
||||
// This informs the utxo nursery that it should attempt to spend
|
||||
// this output when the blockchain reaches the maturity height.
|
||||
return hghtChanBucketCsv.Put(pfxOutputKey, []byte{})
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// PreschoolToKinder atomically moves a kidOutput from the preschool bucket to
|
||||
@ -463,7 +463,7 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput,
|
||||
// that this CSV delayed output will be ready to broadcast at
|
||||
// the maturity height, after a brief period of incubation.
|
||||
return hghtChanBucket.Put(pfxOutputKey, []byte{})
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// GraduateKinder atomically moves an output at the provided height into the
|
||||
@ -525,7 +525,7 @@ func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error {
|
||||
// using graduate-prefixed key.
|
||||
return chanBucket.Put(pfxOutputKey,
|
||||
gradBuffer.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// FetchClass returns a list of babyOutputs in the crib bucket whose CLTV
|
||||
@ -844,7 +844,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
|
||||
}
|
||||
|
||||
return removeBucketIfExists(chanIndex, chanBytes)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// Helper Methods
|
||||
|
@ -41,10 +41,7 @@ type missionControlStore struct {
|
||||
}
|
||||
|
||||
func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) {
|
||||
store := &missionControlStore{
|
||||
db: db,
|
||||
maxRecords: maxRecords,
|
||||
}
|
||||
var store *missionControlStore
|
||||
|
||||
// Create buckets if not yet existing.
|
||||
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
|
||||
@ -64,6 +61,11 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
store = &missionControlStore{
|
||||
db: db,
|
||||
maxRecords: maxRecords,
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -81,7 +83,7 @@ func (b *missionControlStore) clear() error {
|
||||
|
||||
_, err := tx.CreateTopLevelBucket(resultsKey)
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// fetchAll returns all results currently stored in the database.
|
||||
@ -251,7 +253,7 @@ func (b *missionControlStore) AddResult(rp *paymentResult) error {
|
||||
|
||||
// Put into results bucket.
|
||||
return bucket.Put(k, v)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// getResultKey returns a byte slice representing a unique key for this payment
|
||||
|
@ -92,7 +92,7 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) (
|
||||
err = migrateTxHashes(tx, txHashesBucket, chainHash)
|
||||
|
||||
return err
|
||||
})
|
||||
}, func() {})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -193,7 +193,7 @@ func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error {
|
||||
hash := sweepTx.TxHash()
|
||||
|
||||
return txHashesBucket.Put(hash[:], []byte{})
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// GetLastPublishedTx returns the last tx that we called NotifyPublishTx
|
||||
|
@ -146,7 +146,7 @@ func OpenClientDB(dbPath string) (*ClientDB, error) {
|
||||
// initialized. This allows us to assume their presence throughout all
|
||||
// operations. If an known top-level bucket is expected to exist but is
|
||||
// missing, this will trigger a ErrUninitializedDB error.
|
||||
err = kvdb.Update(clientDB.db, initClientDBBuckets)
|
||||
err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {})
|
||||
if err != nil {
|
||||
bdb.Close()
|
||||
return nil, err
|
||||
@ -293,6 +293,8 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) {
|
||||
|
||||
// Store the new or updated tower under its tower id.
|
||||
return putTower(towers, tower)
|
||||
}, func() {
|
||||
tower = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -379,7 +381,7 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// LoadTowerByID retrieves a tower by its tower ID.
|
||||
@ -506,6 +508,8 @@ func (c *ClientDB) NextSessionKeyIndex(towerID TowerID) (uint32, error) {
|
||||
|
||||
// Record the reserved session key index under this tower's id.
|
||||
return keyIndex.Put(towerIDBytes, indexBuf[:])
|
||||
}, func() {
|
||||
index = 0
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -558,7 +562,7 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error {
|
||||
// Finally, write the client session's body in the sessions
|
||||
// bucket.
|
||||
return putClientSessionBody(sessions, session)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// ListClientSessions returns the set of all client sessions known to the db. An
|
||||
@ -686,7 +690,7 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID,
|
||||
}
|
||||
|
||||
return putChanSummary(chanSummaries, chanID, &summary)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// MarkBackupIneligible records that the state identified by the (channel id,
|
||||
@ -794,6 +798,8 @@ func (c *ClientDB) CommitUpdate(id *SessionID,
|
||||
|
||||
return nil
|
||||
|
||||
}, func() {
|
||||
lastApplied = 0
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -899,7 +905,7 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16,
|
||||
|
||||
// Finally, insert the ack into the sessionAcks sub-bucket.
|
||||
return sessionAcks.Put(seqNumBuf[:], b.Bytes())
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// getClientSessionBody loads the body of a ClientSession from the sessions
|
||||
|
@ -88,7 +88,7 @@ func OpenTowerDB(dbPath string) (*TowerDB, error) {
|
||||
// initialized. This allows us to assume their presence throughout all
|
||||
// operations. If an known top-level bucket is expected to exist but is
|
||||
// missing, this will trigger a ErrUninitializedDB error.
|
||||
err = kvdb.Update(towerDB.db, initTowerDBBuckets)
|
||||
err = kvdb.Update(towerDB.db, initTowerDBBuckets, func() {})
|
||||
if err != nil {
|
||||
bdb.Close()
|
||||
return nil, err
|
||||
@ -214,7 +214,7 @@ func (t *TowerDB) InsertSessionInfo(session *SessionInfo) error {
|
||||
// be deleted without needing to iterate over the entire
|
||||
// database.
|
||||
return touchSessionHintBkt(updateIndex, &session.ID)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// InsertStateUpdate stores an update sent by the client after validating that
|
||||
@ -296,6 +296,8 @@ func (t *TowerDB) InsertStateUpdate(update *SessionStateUpdate) (uint16, error)
|
||||
// hint under its session id. This will allow us to delete the
|
||||
// entries efficiently if the session is ever removed.
|
||||
return putHintForSession(updateIndex, &update.ID, update.Hint)
|
||||
}, func() {
|
||||
lastApplied = 0
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -385,7 +387,7 @@ func (t *TowerDB) DeleteSession(target SessionID) error {
|
||||
// Finally, remove this session from the update index, which
|
||||
// also removes any of the indexed hints beneath it.
|
||||
return removeSessionHintBkt(updateIndex, &target)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// QueryMatches searches against all known state updates for any that match the
|
||||
@ -484,7 +486,7 @@ func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error {
|
||||
}
|
||||
|
||||
return putLookoutEpoch(lookoutTip, epoch)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// GetLookoutTip retrieves the current lookout tip block epoch from the tower
|
||||
|
@ -107,7 +107,7 @@ func initOrSyncVersions(db versionedDB, init bool, versions []version) error {
|
||||
if init {
|
||||
return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error {
|
||||
return initDBVersion(tx, getLatestDBVersion(versions))
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// Otherwise, ensure that any migrations are applied to ensure the data
|
||||
@ -159,5 +159,5 @@ func syncVersions(db versionedDB, versions []version) error {
|
||||
}
|
||||
|
||||
return putDBVersion(tx, latestVersion)
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user