lntest: refactor handle update open channel

This commit is contained in:
yyforyongyu 2021-08-08 17:19:25 +08:00
parent a1024163fe
commit 0701834a5d
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 97 additions and 78 deletions

View file

@ -1346,14 +1346,11 @@ func copyPorts(oldNode *lntest.HarnessNode) lntest.NodeOption {
} }
} }
func rpcPointToWirePoint(t *harnessTest, chanPoint *lnrpc.ChannelPoint) wire.OutPoint { func rpcPointToWirePoint(t *harnessTest,
txid, err := lnrpc.GetChanPointFundingTxid(chanPoint) chanPoint *lnrpc.ChannelPoint) wire.OutPoint {
if err != nil {
t.Fatalf("unable to get txid: %v", err)
}
return wire.OutPoint{ op, err := lntest.MakeOutpoint(chanPoint)
Hash: *txid, require.NoError(t.t, err, "unable to get txid")
Index: chanPoint.OutputIndex,
} return op
} }

View file

@ -370,8 +370,8 @@ type HarnessNode struct {
// edges seen for that channel within the network. When this number // edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has propagated // reaches 2, then it means that both edge advertisements has propagated
// through the network. // through the network.
openChans map[wire.OutPoint]int openChans map[wire.OutPoint]int
openClients map[wire.OutPoint][]chan struct{} openChanWatchers map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{} closedChans map[wire.OutPoint]struct{}
closeClients map[wire.OutPoint][]chan struct{} closeClients map[wire.OutPoint][]chan struct{}
@ -447,7 +447,7 @@ func newNode(cfg NodeConfig) (*HarnessNode, error) {
NodeID: nodeNum, NodeID: nodeNum,
chanWatchRequests: make(chan *chanWatchRequest), chanWatchRequests: make(chan *chanWatchRequest),
openChans: make(map[wire.OutPoint]int), openChans: make(map[wire.OutPoint]int),
openClients: make(map[wire.OutPoint][]chan struct{}), openChanWatchers: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}), closedChans: make(map[wire.OutPoint]struct{}),
closeClients: make(map[wire.OutPoint][]chan struct{}), closeClients: make(map[wire.OutPoint][]chan struct{}),
@ -1438,31 +1438,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// the current set of registered clients to see if we can // the current set of registered clients to see if we can
// dispatch any requests. // dispatch any requests.
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphUpdates:
// For each new channel, we'll increment the number of hn.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
// edges seen by one.
for _, newChan := range graphUpdate.ChannelUpdates {
txidHash, _ := getChanPointFundingTxid(newChan.ChanPoint)
txid, _ := chainhash.NewHash(txidHash)
op := wire.OutPoint{
Hash: *txid,
Index: newChan.ChanPoint.OutputIndex,
}
hn.openChans[op]++
// For this new channel, if the number of edges
// seen is less than two, then the channel
// hasn't been fully announced yet.
if numEdges := hn.openChans[op]; numEdges < 2 {
continue
}
// Otherwise, we'll notify all the registered
// clients and remove the dispatched clients.
for _, eventChan := range hn.openClients[op] {
close(eventChan)
}
delete(hn.openClients, op)
}
// For each channel closed, we'll mark that we've // For each channel closed, we'll mark that we've
// detected a channel closure while lnd was pruning the // detected a channel closure while lnd was pruning the
@ -1493,35 +1469,7 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// TODO(roasbeef): add update type also, checks for // TODO(roasbeef): add update type also, checks for
// multiple of 2 // multiple of 2
if watchRequest.chanOpen { if watchRequest.chanOpen {
// If this is an open request, then it can be hn.handleOpenChannelWatchRequest(watchRequest)
// dispatched if the number of edges seen for
// the channel is at least two.
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan)
continue
}
// Before we add the channel to our set of open
// clients, we'll check to see if the channel
// is already in the channel graph of the
// target node. This lets us handle the case
// where a node has already seen a channel
// before a notification has been requested,
// causing us to miss it.
chanFound := checkChanPointInGraph(
context.Background(), hn, targetChan,
)
if chanFound {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of
// watch open clients for this out point.
hn.openClients[targetChan] = append(
hn.openClients[targetChan],
watchRequest.eventChan,
)
continue continue
} }
@ -1551,24 +1499,18 @@ func (hn *HarnessNode) lightningNetworkWatcher(subscribed chan error) {
// considered "fully advertised" once both of its directional edges has been // considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network. // advertised within the test Lightning Network.
func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context, func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error { chanPoint *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{}) eventChan := make(chan struct{})
txidHash, err := getChanPointFundingTxid(op) op, err := MakeOutpoint(chanPoint)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create outpoint for %v "+
} "got err: %v", chanPoint, err)
txid, err := chainhash.NewHash(txidHash)
if err != nil {
return err
} }
hn.chanWatchRequests <- &chanWatchRequest{ hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{ chanPoint: op,
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan, eventChan: eventChan,
chanOpen: true, chanOpen: true,
} }
@ -1577,7 +1519,8 @@ func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
case <-eventChan: case <-eventChan:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("channel not opened before timeout") return fmt.Errorf("channel:%s not opened before timeout: %s",
op, hn)
} }
} }
@ -1680,3 +1623,82 @@ func (hn *HarnessNode) PrintErr(format string, a ...interface{}) {
fmt.Printf("itest error from [node:%s]: %s\n", fmt.Printf("itest error from [node:%s]: %s\n",
hn.Cfg.Name, fmt.Sprintf(format, a...)) hn.Cfg.Name, fmt.Sprintf(format, a...))
} }
// MakeOutpoint returns the outpoint of the channel's funding transaction.
func MakeOutpoint(chanPoint *lnrpc.ChannelPoint) (wire.OutPoint, error) {
fundingTxID, err := lnrpc.GetChanPointFundingTxid(chanPoint)
if err != nil {
return wire.OutPoint{}, err
}
return wire.OutPoint{
Hash: *fundingTxID,
Index: chanPoint.OutputIndex,
}, nil
}
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
// the outpoints, and saves them to harness node's internal state.
func (hn *HarnessNode) handleChannelEdgeUpdates(
updates []*lnrpc.ChannelEdgeUpdate) {
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range updates {
op, err := MakeOutpoint(newChan.ChanPoint)
if err != nil {
hn.PrintErr("failed to create outpoint for %v "+
"got err: %v", newChan.ChanPoint, err)
return
}
hn.openChans[op]++
// For this new channel, if the number of edges seen is less
// than two, then the channel hasn't been fully announced yet.
if numEdges := hn.openChans[op]; numEdges < 2 {
return
}
// Otherwise, we'll notify all the registered watchers and
// remove the dispatched watchers.
for _, eventChan := range hn.openChanWatchers[op] {
close(eventChan)
}
delete(hn.openChanWatchers, op)
}
}
// handleOpenChannelWatchRequest processes a watch open channel request by
// checking the number of the edges seen for a given channel point. If the
// number is no less than 2 then the channel is considered open. Otherwise, we
// will attempt to find it in its channel graph. If neither can be found, the
// request is added to a watch request list than will be handled by
// handleChannelEdgeUpdates.
func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is an open request, then it can be dispatched if the number
// of edges seen for the channel is at least two.
if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(req.eventChan)
return
}
// Before we add the channel to our set of open clients, we'll check to
// see if the channel is already in the channel graph of the target
// node. This lets us handle the case where a node has already seen a
// channel before a notification has been requested, causing us to miss
// it.
chanFound := checkChanPointInGraph(context.Background(), hn, targetChan)
if chanFound {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of open channel watchers for
// this out point.
hn.openChanWatchers[targetChan] = append(
hn.openChanWatchers[targetChan],
req.eventChan,
)
}