mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
Merge pull request #9074 from jharveyb/aux_signer_batching_fixes
lnwallet: aux signer batching fixes
This commit is contained in:
commit
5272db372c
1
.gitignore
vendored
1
.gitignore
vendored
@ -66,6 +66,7 @@ profile.tmp
|
||||
.DS_Store
|
||||
|
||||
.vscode
|
||||
*.code-workspace
|
||||
|
||||
# Coverage test
|
||||
coverage.txt
|
||||
|
@ -2,6 +2,7 @@ package contractcourt
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"testing"
|
||||
@ -145,7 +146,9 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) {
|
||||
|
||||
// With the HTLC added, we'll now manually initiate a state transition
|
||||
// from Alice to Bob.
|
||||
_, err = aliceChannel.SignNextCommitment()
|
||||
testQuit, testQuitFunc := context.WithCancel(context.Background())
|
||||
_ = testQuitFunc
|
||||
_, err = aliceChannel.SignNextCommitment(testQuit)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ type InterceptableSwitch struct {
|
||||
|
||||
type interceptedPackets struct {
|
||||
packets []*htlcPacket
|
||||
linkQuit chan struct{}
|
||||
linkQuit <-chan struct{}
|
||||
isReplay bool
|
||||
}
|
||||
|
||||
@ -442,8 +442,8 @@ func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
|
||||
// interceptor. If the interceptor signals the resume action, the htlcs are
|
||||
// forwarded to the switch. The link's quit signal should be provided to allow
|
||||
// cancellation of forwarding during link shutdown.
|
||||
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bool,
|
||||
packets ...*htlcPacket) error {
|
||||
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
|
||||
isReplay bool, packets ...*htlcPacket) error {
|
||||
|
||||
// Synchronize with the main event loop. This should be light in the
|
||||
// case where there is no interceptor.
|
||||
|
@ -2,6 +2,7 @@ package htlcswitch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
@ -101,7 +102,7 @@ type ChannelLinkConfig struct {
|
||||
// switch. The function returns and error in case it fails to send one or
|
||||
// more packets. The link's quit signal should be provided to allow
|
||||
// cancellation of forwarding during link shutdown.
|
||||
ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error
|
||||
ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
|
||||
|
||||
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
|
||||
// blobs, which are then used to inform how to forward an HTLC.
|
||||
@ -382,8 +383,9 @@ type channelLink struct {
|
||||
// our next CommitSig.
|
||||
incomingCommitHooks hookMap
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
quit context.Context //nolint:containedctx
|
||||
quitFunc context.CancelFunc
|
||||
}
|
||||
|
||||
// hookMap is a data structure that is used to track the hooks that need to be
|
||||
@ -448,6 +450,10 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
||||
channel *lnwallet.LightningChannel) ChannelLink {
|
||||
|
||||
logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
|
||||
quitCtx, quitFunc := context.WithCancel(context.Background())
|
||||
|
||||
// Initialize the Done channel for our quit context.
|
||||
_ = quitCtx.Done()
|
||||
|
||||
return &channelLink{
|
||||
cfg: cfg,
|
||||
@ -458,7 +464,8 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
||||
flushHooks: newHookMap(),
|
||||
outgoingCommitHooks: newHookMap(),
|
||||
incomingCommitHooks: newHookMap(),
|
||||
quit: make(chan struct{}),
|
||||
quit: quitCtx,
|
||||
quitFunc: quitFunc,
|
||||
}
|
||||
}
|
||||
|
||||
@ -573,7 +580,7 @@ func (l *channelLink) Stop() {
|
||||
|
||||
l.hodlQueue.Stop()
|
||||
|
||||
close(l.quit)
|
||||
l.quitFunc()
|
||||
l.wg.Wait()
|
||||
|
||||
// Now that the htlcManager has completely exited, reset the packet
|
||||
@ -660,7 +667,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
|
||||
func (l *channelLink) OnFlushedOnce(hook func()) {
|
||||
select {
|
||||
case l.flushHooks.newTransients <- hook:
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
}
|
||||
}
|
||||
|
||||
@ -679,7 +686,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
|
||||
|
||||
select {
|
||||
case queue <- hook:
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
}
|
||||
}
|
||||
|
||||
@ -889,7 +896,7 @@ func (l *channelLink) syncChanStates() error {
|
||||
// party, so we'll process the message in order to determine
|
||||
// if we need to re-transmit any messages to the remote party.
|
||||
msgsToReSend, openedCircuits, closedCircuits, err =
|
||||
l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
|
||||
l.channel.ProcessChanSyncMsg(l.quit, remoteChanSyncMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -918,7 +925,7 @@ func (l *channelLink) syncChanStates() error {
|
||||
l.cfg.Peer.SendMessage(false, msg)
|
||||
}
|
||||
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return ErrLinkShuttingDown
|
||||
}
|
||||
|
||||
@ -1041,7 +1048,7 @@ func (l *channelLink) fwdPkgGarbager() {
|
||||
err)
|
||||
continue
|
||||
}
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -1442,7 +1449,7 @@ func (l *channelLink) htlcManager() {
|
||||
)
|
||||
}
|
||||
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -2272,7 +2279,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
@ -2334,7 +2341,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
@ -2541,7 +2548,7 @@ func (l *channelLink) updateCommitTx() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
newCommit, err := l.channel.SignNextCommitment()
|
||||
newCommit, err := l.channel.SignNextCommitment(l.quit)
|
||||
if err == lnwallet.ErrNoWindow {
|
||||
l.cfg.PendingCommitTicker.Resume()
|
||||
l.log.Trace("PendingCommitTicker resumed")
|
||||
@ -2582,7 +2589,7 @@ func (l *channelLink) updateCommitTx() error {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
return ErrLinkShuttingDown
|
||||
default:
|
||||
}
|
||||
@ -3057,7 +3064,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
|
||||
// NOTE: Part of the ChannelLink interface.
|
||||
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.quit.Done():
|
||||
// Return early if the link is already in the process of
|
||||
// quitting. It doesn't make sense to hand the message to the
|
||||
// mailbox here.
|
||||
@ -3744,7 +3751,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
|
||||
filteredPkts = append(filteredPkts, pkt)
|
||||
}
|
||||
|
||||
err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
|
||||
err := l.cfg.ForwardPackets(l.quit.Done(), replay, filteredPkts...)
|
||||
if err != nil {
|
||||
log.Errorf("Unhandled error while reforwarding htlc "+
|
||||
"settle/fail over htlcswitch: %v", err)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package htlcswitch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"testing"
|
||||
"time"
|
||||
@ -94,7 +95,9 @@ func (l *linkTestContext) receiveHtlcAliceToBob() {
|
||||
func (l *linkTestContext) sendCommitSigBobToAlice(expHtlcs int) {
|
||||
l.t.Helper()
|
||||
|
||||
sigs, err := l.bobChannel.SignNextCommitment()
|
||||
testQuit, testQuitFunc := context.WithCancel(context.Background())
|
||||
_ = testQuitFunc
|
||||
sigs, err := l.bobChannel.SignNextCommitment(testQuit)
|
||||
if err != nil {
|
||||
l.t.Fatalf("error signing commitment: %v", err)
|
||||
}
|
||||
|
@ -2189,17 +2189,21 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return aliceSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
// Instantiate with a long interval, so that we can precisely control
|
||||
// the firing via force feeding.
|
||||
bticker := ticker.NewForce(time.Hour)
|
||||
aliceCfg := ChannelLinkConfig{
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: aliceSwitch.BestHeight,
|
||||
Circuits: aliceSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return aliceSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: aliceSwitch.BestHeight,
|
||||
Circuits: aliceSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -2240,12 +2244,14 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
|
||||
return aliceSwitch.AddLink(aliceLink)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-aliceLink.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := aliceLink.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.quit.Done():
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -2312,7 +2318,7 @@ func handleStateUpdate(link *channelLink,
|
||||
}
|
||||
link.HandleChannelUpdate(remoteRev)
|
||||
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment()
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment(link.quit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2355,7 +2361,7 @@ func updateState(batchTick chan time.Time, link *channelLink,
|
||||
// Trigger update by ticking the batchTicker.
|
||||
select {
|
||||
case batchTick <- time.Now():
|
||||
case <-link.quit:
|
||||
case <-link.quit.Done():
|
||||
return fmt.Errorf("link shutting down")
|
||||
}
|
||||
return handleStateUpdate(link, remoteChannel)
|
||||
@ -2363,7 +2369,7 @@ func updateState(batchTick chan time.Time, link *channelLink,
|
||||
|
||||
// The remote is triggering the state update, emulate this by
|
||||
// signing and sending CommitSig to the link.
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment()
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment(link.quit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -4849,17 +4855,21 @@ func (h *persistentLinkHarness) restartLink(
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return h.hSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
// Instantiate with a long interval, so that we can precisely control
|
||||
// the firing via force feeding.
|
||||
bticker := ticker.NewForce(time.Hour)
|
||||
aliceCfg := ChannelLinkConfig{
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: h.hSwitch.BestHeight,
|
||||
Circuits: h.hSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return h.hSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: h.hSwitch.BestHeight,
|
||||
Circuits: h.hSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -4904,12 +4914,14 @@ func (h *persistentLinkHarness) restartLink(
|
||||
return nil, nil, err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-aliceLink.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := aliceLink.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.quit.Done():
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -5892,7 +5904,9 @@ func TestChannelLinkFail(t *testing.T) {
|
||||
|
||||
// Sign a commitment that will include
|
||||
// signature for the HTLC just sent.
|
||||
sigs, err := remoteChannel.SignNextCommitment()
|
||||
sigs, err := remoteChannel.SignNextCommitment(
|
||||
c.quit,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("error signing commitment: %v",
|
||||
err)
|
||||
@ -5934,7 +5948,9 @@ func TestChannelLinkFail(t *testing.T) {
|
||||
|
||||
// Sign a commitment that will include
|
||||
// signature for the HTLC just sent.
|
||||
sigs, err := remoteChannel.SignNextCommitment()
|
||||
sigs, err := remoteChannel.SignNextCommitment(
|
||||
c.quit,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("error signing commitment: %v",
|
||||
err)
|
||||
@ -7018,7 +7034,7 @@ func TestPipelineSettle(t *testing.T) {
|
||||
// erroneously forwarded. If the forwardChan is closed before the last
|
||||
// step, then the test will fail.
|
||||
forwardChan := make(chan struct{})
|
||||
fwdPkts := func(c chan struct{}, _ bool, hp ...*htlcPacket) error {
|
||||
fwdPkts := func(c <-chan struct{}, _ bool, hp ...*htlcPacket) error {
|
||||
close(forwardChan)
|
||||
return nil
|
||||
}
|
||||
@ -7204,7 +7220,7 @@ func TestChannelLinkShortFailureRelay(t *testing.T) {
|
||||
aliceMsgs := mockPeer.sentMsgs
|
||||
switchChan := make(chan *htlcPacket)
|
||||
|
||||
coreLink.cfg.ForwardPackets = func(linkQuit chan struct{}, _ bool,
|
||||
coreLink.cfg.ForwardPackets = func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
for _, p := range packets {
|
||||
|
@ -94,7 +94,7 @@ type mailBoxConfig struct {
|
||||
// forwardPackets send a varidic number of htlcPackets to the switch to
|
||||
// be routed. A quit channel should be provided so that the call can
|
||||
// properly exit during shutdown.
|
||||
forwardPackets func(chan struct{}, ...*htlcPacket) error
|
||||
forwardPackets func(<-chan struct{}, ...*htlcPacket) error
|
||||
|
||||
// clock is a time source for the mailbox.
|
||||
clock clock.Clock
|
||||
@ -801,7 +801,7 @@ type mailOrchConfig struct {
|
||||
// forwardPackets send a varidic number of htlcPackets to the switch to
|
||||
// be routed. A quit channel should be provided so that the call can
|
||||
// properly exit during shutdown.
|
||||
forwardPackets func(chan struct{}, ...*htlcPacket) error
|
||||
forwardPackets func(<-chan struct{}, ...*htlcPacket) error
|
||||
|
||||
// clock is a time source for the generated mailboxes.
|
||||
clock clock.Clock
|
||||
|
@ -250,7 +250,7 @@ func newMailboxContext(t *testing.T, startTime time.Time,
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c *mailboxContext) forward(_ chan struct{},
|
||||
func (c *mailboxContext) forward(_ <-chan struct{},
|
||||
pkts ...*htlcPacket) error {
|
||||
|
||||
for _, pkt := range pkts {
|
||||
@ -706,7 +706,7 @@ func TestMailOrchestrator(t *testing.T) {
|
||||
// First, we'll create a new instance of our orchestrator.
|
||||
mo := newMailOrchestrator(&mailOrchConfig{
|
||||
failMailboxUpdate: failMailboxUpdate,
|
||||
forwardPackets: func(_ chan struct{},
|
||||
forwardPackets: func(_ <-chan struct{},
|
||||
pkts ...*htlcPacket) error {
|
||||
|
||||
return nil
|
||||
|
@ -654,7 +654,7 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
|
||||
// given to forward them through the router. The sending link's quit channel is
|
||||
// used to prevent deadlocks when the switch stops a link in the midst of
|
||||
// forwarding.
|
||||
func (s *Switch) ForwardPackets(linkQuit chan struct{},
|
||||
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
var (
|
||||
@ -832,7 +832,7 @@ func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
|
||||
// receive a shutdown requuest. This method does not wait for a response from
|
||||
// the htlcForwarder before returning.
|
||||
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
|
||||
linkQuit chan struct{}) error {
|
||||
linkQuit <-chan struct{}) error {
|
||||
|
||||
command := &plexPacket{
|
||||
pkt: packet,
|
||||
|
@ -1127,15 +1127,19 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return server.htlcSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
link := NewChannelLink(
|
||||
ChannelLinkConfig{
|
||||
BestHeight: server.htlcSwitch.BestHeight,
|
||||
FwrdingPolicy: h.globalPolicy,
|
||||
Peer: peer,
|
||||
Circuits: server.htlcSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return server.htlcSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
BestHeight: server.htlcSwitch.BestHeight,
|
||||
FwrdingPolicy: h.globalPolicy,
|
||||
Peer: peer,
|
||||
Circuits: server.htlcSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -1176,12 +1180,14 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-link.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := link.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.quit.Done():
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -47,18 +47,19 @@ type AuxSigJob struct {
|
||||
BaseAuxJob
|
||||
|
||||
// Resp is a channel that will be used to send the result of the sign
|
||||
// job.
|
||||
// job. This channel MUST be buffered.
|
||||
Resp chan AuxSigJobResp
|
||||
|
||||
// Cancel is a channel that should be closed if the caller wishes to
|
||||
// abandon all pending sign jobs part of a single batch.
|
||||
Cancel chan struct{}
|
||||
// Cancel is a channel that is closed by the caller if they wish to
|
||||
// abandon all pending sign jobs part of a single batch. This should
|
||||
// never be closed by the validator.
|
||||
Cancel <-chan struct{}
|
||||
}
|
||||
|
||||
// NewAuxSigJob creates a new AuxSigJob.
|
||||
func NewAuxSigJob(sigJob SignJob, keyRing CommitmentKeyRing, incoming bool,
|
||||
htlc PaymentDescriptor, commitBlob fn.Option[tlv.Blob],
|
||||
htlcLeaf input.AuxTapLeaf, cancelChan chan struct{}) AuxSigJob {
|
||||
htlcLeaf input.AuxTapLeaf, cancelChan <-chan struct{}) AuxSigJob {
|
||||
|
||||
return AuxSigJob{
|
||||
SignDesc: sigJob.SignDesc,
|
||||
|
@ -2,12 +2,14 @@ package lnwallet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"container/list"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/btcsuite/btcd/blockchain"
|
||||
@ -137,6 +139,10 @@ var (
|
||||
// errNoPartialSig is returned when a partial signature is required,
|
||||
// but none is found.
|
||||
errNoPartialSig = errors.New("no partial signature found")
|
||||
|
||||
// errQuit is returned when a quit signal was received, interrupting the
|
||||
// current operation.
|
||||
errQuit = errors.New("received quit signal")
|
||||
)
|
||||
|
||||
// ErrCommitSyncLocalDataLoss is returned in the case that we receive a valid
|
||||
@ -4525,7 +4531,9 @@ type NewCommitState struct {
|
||||
// for the remote party's commitment are also returned.
|
||||
//
|
||||
//nolint:funlen
|
||||
func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
func (lc *LightningChannel) SignNextCommitment(
|
||||
ctx context.Context) (*NewCommitState, error) {
|
||||
|
||||
lc.Lock()
|
||||
defer lc.Unlock()
|
||||
|
||||
@ -4626,6 +4634,17 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We'll need to send over the signatures to the remote party in the
|
||||
// order as they appear on the commitment transaction after BIP 69
|
||||
// sorting.
|
||||
slices.SortFunc(sigBatch, func(i, j SignJob) int {
|
||||
return cmp.Compare(i.OutputIndex, j.OutputIndex)
|
||||
})
|
||||
slices.SortFunc(auxSigBatch, func(i, j AuxSigJob) int {
|
||||
return cmp.Compare(i.OutputIndex, j.OutputIndex)
|
||||
})
|
||||
|
||||
lc.sigPool.SubmitSignBatch(sigBatch)
|
||||
|
||||
err = fn.MapOptionZ(lc.auxSigner, func(a AuxSigner) error {
|
||||
@ -4675,23 +4694,19 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// We'll need to send over the signatures to the remote party in the
|
||||
// order as they appear on the commitment transaction after BIP 69
|
||||
// sorting.
|
||||
sort.Slice(sigBatch, func(i, j int) bool {
|
||||
return sigBatch[i].OutputIndex < sigBatch[j].OutputIndex
|
||||
})
|
||||
sort.Slice(auxSigBatch, func(i, j int) bool {
|
||||
return auxSigBatch[i].OutputIndex < auxSigBatch[j].OutputIndex
|
||||
})
|
||||
|
||||
// With the jobs sorted, we'll now iterate through all the responses to
|
||||
// gather each of the signatures in order.
|
||||
// Iterate through all the responses to gather each of the signatures
|
||||
// in the order they were submitted.
|
||||
htlcSigs = make([]lnwire.Sig, 0, len(sigBatch))
|
||||
auxSigs := make([]fn.Option[tlv.Blob], 0, len(auxSigBatch))
|
||||
for i := range sigBatch {
|
||||
htlcSigJob := sigBatch[i]
|
||||
jobResp := <-htlcSigJob.Resp
|
||||
var jobResp SignJobResp
|
||||
|
||||
select {
|
||||
case jobResp = <-htlcSigJob.Resp:
|
||||
case <-ctx.Done():
|
||||
return nil, errQuit
|
||||
}
|
||||
|
||||
// If an error occurred, then we'll cancel any other active
|
||||
// jobs.
|
||||
@ -4707,7 +4722,13 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
}
|
||||
|
||||
auxHtlcSigJob := auxSigBatch[i]
|
||||
auxJobResp := <-auxHtlcSigJob.Resp
|
||||
var auxJobResp AuxSigJobResp
|
||||
|
||||
select {
|
||||
case auxJobResp = <-auxHtlcSigJob.Resp:
|
||||
case <-ctx.Done():
|
||||
return nil, errQuit
|
||||
}
|
||||
|
||||
// If an error occurred, then we'll cancel any other active
|
||||
// jobs.
|
||||
@ -4800,7 +4821,7 @@ func (lc *LightningChannel) resignMusigCommit(commitTx *wire.MsgTx,
|
||||
// previous commitment txn. This allows the link to clear its mailbox of those
|
||||
// circuits in case they are still in memory, and ensure the switch's circuit
|
||||
// map has been updated by deleting the closed circuits.
|
||||
func (lc *LightningChannel) ProcessChanSyncMsg(
|
||||
func (lc *LightningChannel) ProcessChanSyncMsg(ctx context.Context,
|
||||
msg *lnwire.ChannelReestablish) ([]lnwire.Message, []models.CircuitKey,
|
||||
[]models.CircuitKey, error) {
|
||||
|
||||
@ -4964,7 +4985,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
||||
// revocation, but also initiate a state transition to re-sync
|
||||
// them.
|
||||
if lc.OweCommitment() {
|
||||
newCommit, err := lc.SignNextCommitment()
|
||||
newCommit, err := lc.SignNextCommitment(ctx)
|
||||
switch {
|
||||
|
||||
// If we signed this state, then we'll accumulate
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -391,6 +391,14 @@ func (*mockChainIO) GetBlockHeader(
|
||||
|
||||
type auxSignerMock struct {
|
||||
mock.Mock
|
||||
|
||||
jobHandlerFunc func([]AuxSigJob)
|
||||
}
|
||||
|
||||
func NewAuxSignerMock(jobHandler func([]AuxSigJob)) *auxSignerMock {
|
||||
return &auxSignerMock{
|
||||
jobHandlerFunc: jobHandler,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *auxSignerMock) SubmitSecondLevelSigBatch(
|
||||
@ -398,12 +406,7 @@ func (a *auxSignerMock) SubmitSecondLevelSigBatch(
|
||||
commitTx *wire.MsgTx, sigJobs []AuxSigJob) error {
|
||||
|
||||
args := a.Called(chanState, commitTx, sigJobs)
|
||||
|
||||
// While we return, we'll also send back an instant response for the
|
||||
// set of jobs.
|
||||
for _, sigJob := range sigJobs {
|
||||
sigJob.Resp <- AuxSigJobResp{}
|
||||
}
|
||||
a.jobHandlerFunc(sigJobs)
|
||||
|
||||
return args.Error(0)
|
||||
}
|
||||
|
@ -47,17 +47,17 @@ type VerifyJob struct {
|
||||
// TODO(roasbeef): remove -- never actually used?
|
||||
HtlcIndex uint64
|
||||
|
||||
// Cancel is a channel that should be closed if the caller wishes to
|
||||
// Cancel is a channel that is closed by the caller if they wish to
|
||||
// cancel all pending verification jobs part of a single batch. This
|
||||
// channel is to be closed in the case that a single signature in a
|
||||
// batch has been returned as invalid, as there is no need to verify
|
||||
// the remainder of the signatures.
|
||||
Cancel chan struct{}
|
||||
// channel is closed in the case that a single signature in a batch has
|
||||
// been returned as invalid, as there is no need to verify the remainder
|
||||
// of the signatures.
|
||||
Cancel <-chan struct{}
|
||||
|
||||
// ErrResp is the channel that the result of the signature verification
|
||||
// is to be sent over. In the see that the signature is valid, a nil
|
||||
// error will be passed. Otherwise, a concrete error detailing the
|
||||
// issue will be passed.
|
||||
// issue will be passed. This channel MUST be buffered.
|
||||
ErrResp chan *HtlcIndexErr
|
||||
}
|
||||
|
||||
@ -88,12 +88,13 @@ type SignJob struct {
|
||||
// transaction being signed.
|
||||
OutputIndex int32
|
||||
|
||||
// Cancel is a channel that should be closed if the caller wishes to
|
||||
// abandon all pending sign jobs part of a single batch.
|
||||
Cancel chan struct{}
|
||||
// Cancel is a channel that is closed by the caller if they wish to
|
||||
// abandon all pending sign jobs part of a single batch. This should
|
||||
// never be closed by the validator.
|
||||
Cancel <-chan struct{}
|
||||
|
||||
// Resp is the channel that the response to this particular SignJob
|
||||
// will be sent over.
|
||||
// will be sent over. This channel MUST be buffered.
|
||||
//
|
||||
// TODO(roasbeef): actually need to allow caller to set, need to retain
|
||||
// order mark commit sig as special
|
||||
|
@ -1,6 +1,7 @@
|
||||
package lnwallet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
@ -100,6 +101,11 @@ var (
|
||||
bobDustLimit = btcutil.Amount(1300)
|
||||
|
||||
testChannelCapacity float64 = 10
|
||||
|
||||
// testQuit is a context that will never be cancelled, that is used in
|
||||
// place of a real quit context.
|
||||
testQuit, testQuitFunc = context.WithCancel(context.Background())
|
||||
_ = testQuitFunc
|
||||
)
|
||||
|
||||
// CreateTestChannels creates to fully populated channels to be used within
|
||||
@ -541,7 +547,7 @@ func calcStaticFee(chanType channeldb.ChannelType, numHTLCs int) btcutil.Amount
|
||||
// pending updates. This method is useful when testing interactions between two
|
||||
// live state machines.
|
||||
func ForceStateTransition(chanA, chanB *LightningChannel) error {
|
||||
aliceNewCommit, err := chanA.SignNextCommitment()
|
||||
aliceNewCommit, err := chanA.SignNextCommitment(testQuit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -554,7 +560,7 @@ func ForceStateTransition(chanA, chanB *LightningChannel) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bobNewCommit, err := chanB.SignNextCommitment()
|
||||
bobNewCommit, err := chanB.SignNextCommitment(testQuit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -357,7 +357,7 @@ func testVectors(t *testing.T, chanType channeldb.ChannelType, test testCase) {
|
||||
|
||||
// Execute commit dance to arrive at the point where the local node has
|
||||
// received the test commitment and the remote signature.
|
||||
localNewCommit, err := localChannel.SignNextCommitment()
|
||||
localNewCommit, err := localChannel.SignNextCommitment(testQuit)
|
||||
require.NoError(t, err, "local unable to sign commitment")
|
||||
|
||||
err = remoteChannel.ReceiveNewCommitment(localNewCommit.CommitSigs)
|
||||
@ -369,7 +369,7 @@ func testVectors(t *testing.T, chanType channeldb.ChannelType, test testCase) {
|
||||
_, _, _, _, err = localChannel.ReceiveRevocation(revMsg)
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteNewCommit, err := remoteChannel.SignNextCommitment()
|
||||
remoteNewCommit, err := remoteChannel.SignNextCommitment(testQuit)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(
|
||||
|
Loading…
Reference in New Issue
Block a user