lntest: refactor handle close channel update

This commit is contained in:
yyforyongyu 2021-08-08 17:20:04 +08:00
parent 0701834a5d
commit 92cd6657c5
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868

View file

@ -373,8 +373,8 @@ type HarnessNode struct {
openChans map[wire.OutPoint]int openChans map[wire.OutPoint]int
openChanWatchers map[wire.OutPoint][]chan struct{} openChanWatchers map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{} closedChans map[wire.OutPoint]struct{}
closeClients map[wire.OutPoint][]chan struct{} closeChanWatchers map[wire.OutPoint][]chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -449,8 +449,8 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
openChans: make(map[wire.OutPoint]int), openChans: make(map[wire.OutPoint]int),
openChanWatchers: make(map[wire.OutPoint][]chan struct{}), openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}), closedChans: make(map[wire.OutPoint]struct{}),
closeClients: make(map[wire.OutPoint][]chan struct{}), closeChanWatchers: make(map[wire.OutPoint][]chan struct{}),
}, nil }, nil
} }
@ -1439,33 +1439,13 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// dispatch any requests. // dispatch any requests.
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphUpdates:
hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates) hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
hn.handleClosedChannelUpdate(graphUpdate.ClosedChans)
// For each channel closed, we'll mark that we've // TODO(yy): handle node updates too
// detected a channel closure while lnd was pruning the
// channel graph.
for _, closedChan := range graphUpdate.ClosedChans {
txidHash, _ := getChanPointFundingTxid(closedChan.ChanPoint)
txid, _ := chainhash.NewHash(txidHash)
op := wire.OutPoint{
Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex,
}
hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify
// all register clients.
for _, eventChan := range hn.closeClients[op] {
close(eventChan)
}
delete(hn.closeClients, op)
}
// A new watch request, has just arrived. We'll either be able // A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for // to dispatch immediately, or need to add the client for
// processing later. // processing later.
case watchRequest := <-hn.chanWatchRequests: case watchRequest := <-hn.chanWatchRequests:
targetChan := watchRequest.chanPoint
// TODO(roasbeef): add update type also, checks for // TODO(roasbeef): add update type also, checks for
// multiple of 2 // multiple of 2
if watchRequest.chanOpen { if watchRequest.chanOpen {
@ -1473,20 +1453,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
continue continue
} }
// If this is a close request, then it can be hn.handleCloseChannelWatchRequest(watchRequest)
// immediately dispatched if we've already seen a
// channel closure for this channel.
if _, ok := hn.closedChans[targetChan]; ok {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of close watch
// clients for this out point.
hn.closeClients[targetChan] = append(
hn.closeClients[targetChan],
watchRequest.eventChan,
)
case <-hn.quit: case <-hn.quit:
return return
@ -1529,24 +1496,18 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
// closed once a transaction spending the funding outpoint is seen within a // closed once a transaction spending the funding outpoint is seen within a
// confirmed block. // confirmed block.
func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context, func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error { chanPoint *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{}) eventChan := make(chan struct{})
txidHash, err := getChanPointFundingTxid(op) op, err := MakeOutpoint(chanPoint)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create outpoint for %v "+
} "got err: %v", chanPoint, err)
txid, err := chainhash.NewHash(txidHash)
if err != nil {
return err
} }
hn.chanWatchRequests <- &chanWatchRequest{ hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{ chanPoint: op,
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan, eventChan: eventChan,
chanOpen: false, chanOpen: false,
} }
@ -1555,7 +1516,8 @@ func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
case <-eventChan: case <-eventChan:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("channel not closed before timeout") return fmt.Errorf("channel:%s not closed before timeout: "+
"%s", op, hn)
} }
} }
@ -1702,3 +1664,52 @@ func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
req.eventChan, req.eventChan,
) )
} }
// handleClosedChannelUpdate takes a series of closed channel updates, extracts
// the outpoints, saves them to harness node's internal state, and notifies all
// registered clients.
func (hn *HarnessNode) handleClosedChannelUpdate(
updates []*lnrpc.ClosedChannelUpdate) {
// For each channel closed, we'll mark that we've detected a channel
// closure while lnd was pruning the channel graph.
for _, closedChan := range updates {
op, err := MakeOutpoint(closedChan.ChanPoint)
if err != nil {
hn.PrintErr("failed to create outpoint for %v "+
"got err: %v", closedChan.ChanPoint, err)
return
}
hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify all register
// watchers.
for _, eventChan := range hn.closeChanWatchers[op] {
close(eventChan)
}
delete(hn.closeChanWatchers, op)
}
}
// handleCloseChannelWatchRequest processes a watch close channel request by
// checking whether the given channel point can be found in the node's internal
// state. If not, the request is added to a watch request list than will be
// handled by handleCloseChannelWatchRequest.
func (hn *HarnessNode) handleCloseChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is a close request, then it can be immediately dispatched if
// we've already seen a channel closure for this channel.
if _, ok := hn.closedChans[targetChan]; ok {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of close channel watchers for
// this out point.
hn.closeChanWatchers[targetChan] = append(
hn.closeChanWatchers[targetChan],
req.eventChan,
)
}