mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 13:27:56 +01:00
batch: handle serialization errors correctly
This commit is contained in:
parent
211dd21082
commit
c29fb81d1b
@ -5,6 +5,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/lightningnetwork/lnd/kvdb"
|
"github.com/lightningnetwork/lnd/kvdb"
|
||||||
|
"github.com/lightningnetwork/lnd/sqldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// errSolo is a sentinel error indicating that the requester should re-run the
|
// errSolo is a sentinel error indicating that the requester should re-run the
|
||||||
@ -55,9 +56,21 @@ func (b *batch) run() {
|
|||||||
for i, req := range b.reqs {
|
for i, req := range b.reqs {
|
||||||
err := req.Update(tx)
|
err := req.Update(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// If we get a serialization error, we
|
||||||
|
// want the underlying SQL retry
|
||||||
|
// mechanism to retry the entire batch.
|
||||||
|
// Otherwise, we can succeed in an
|
||||||
|
// sqldb retry and still re-execute the
|
||||||
|
// failing request individually.
|
||||||
|
dbErr := sqldb.MapSQLError(err)
|
||||||
|
if !sqldb.IsSerializationError(dbErr) {
|
||||||
failIdx = i
|
failIdx = i
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return dbErr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}, func() {
|
}, func() {
|
||||||
|
74
batch/batch_test.go
Normal file
74
batch/batch_test.go
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
package batch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcwallet/walletdb"
|
||||||
|
"github.com/lightningnetwork/lnd/kvdb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRetry(t *testing.T) {
|
||||||
|
dbDir := t.TempDir()
|
||||||
|
|
||||||
|
dbName := filepath.Join(dbDir, "weks.db")
|
||||||
|
db, err := walletdb.Create(
|
||||||
|
"bdb", dbName, true, kvdb.DefaultDBTimeout,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create walletdb: %v", err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
db.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
called int
|
||||||
|
)
|
||||||
|
sched := NewTimeScheduler(db, &mu, time.Second)
|
||||||
|
|
||||||
|
// First, we construct a request that should retry individually and
|
||||||
|
// execute it non-lazily. It should still return the error the second
|
||||||
|
// time.
|
||||||
|
req := &Request{
|
||||||
|
Update: func(tx kvdb.RwTx) error {
|
||||||
|
called++
|
||||||
|
|
||||||
|
return errors.New("test")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = sched.Execute(req)
|
||||||
|
|
||||||
|
// Check and reset the called counter.
|
||||||
|
mu.Lock()
|
||||||
|
require.Equal(t, 2, called)
|
||||||
|
called = 0
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
require.ErrorContains(t, err, "test")
|
||||||
|
|
||||||
|
// Now, we construct a request that should NOT retry because it returns
|
||||||
|
// a serialization error, which should cause the underlying postgres
|
||||||
|
// transaction to retry. Since we aren't using postgres, this will
|
||||||
|
// cause the transaction to not be retried at all.
|
||||||
|
req = &Request{
|
||||||
|
Update: func(tx kvdb.RwTx) error {
|
||||||
|
called++
|
||||||
|
|
||||||
|
return errors.New("could not serialize access")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = sched.Execute(req)
|
||||||
|
|
||||||
|
// Check the called counter.
|
||||||
|
mu.Lock()
|
||||||
|
require.Equal(t, 1, called)
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
require.ErrorContains(t, err, "could not serialize access")
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user