From 95380fee1bc7e5ceb095c12fbfa385a9ebae79d0 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 30 Aug 2016 19:34:13 -0700 Subject: [PATCH] test: add async txn seen notifications to network harness This commit adds a new feature to the network harness enabling callers to receive async notifications once a particular transaction is seen on the network. Such a feature is useful when due to the asynchronous behavior of node communications. With this new feature, tests can now wait for a particular transaction to be seen within the network before proceeding. --- lnd_test.go | 30 ++++++++++---- networkharness.go | 102 ++++++++++++++++++++++++++++++++++++++-------- rpcserver.go | 2 + 3 files changed, 109 insertions(+), 25 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 133bc78f4..a566e8a0d 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -154,10 +154,22 @@ func TestLightningNetworkDaemon(t *testing.T) { } }() + // First create the network harness to gain access to its + // 'OnTxAccepted' call back. + lightningNetwork, err = newNetworkHarness(nil) + if err != nil { + t.Fatalf("unable to create lightning network harness: %v", err) + } + defer lightningNetwork.TearDownAll() + + handlers := &btcrpcclient.NotificationHandlers{ + OnTxAccepted: lightningNetwork.OnTxAccepted, + } + // First create an intance of the btcd's rpctest.Harness. This will be // used to fund the wallets of the nodes within the test network and to // drive blockchain related events within the network. - btcdHarness, err = rpctest.New(harnessNetParams, nil, nil) + btcdHarness, err = rpctest.New(harnessNetParams, handlers, nil) if err != nil { t.Fatalf("unable to create mining node: %v", err) } @@ -165,15 +177,15 @@ func TestLightningNetworkDaemon(t *testing.T) { if err = btcdHarness.SetUp(true, 50); err != nil { t.Fatalf("unable to set up mining node: %v", err) } - - // With the btcd harness created, create an instance of the lightning - // network harness as it depends on the btcd harness to script network - // activity. - lightningNetwork, err = newNetworkHarness(btcdHarness, nil) - if err != nil { - t.Fatalf("unable to create lightning network harness: %v", err) + if err := btcdHarness.Node.NotifyNewTransactions(false); err != nil { + t.Fatalf("unable to request transaction notifications: %v", err) + } + + // With the btcd harness created, we can now complete the + // initialization of the network. + if err := lightningNetwork.InitializeSeedNodes(btcdHarness); err != nil { + t.Fatalf("unable to initialize seed nodes: %v", err) } - defer lightningNetwork.TearDownAll() if err = lightningNetwork.SetUp(); err != nil { t.Fatalf("unable to set up test lightning network: %v", err) } diff --git a/networkharness.go b/networkharness.go index dd88e61f6..50b88a1c9 100644 --- a/networkharness.go +++ b/networkharness.go @@ -226,6 +226,9 @@ type networkHarness struct { AliceClient lnrpc.LightningClient BobClient lnrpc.LightningClient + + seenTxns chan wire.ShaHash + watchRequests chan *watchRequest } // newNetworkHarness creates a new network test harness given an already @@ -235,32 +238,35 @@ type networkHarness struct { // TODO(roasbeef): add option to use golang's build library to a binary of the // current repo. This'll save developers from having to manually `go install` // within the repo each time before changes. -func newNetworkHarness(r *rpctest.Harness, lndArgs []string) (*networkHarness, error) { - var err error +func newNetworkHarness(lndArgs []string) (*networkHarness, error) { + return &networkHarness{ + activeNodes: make(map[int]*lightningNode), + seenTxns: make(chan wire.ShaHash), + watchRequests: make(chan *watchRequest), + }, nil +} +func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness) error { nodeConfig := r.RPCConfig() - testNet := &networkHarness{ - rpcConfig: nodeConfig, - netParams: r.ActiveNet, - Miner: r, + n.netParams = r.ActiveNet + n.Miner = r + n.rpcConfig = nodeConfig - activeNodes: make(map[int]*lightningNode), - } - - testNet.aliceNode, err = newLightningNode(&nodeConfig, nil) + var err error + n.aliceNode, err = newLightningNode(&nodeConfig, nil) if err != nil { - return nil, err + return err } - testNet.bobNode, err = newLightningNode(&nodeConfig, nil) + n.bobNode, err = newLightningNode(&nodeConfig, nil) if err != nil { - return nil, err + return err } - testNet.activeNodes[testNet.aliceNode.nodeId] = testNet.aliceNode - testNet.activeNodes[testNet.bobNode.nodeId] = testNet.bobNode + n.activeNodes[n.aliceNode.nodeId] = n.aliceNode + n.activeNodes[n.bobNode.nodeId] = n.bobNode - return testNet, nil + return err } // fakeLogger is a fake grpclog.Logger implementation. This is used to stop @@ -396,9 +402,73 @@ out: } } + go n.networkWatcher() + return nil } +// watchRequest encapsulates a request to the harness' network watcher to +// dispatch a notification once a transaction with the target txid is seen +// within the test network. +type watchRequest struct { + txid wire.ShaHash + eventChan chan struct{} +} + +// networkWatcher is a goroutine which accepts async notification requests for +// the broadcast of a target transaction, and then dispatches the transaction +// once its seen on the network. +func (n *networkHarness) networkWatcher() { + seenTxns := make(map[wire.ShaHash]struct{}) + clients := make(map[wire.ShaHash][]chan struct{}) + + for { + + select { + case req := <-n.watchRequests: + // If we've already seen this transaction, then + // immediately dispatch the request. Otherwise, append + // to the list of clients who are watching for the + // broadcast of this transaction. + if _, ok := seenTxns[req.txid]; ok { + close(req.eventChan) + } else { + clients[req.txid] = append(clients[req.txid], req.eventChan) + } + case txid := <-n.seenTxns: + + // If there isn't a registered notification for this + // transaction then ignore it. + txClients, ok := clients[txid] + if !ok { + continue + } + + // Otherwise, dispatch the notification to all clients, + // cleaning up the now un-needed state. + for _, client := range txClients { + close(client) + } + delete(clients, txid) + } + } +} + +func (n *networkHarness) OnTxAccepted(hash *wire.ShaHash, amount btcutil.Amount) { + go func() { + n.seenTxns <- *hash + }() +} + +// WaitForTxBroadcast blocks until the target txid is seen on the network. +func (n *networkHarness) WaitForTxBroadcast(txid wire.ShaHash) { + eventChan := make(chan struct{}) + + n.watchRequests <- &watchRequest{txid, eventChan} + + <-eventChan +} + // TearDownAll tears down all active nodes within the test lightning network. func (n *networkHarness) TearDownAll() error { for _, node := range n.activeNodes { diff --git a/rpcserver.go b/rpcserver.go index 67a97b0cb..18598c804 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -270,6 +270,8 @@ out: "ChannelPoint(%v): %v", targetChannelPoint, err) return err case closingUpdate := <-updateChan: + rpcsLog.Tracef("[closechannel] sending update: %v", + closingUpdate) if err := updateStream.Send(closingUpdate); err != nil { return err }