Merge pull request #9574 from ellemouton/fixWatcherPanic

lntest: wait for ChanUpdate req to be fully processed before sending another
This commit is contained in:
Oliver Gugger 2025-03-06 09:21:35 -06:00 committed by GitHub
commit 7d7e1872c8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 48 additions and 17 deletions

View file

@ -323,6 +323,9 @@ The underlying functionality between those two options remain the same.
* Add a new CI-step to do some basic [backwards compatibility * Add a new CI-step to do some basic [backwards compatibility
testing](https://github.com/lightningnetwork/lnd/pull/9540) for each PR. testing](https://github.com/lightningnetwork/lnd/pull/9540) for each PR.
* [Fix](https://github.com/lightningnetwork/lnd/pull/9574) an integration test
flake that could lead to a "close of a closed channel" panic.
## Database ## Database
* [Migrate the mission control * [Migrate the mission control

View file

@ -45,6 +45,10 @@ type chanWatchRequest struct {
advertisingNode string advertisingNode string
policy *lnrpc.RoutingPolicy policy *lnrpc.RoutingPolicy
includeUnannounced bool includeUnannounced bool
// handled is a channel that will be closed once the request has been
// handled by the topologyWatcher goroutine.
handled chan struct{}
} }
// nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all // nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all
@ -154,6 +158,7 @@ func (nw *nodeWatcher) WaitForChannelOpen(chanPoint *lnrpc.ChannelPoint) error {
chanPoint: op, chanPoint: op,
eventChan: eventChan, eventChan: eventChan,
chanWatchType: watchOpenChannel, chanWatchType: watchOpenChannel,
handled: make(chan struct{}),
} }
timer := time.After(wait.DefaultTimeout) timer := time.After(wait.DefaultTimeout)
@ -185,6 +190,7 @@ func (nw *nodeWatcher) WaitForChannelClose(
chanPoint: op, chanPoint: op,
eventChan: eventChan, eventChan: eventChan,
chanWatchType: watchCloseChannel, chanWatchType: watchCloseChannel,
handled: make(chan struct{}),
} }
timer := time.After(wait.DefaultTimeout) timer := time.After(wait.DefaultTimeout)
@ -216,7 +222,27 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
timer := time.After(wait.DefaultTimeout) timer := time.After(wait.DefaultTimeout)
defer ticker.Stop() defer ticker.Stop()
eventChan := make(chan struct{}) // onTimeout is a helper function that will be called in case the
// expected policy is not found before the timeout.
onTimeout := func() error {
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}
policies, err := syncMapToJSON(&nw.state.policyUpdates.Map)
if err != nil {
return err
}
return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(), advertisingNode.PubKeyStr,
expected, policies)
}
var eventChan = make(chan struct{})
for { for {
select { select {
// Send a watch request every second. // Send a watch request every second.
@ -230,6 +256,7 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
default: default:
} }
var handled = make(chan struct{})
nw.chanWatchRequests <- &chanWatchRequest{ nw.chanWatchRequests <- &chanWatchRequest{
chanPoint: op, chanPoint: op,
eventChan: eventChan, eventChan: eventChan,
@ -237,28 +264,25 @@ func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
policy: policy, policy: policy,
advertisingNode: advertisingNode.PubKeyStr, advertisingNode: advertisingNode.PubKeyStr,
includeUnannounced: includeUnannounced, includeUnannounced: includeUnannounced,
handled: handled,
}
// We wait for the topologyWatcher to signal that
// it has completed the handling of the request so that
// we don't send a new request before the previous one
// has been processed as this could lead to a double
// closure of the eventChan channel.
select {
case <-handled:
case <-timer:
return onTimeout()
} }
case <-eventChan: case <-eventChan:
return nil return nil
case <-timer: case <-timer:
expected, err := json.MarshalIndent(policy, "", "\t") return onTimeout()
if err != nil {
return fmt.Errorf("encode policy err: %w", err)
}
policies, err := syncMapToJSON(
&nw.state.policyUpdates.Map,
)
if err != nil {
return err
}
return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(),
advertisingNode.PubKeyStr, expected, policies)
} }
} }
} }
@ -341,6 +365,10 @@ func (nw *nodeWatcher) topologyWatcher(ctxb context.Context,
nw.handlePolicyUpdateWatchRequest(watchRequest) nw.handlePolicyUpdateWatchRequest(watchRequest)
} }
// Signal to the caller that the request has been
// handled.
close(watchRequest.handled)
case <-ctxb.Done(): case <-ctxb.Done():
return return
} }