mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
multi: ensure link is always torn down due to db failures, add exponential back off for sql-kvdb failures (#7927)
* lnwallet: fix log output msg The log message is off by one. * htlcswitch: fail channel when revoking it fails. When the revocation of a channel state fails after receiving a new CommitmentSigned msg we have to fail the channel otherwise we continue with an unclean state. * docs: update release-docs * htlcswitch: tear down connection if revocation processing fails If we couldn't revoke due to a DB error, then we want to also tear down the connection, as we don't want the other party to continue to send updates. That may lead to de-sync'd state an eventual force close. Otherwise, the database might be able to recover come the next reconnection attempt. * kvdb: use sql.LevelSerializable for all backends In this commit, we modify the default isolation level to be `sql.LevelSerializable. This is the strictness isolation type for postgres. For sqlite, there's only ever a single writer, so this doesn't apply directly. * kvdb/sqlbase: add randomized exponential backoff for serialization failures In this commit, we add randomized exponential backoff for serialization failures. For postgres, we''ll his this any time a transaction set fails to be linearized. For sqlite, we'll his this if we have many writers trying to grab the write lock at time same time, manifesting as a `SQLITE_BUSY` error code. As is, we'll retry up to 10 times, waiting a minimum of 50 miliseconds between each attempt, up to 5 seconds without any delay at all. For sqlite, this is also bounded by the busy timeout set, which applies on top of this retry logic (block for busy timeout seconds, then apply this back off logic). * docs/release-notes: add entry for sqlite/postgres tx retry --------- Co-authored-by: ziggie <ziggie1984@protonmail.com>
This commit is contained in:
parent
eb0d8af645
commit
01c64712a3
@ -59,9 +59,18 @@
|
||||
have to make sure to not broadcast outdated transactions which can lead to
|
||||
locked up wallet funds indefinitely in the worst case.
|
||||
|
||||
- [Remove nil value](https://github.com/lightningnetwork/lnd/pull/7922) from
|
||||
* [Remove nil value](https://github.com/lightningnetwork/lnd/pull/7922) from
|
||||
variadic parameter list.
|
||||
|
||||
* Make sure to [fail a channel if revoking the old channel state
|
||||
fails](https://github.com/lightningnetwork/lnd/pull/7876).
|
||||
|
||||
|
||||
* Failed `sqlite` or `postgres` transactions due to a serialization error will
|
||||
now be [automatically
|
||||
retried](https://github.com/lightningnetwork/lnd/pull/7927) with an
|
||||
exponential back off.
|
||||
|
||||
# New Features
|
||||
## Functional Enhancements
|
||||
### Protocol Features
|
||||
|
@ -1943,6 +1943,25 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
l.channel.RevokeCurrentCommitment()
|
||||
if err != nil {
|
||||
l.log.Errorf("unable to revoke commitment: %v", err)
|
||||
|
||||
// We need to fail the channel in case revoking our
|
||||
// local commitment does not succeed. We might have
|
||||
// already advanced our channel state which would lead
|
||||
// us to proceed with an unclean state.
|
||||
//
|
||||
// NOTE: We do not trigger a force close because this
|
||||
// could resolve itself in case our db was just busy
|
||||
// not accepting new transactions.
|
||||
l.fail(
|
||||
LinkFailureError{
|
||||
code: ErrInternalError,
|
||||
Warning: true,
|
||||
FailureAction: LinkFailureDisconnect,
|
||||
},
|
||||
"ChannelPoint(%v): unable to accept new "+
|
||||
"commitment: %v",
|
||||
l.channel.ChannelPoint(), err,
|
||||
)
|
||||
return
|
||||
}
|
||||
l.cfg.Peer.SendMessage(false, nextRevocation)
|
||||
@ -2007,8 +2026,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
ReceiveRevocation(msg)
|
||||
if err != nil {
|
||||
// TODO(halseth): force close?
|
||||
l.fail(LinkFailureError{code: ErrInvalidRevocation},
|
||||
"unable to accept revocation: %v", err)
|
||||
l.fail(
|
||||
LinkFailureError{
|
||||
code: ErrInvalidRevocation,
|
||||
FailureAction: LinkFailureDisconnect,
|
||||
},
|
||||
"unable to accept revocation: %v", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -86,6 +86,10 @@ type LinkFailureError struct {
|
||||
// the channel should not be attempted loaded again.
|
||||
PermanentFailure bool
|
||||
|
||||
// Warning denotes if this is a non-terminal error that doesn't warrant
|
||||
// failing the channel all together.
|
||||
Warning bool
|
||||
|
||||
// SendData is a byte slice that will be sent to the peer. If nil a
|
||||
// generic error will be sent.
|
||||
SendData []byte
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -18,6 +19,28 @@ const (
|
||||
// kvTableName is the name of the table that will contain all the kv
|
||||
// pairs.
|
||||
kvTableName = "kv"
|
||||
|
||||
// DefaultNumTxRetries is the default number of times we'll retry a
|
||||
// transaction if it fails with an error that permits transaction
|
||||
// repetition.
|
||||
DefaultNumTxRetries = 10
|
||||
|
||||
// DefaultInitialRetryDelay is the default initial delay between
|
||||
// retries. This will be used to generate a random delay between -50%
|
||||
// and +50% of this value, so 20 to 60 milliseconds. The retry will be
|
||||
// doubled after each attempt until we reach DefaultMaxRetryDelay. We
|
||||
// start with a random value to avoid multiple goroutines that are
|
||||
// created at the same time to effectively retry at the same time.
|
||||
DefaultInitialRetryDelay = time.Millisecond * 50
|
||||
|
||||
// DefaultMaxRetryDelay is the default maximum delay between retries.
|
||||
DefaultMaxRetryDelay = time.Second * 5
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrRetriesExceeded is returned when a transaction is retried more
|
||||
// than the max allowed valued without a success.
|
||||
ErrRetriesExceeded = errors.New("db tx retries exceeded")
|
||||
)
|
||||
|
||||
// Config holds a set of configuration options of a sql database connection.
|
||||
@ -209,28 +232,110 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error,
|
||||
return db.executeTransaction(f, reset, false)
|
||||
}
|
||||
|
||||
// randRetryDelay returns a random retry delay between -50% and +50% of the
|
||||
// configured delay that is doubled for each attempt and capped at a max value.
|
||||
func randRetryDelay(initialRetryDelay, maxRetryDelay, attempt int) time.Duration {
|
||||
halfDelay := initialRetryDelay / 2
|
||||
randDelay := prand.Int63n(int64(initialRetryDelay)) //nolint:gosec
|
||||
|
||||
// 50% plus 0%-100% gives us the range of 50%-150%.
|
||||
initialDelay := halfDelay + time.Duration(randDelay)
|
||||
|
||||
// If this is the first attempt, we just return the initial delay.
|
||||
if attempt == 0 {
|
||||
return initialDelay
|
||||
}
|
||||
|
||||
// For each subsequent delay, we double the initial delay. This still
|
||||
// gives us a somewhat random delay, but it still increases with each
|
||||
// attempt. If we double something n times, that's the same as
|
||||
// multiplying the value with 2^n. We limit the power to 32 to avoid
|
||||
// overflows.
|
||||
factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
|
||||
actualDelay := initialDelay * factor
|
||||
|
||||
// Cap the delay at the maximum configured value.
|
||||
if actualDelay > maxRetryDelay {
|
||||
return maxRetryDelay
|
||||
}
|
||||
|
||||
return actualDelay
|
||||
}
|
||||
|
||||
// executeTransaction creates a new read-only or read-write transaction and
|
||||
// executes the given function within it.
|
||||
func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error,
|
||||
reset func(), readOnly bool) error {
|
||||
|
||||
reset()
|
||||
// waitBeforeRetry is a helper function that will wait for a random
|
||||
// interval before exiting to retry the db transaction. If false is
|
||||
// returned, then this means that daemon is shutting down so we
|
||||
// should abort the retries.
|
||||
waitBeforeRetry := func(attemptNumber int) bool {
|
||||
retryDelay := randRetryDelay(
|
||||
attemptNumber, DefaultInitialRetryDelay,
|
||||
DefaultMaxRetryDelay,
|
||||
)
|
||||
|
||||
tx, err := newReadWriteTx(db, readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Debugf("Retrying transaction due to tx serialization "+
|
||||
"error, attempt_number=%v, delay=%v", attemptNumber,
|
||||
retryDelay)
|
||||
|
||||
select {
|
||||
// Before we try again, we'll wait with a random backoff based
|
||||
// on the retry delay.
|
||||
case time.After(retryDelay):
|
||||
return true
|
||||
|
||||
// If the daemon is shutting down, then we'll exit early.
|
||||
case <-db.Context.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
err = catchPanic(func() error { return f(tx) })
|
||||
if err != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
log.Errorf("Error rolling back tx: %v", rollbackErr)
|
||||
for i := 0; i < DefaultNumTxRetries; i++ {
|
||||
reset()
|
||||
|
||||
tx, err := newReadWriteTx(db, readOnly)
|
||||
if err != nil {
|
||||
dbErr := MapSQLError(err)
|
||||
|
||||
if IsSerializationError(dbErr) {
|
||||
// Nothing to roll back here, since we didn't
|
||||
// even get a transaction yet.
|
||||
if waitBeforeRetry(i) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return dbErr
|
||||
}
|
||||
|
||||
return err
|
||||
err = catchPanic(func() error { return f(tx) })
|
||||
if err != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
log.Errorf("Error rolling back tx: %v",
|
||||
rollbackErr)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
dbErr := tx.Commit()
|
||||
if IsSerializationError(dbErr) {
|
||||
_ = tx.Rollback()
|
||||
|
||||
if waitBeforeRetry(i) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return dbErr
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
// If we get to this point, then we weren't able to successfully commit
|
||||
// a tx given the max number of retries.
|
||||
return ErrRetriesExceeded
|
||||
}
|
||||
|
||||
// PrintStats returns all collected stats pretty printed into a string.
|
||||
|
@ -49,7 +49,8 @@ func newReadWriteTx(db *db, readOnly bool) (*readWriteTx, error) {
|
||||
tx, err := db.db.BeginTx(
|
||||
context.Background(),
|
||||
&sql.TxOptions{
|
||||
ReadOnly: readOnly,
|
||||
ReadOnly: readOnly,
|
||||
Isolation: sql.LevelSerializable,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -4425,7 +4425,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
||||
case msg.NextLocalCommitHeight == remoteTipHeight:
|
||||
lc.log.Debugf("sync: remote's next commit height is %v, while "+
|
||||
"we believe it is %v, we owe them a commitment",
|
||||
msg.NextLocalCommitHeight, remoteTipHeight)
|
||||
msg.NextLocalCommitHeight, remoteTipHeight+1)
|
||||
|
||||
// Grab the current remote chain tip from the database. This
|
||||
// commit diff contains all the information required to re-sync
|
||||
|
@ -3093,10 +3093,21 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
|
||||
if failure.linkErr.SendData != nil {
|
||||
data = failure.linkErr.SendData
|
||||
}
|
||||
err := p.SendMessage(true, &lnwire.Error{
|
||||
ChanID: failure.chanID,
|
||||
Data: data,
|
||||
})
|
||||
|
||||
var networkMsg lnwire.Message
|
||||
if failure.linkErr.Warning {
|
||||
networkMsg = &lnwire.Warning{
|
||||
ChanID: failure.chanID,
|
||||
Data: data,
|
||||
}
|
||||
} else {
|
||||
networkMsg = &lnwire.Error{
|
||||
ChanID: failure.chanID,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
err := p.SendMessage(true, networkMsg)
|
||||
if err != nil {
|
||||
p.log.Errorf("unable to send msg to "+
|
||||
"remote peer: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user