test: add async txn seen notifications to network harness

This commit adds a new feature to the network harness enabling callers
to receive async notifications once a particular transaction is seen on
the network. Such a feature is useful when due to the asynchronous
behavior of node communications.

With this new feature, tests can now wait for a particular transaction
to be seen within the network before proceeding.
This commit is contained in:
Olaoluwa Osuntokun 2016-08-30 19:34:13 -07:00
parent 9eb2a4219f
commit 95380fee1b
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 109 additions and 25 deletions

View File

@ -154,10 +154,22 @@ func TestLightningNetworkDaemon(t *testing.T) {
}
}()
// First create the network harness to gain access to its
// 'OnTxAccepted' call back.
lightningNetwork, err = newNetworkHarness(nil)
if err != nil {
t.Fatalf("unable to create lightning network harness: %v", err)
}
defer lightningNetwork.TearDownAll()
handlers := &btcrpcclient.NotificationHandlers{
OnTxAccepted: lightningNetwork.OnTxAccepted,
}
// First create an intance of the btcd's rpctest.Harness. This will be
// used to fund the wallets of the nodes within the test network and to
// drive blockchain related events within the network.
btcdHarness, err = rpctest.New(harnessNetParams, nil, nil)
btcdHarness, err = rpctest.New(harnessNetParams, handlers, nil)
if err != nil {
t.Fatalf("unable to create mining node: %v", err)
}
@ -165,15 +177,15 @@ func TestLightningNetworkDaemon(t *testing.T) {
if err = btcdHarness.SetUp(true, 50); err != nil {
t.Fatalf("unable to set up mining node: %v", err)
}
// With the btcd harness created, create an instance of the lightning
// network harness as it depends on the btcd harness to script network
// activity.
lightningNetwork, err = newNetworkHarness(btcdHarness, nil)
if err != nil {
t.Fatalf("unable to create lightning network harness: %v", err)
if err := btcdHarness.Node.NotifyNewTransactions(false); err != nil {
t.Fatalf("unable to request transaction notifications: %v", err)
}
// With the btcd harness created, we can now complete the
// initialization of the network.
if err := lightningNetwork.InitializeSeedNodes(btcdHarness); err != nil {
t.Fatalf("unable to initialize seed nodes: %v", err)
}
defer lightningNetwork.TearDownAll()
if err = lightningNetwork.SetUp(); err != nil {
t.Fatalf("unable to set up test lightning network: %v", err)
}

View File

@ -226,6 +226,9 @@ type networkHarness struct {
AliceClient lnrpc.LightningClient
BobClient lnrpc.LightningClient
seenTxns chan wire.ShaHash
watchRequests chan *watchRequest
}
// newNetworkHarness creates a new network test harness given an already
@ -235,32 +238,35 @@ type networkHarness struct {
// TODO(roasbeef): add option to use golang's build library to a binary of the
// current repo. This'll save developers from having to manually `go install`
// within the repo each time before changes.
func newNetworkHarness(r *rpctest.Harness, lndArgs []string) (*networkHarness, error) {
var err error
func newNetworkHarness(lndArgs []string) (*networkHarness, error) {
return &networkHarness{
activeNodes: make(map[int]*lightningNode),
seenTxns: make(chan wire.ShaHash),
watchRequests: make(chan *watchRequest),
}, nil
}
func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness) error {
nodeConfig := r.RPCConfig()
testNet := &networkHarness{
rpcConfig: nodeConfig,
netParams: r.ActiveNet,
Miner: r,
n.netParams = r.ActiveNet
n.Miner = r
n.rpcConfig = nodeConfig
activeNodes: make(map[int]*lightningNode),
}
testNet.aliceNode, err = newLightningNode(&nodeConfig, nil)
var err error
n.aliceNode, err = newLightningNode(&nodeConfig, nil)
if err != nil {
return nil, err
return err
}
testNet.bobNode, err = newLightningNode(&nodeConfig, nil)
n.bobNode, err = newLightningNode(&nodeConfig, nil)
if err != nil {
return nil, err
return err
}
testNet.activeNodes[testNet.aliceNode.nodeId] = testNet.aliceNode
testNet.activeNodes[testNet.bobNode.nodeId] = testNet.bobNode
n.activeNodes[n.aliceNode.nodeId] = n.aliceNode
n.activeNodes[n.bobNode.nodeId] = n.bobNode
return testNet, nil
return err
}
// fakeLogger is a fake grpclog.Logger implementation. This is used to stop
@ -396,9 +402,73 @@ out:
}
}
go n.networkWatcher()
return nil
}
// watchRequest encapsulates a request to the harness' network watcher to
// dispatch a notification once a transaction with the target txid is seen
// within the test network.
type watchRequest struct {
txid wire.ShaHash
eventChan chan struct{}
}
// networkWatcher is a goroutine which accepts async notification requests for
// the broadcast of a target transaction, and then dispatches the transaction
// once its seen on the network.
func (n *networkHarness) networkWatcher() {
seenTxns := make(map[wire.ShaHash]struct{})
clients := make(map[wire.ShaHash][]chan struct{})
for {
select {
case req := <-n.watchRequests:
// If we've already seen this transaction, then
// immediately dispatch the request. Otherwise, append
// to the list of clients who are watching for the
// broadcast of this transaction.
if _, ok := seenTxns[req.txid]; ok {
close(req.eventChan)
} else {
clients[req.txid] = append(clients[req.txid], req.eventChan)
}
case txid := <-n.seenTxns:
// If there isn't a registered notification for this
// transaction then ignore it.
txClients, ok := clients[txid]
if !ok {
continue
}
// Otherwise, dispatch the notification to all clients,
// cleaning up the now un-needed state.
for _, client := range txClients {
close(client)
}
delete(clients, txid)
}
}
}
func (n *networkHarness) OnTxAccepted(hash *wire.ShaHash, amount btcutil.Amount) {
go func() {
n.seenTxns <- *hash
}()
}
// WaitForTxBroadcast blocks until the target txid is seen on the network.
func (n *networkHarness) WaitForTxBroadcast(txid wire.ShaHash) {
eventChan := make(chan struct{})
n.watchRequests <- &watchRequest{txid, eventChan}
<-eventChan
}
// TearDownAll tears down all active nodes within the test lightning network.
func (n *networkHarness) TearDownAll() error {
for _, node := range n.activeNodes {

View File

@ -270,6 +270,8 @@ out:
"ChannelPoint(%v): %v", targetChannelPoint, err)
return err
case closingUpdate := <-updateChan:
rpcsLog.Tracef("[closechannel] sending update: %v",
closingUpdate)
if err := updateStream.Send(closingUpdate); err != nil {
return err
}