From c0383679d24435357a5cd90226cf4abe856c3a3e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 7 Jul 2016 15:30:55 -0700 Subject: [PATCH] lnd: use asynchronous streaming responses for [open|close]channel RPC's MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit switches the implementation of the open/close channel RPC’s from a fully blocking synchronous model to one that’s async by default, allowing callers to add a sync wrapper. The new proto specs also allow for “updates” for the pending channels in the form of new confirmations which progress the pending status of the channel. At this point, only the final open/close updates have been implemented. Obtaining confirmation notifications requires a bit of re-working within the current ChainNotifier interface, thus this has been deferred to a later time. --- peer.go | 3 ++- rpcserver.go | 63 +++++++++++++++++++++++++++++++++++----------------- server.go | 18 +++++---------- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/peer.go b/peer.go index bbae8eb4c..2c76ad843 100644 --- a/peer.go +++ b/peer.go @@ -555,11 +555,12 @@ func (p *peer) handleLocalClose(req *closeChanReq) { success = true case <-p.quit: + return } // Respond to the local sub-system which requested the channel // closure. - req.resp <- &closeChanResp{success} + req.resp <- &closeChanResp{txid, success} req.err <- nil }() } diff --git a/rpcserver.go b/rpcserver.go index eea874222..fba8d357c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -188,8 +188,8 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, // OpenChannel attempts to open a singly funded channel specified in the // request to a remote peer. -func (r *rpcServer) OpenChannel(ctx context.Context, - in *lnrpc.OpenChannelRequest) (*lnrpc.OpenChannelResponse, error) { +func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest, + updateStream lnrpc.Lightning_OpenChannelServer) error { rpcsLog.Tracef("[openchannel] request to peerid(%v) "+ "allocation(us=%v, them=%v) numconfs=%v", in.TargetPeerId, @@ -199,50 +199,73 @@ func (r *rpcServer) OpenChannel(ctx context.Context, remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount) target := in.TargetPeerId numConfs := in.NumConfs - resp, err := r.server.OpenChannel(target, localFundingAmt, + respChan, errChan := r.server.OpenChannel(target, localFundingAmt, remoteFundingAmt, numConfs) - if err != nil { + if err := <-errChan; err != nil { rpcsLog.Errorf("unable to open channel to peerid(%v): %v", target, err) - return nil, err + return err } + var outpoint *wire.OutPoint + select { + case resp := <-respChan: + outpoint = resp.chanPoint + openUpdate := &lnrpc.ChannelOpenUpdate{ + &lnrpc.ChannelPoint{ + FundingTxid: outpoint.Hash[:], + OutputIndex: outpoint.Index, + }, + } + if err := updateStream.Send(openUpdate); err != nil { + return err + } + case <-r.quit: + return nil + } rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)", - in.TargetPeerId, resp) - - return &lnrpc.OpenChannelResponse{ - &lnrpc.ChannelPoint{ - FundingTxid: resp.Hash[:], - OutputIndex: resp.Index, - }, - }, nil + in.TargetPeerId, outpoint) + return nil } // CloseChannel attempts to close an active channel identified by its channel // point. The actions of this method can additionally be augmented to attempt // a force close after a timeout period in the case of an inactive peer. -func (r *rpcServer) CloseChannel(ctx context.Context, - in *lnrpc.CloseChannelRequest) (*lnrpc.CloseChannelResponse, error) { +func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, + updateStream lnrpc.Lightning_CloseChannelServer) error { index := in.ChannelPoint.OutputIndex txid, err := wire.NewShaHash(in.ChannelPoint.FundingTxid) if err != nil { rpcsLog.Errorf("[closechannel] invalid txid: %v", err) - return nil, err + return err } targetChannelPoint := wire.NewOutPoint(txid, index) rpcsLog.Tracef("[closechannel] request for ChannelPoint(%v)", targetChannelPoint) - resp, err := r.server.CloseChannel(targetChannelPoint) - if err != nil { + respChan, errChan := r.server.CloseChannel(targetChannelPoint) + if err := <-errChan; err != nil { rpcsLog.Errorf("Unable to close ChannelPoint(%v): %v", targetChannelPoint, err) - return nil, err + return err } - return &lnrpc.CloseChannelResponse{resp}, nil + select { + case resp := <-respChan: + closeUpdate := &lnrpc.ChannelCloseUpdate{ + ClosingTxid: resp.txid[:], + Success: resp.success, + } + if err := updateStream.Send(closeUpdate); err != nil { + return err + } + case <-r.quit: + return nil + } + + return nil } // GetInfo serves a request to the "getinfo" RPC call. This call returns diff --git a/server.go b/server.go index 66b7db082..c98f9104d 100644 --- a/server.go +++ b/server.go @@ -256,6 +256,7 @@ type closeChanReq struct { // closeChanResp is the response to a closeChanReq is simply houses a boolean // value indicating if the channel coopertive channel closure was succesful or not. type closeChanResp struct { + txid *wire.ShaHash success bool } @@ -437,7 +438,7 @@ func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) { // OpenChannel sends a request to the server to open a channel to the specified // peer identified by ID with the passed channel funding paramters. func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, - numConfs uint32) (*wire.OutPoint, error) { + numConfs uint32) (chan *openChanResp, chan error) { errChan := make(chan error, 1) respChan := make(chan *openChanResp, 1) @@ -451,17 +452,14 @@ func (s *server) OpenChannel(nodeID int32, localAmt, remoteAmt btcutil.Amount, resp: respChan, err: errChan, } + // TODO(roasbeef): hook in "progress" channel - if err := <-errChan; err != nil { - return nil, err - } - - return (<-respChan).chanPoint, nil + return respChan, errChan } // CloseChannel attempts to close the channel identified by the specified // outpoint in a coopertaive manner. -func (s *server) CloseChannel(channelPoint *wire.OutPoint) (bool, error) { +func (s *server) CloseChannel(channelPoint *wire.OutPoint) (chan *closeChanResp, chan error) { errChan := make(chan error, 1) respChan := make(chan *closeChanResp, 1) @@ -472,11 +470,7 @@ func (s *server) CloseChannel(channelPoint *wire.OutPoint) (bool, error) { err: errChan, } - if err := <-errChan; err != nil { - return false, err - } - - return (<-respChan).success, nil + return respChan, errChan } // Peers returns a slice of all active peers.