mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-26 08:55:59 +01:00
Merge pull request #3039 from Crypt-iQ/predicate_channel_accept_0428
rpc: bi-directional streaming for predicate-based channel acceptance
This commit is contained in:
commit
e0d7854432
12 changed files with 1571 additions and 652 deletions
156
chanacceptor/acceptor_test.go
Normal file
156
chanacceptor/acceptor_test.go
Normal file
|
@ -0,0 +1,156 @@
|
|||
package chanacceptor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
func randKey(t *testing.T) *btcec.PublicKey {
|
||||
t.Helper()
|
||||
|
||||
priv, err := btcec.NewPrivateKey(btcec.S256())
|
||||
if err != nil {
|
||||
t.Fatalf("unable to generate new public key")
|
||||
}
|
||||
|
||||
return priv.PubKey()
|
||||
}
|
||||
|
||||
// requestInfo encapsulates the information sent from the RPCAcceptor to the
|
||||
// receiver on the other end of the stream.
|
||||
type requestInfo struct {
|
||||
chanReq *ChannelAcceptRequest
|
||||
responseChan chan lnrpc.ChannelAcceptResponse
|
||||
}
|
||||
|
||||
var defaultAcceptTimeout = 5 * time.Second
|
||||
|
||||
func acceptAndIncrementCtr(rpc ChannelAcceptor, req *ChannelAcceptRequest,
|
||||
ctr *uint32, success chan struct{}) {
|
||||
|
||||
result := rpc.Accept(req)
|
||||
if !result {
|
||||
return
|
||||
}
|
||||
|
||||
val := atomic.AddUint32(ctr, 1)
|
||||
if val == 3 {
|
||||
success <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMultipleRPCClients tests that the RPCAcceptor is able to handle multiple
|
||||
// callers to its Accept method and respond to them correctly.
|
||||
func TestRPCMultipleAcceptClients(t *testing.T) {
|
||||
|
||||
var (
|
||||
node = randKey(t)
|
||||
|
||||
firstOpenReq = &ChannelAcceptRequest{
|
||||
Node: node,
|
||||
OpenChanMsg: &lnwire.OpenChannel{
|
||||
PendingChannelID: [32]byte{0},
|
||||
},
|
||||
}
|
||||
|
||||
secondOpenReq = &ChannelAcceptRequest{
|
||||
Node: node,
|
||||
OpenChanMsg: &lnwire.OpenChannel{
|
||||
PendingChannelID: [32]byte{1},
|
||||
},
|
||||
}
|
||||
|
||||
thirdOpenReq = &ChannelAcceptRequest{
|
||||
Node: node,
|
||||
OpenChanMsg: &lnwire.OpenChannel{
|
||||
PendingChannelID: [32]byte{2},
|
||||
},
|
||||
}
|
||||
|
||||
counter uint32
|
||||
)
|
||||
|
||||
quit := make(chan struct{})
|
||||
defer close(quit)
|
||||
|
||||
// Create channels to handle requests and successes.
|
||||
requests := make(chan *requestInfo)
|
||||
successChan := make(chan struct{})
|
||||
errChan := make(chan struct{}, 4)
|
||||
|
||||
// demultiplexReq is a closure used to abstract the RPCAcceptor's request
|
||||
// and response logic.
|
||||
demultiplexReq := func(req *ChannelAcceptRequest) bool {
|
||||
respChan := make(chan lnrpc.ChannelAcceptResponse, 1)
|
||||
|
||||
newRequest := &requestInfo{
|
||||
chanReq: req,
|
||||
responseChan: respChan,
|
||||
}
|
||||
|
||||
// Send the newRequest to the requests channel.
|
||||
select {
|
||||
case requests <- newRequest:
|
||||
case <-quit:
|
||||
return false
|
||||
}
|
||||
|
||||
// Receive the response and verify that the PendingChanId matches
|
||||
// the ID found in the ChannelAcceptRequest. If no response has been
|
||||
// received in defaultAcceptTimeout, then return false.
|
||||
select {
|
||||
case resp := <-respChan:
|
||||
pendingID := req.OpenChanMsg.PendingChannelID
|
||||
if !bytes.Equal(pendingID[:], resp.PendingChanId) {
|
||||
errChan <- struct{}{}
|
||||
return false
|
||||
}
|
||||
|
||||
return resp.Accept
|
||||
case <-time.After(defaultAcceptTimeout):
|
||||
errChan <- struct{}{}
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
rpcAcceptor := NewRPCAcceptor(demultiplexReq)
|
||||
|
||||
// Now we call the Accept method for each request.
|
||||
go func() {
|
||||
acceptAndIncrementCtr(rpcAcceptor, firstOpenReq, &counter, successChan)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
acceptAndIncrementCtr(rpcAcceptor, secondOpenReq, &counter, successChan)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
acceptAndIncrementCtr(rpcAcceptor, thirdOpenReq, &counter, successChan)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case newRequest := <-requests:
|
||||
newResponse := lnrpc.ChannelAcceptResponse{
|
||||
Accept: true,
|
||||
PendingChanId: newRequest.chanReq.OpenChanMsg.PendingChannelID[:],
|
||||
}
|
||||
|
||||
newRequest.responseChan <- newResponse
|
||||
case <-errChan:
|
||||
t.Fatalf("unable to accept ChannelAcceptRequest")
|
||||
case <-successChan:
|
||||
return
|
||||
case <-quit:
|
||||
}
|
||||
}
|
||||
}
|
65
chanacceptor/chainedacceptor.go
Normal file
65
chanacceptor/chainedacceptor.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package chanacceptor
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ChainedAcceptor represents a conjunction of ChannelAcceptor results.
|
||||
type ChainedAcceptor struct {
|
||||
// acceptors is a map of ChannelAcceptors that will be evaluated when
|
||||
// the ChainedAcceptor's Accept method is called.
|
||||
acceptors map[uint64]ChannelAcceptor
|
||||
acceptorsMtx sync.RWMutex
|
||||
|
||||
acceptorID uint64 // To be used atomically.
|
||||
}
|
||||
|
||||
// NewChainedAcceptor initializes a ChainedAcceptor.
|
||||
func NewChainedAcceptor() *ChainedAcceptor {
|
||||
return &ChainedAcceptor{
|
||||
acceptors: make(map[uint64]ChannelAcceptor),
|
||||
}
|
||||
}
|
||||
|
||||
// AddAcceptor adds a ChannelAcceptor to this ChainedAcceptor.
|
||||
func (c *ChainedAcceptor) AddAcceptor(acceptor ChannelAcceptor) uint64 {
|
||||
id := atomic.AddUint64(&c.acceptorID, 1)
|
||||
|
||||
c.acceptorsMtx.Lock()
|
||||
c.acceptors[id] = acceptor
|
||||
c.acceptorsMtx.Unlock()
|
||||
|
||||
// Return the id so that a caller can call RemoveAcceptor.
|
||||
return id
|
||||
}
|
||||
|
||||
// RemoveAcceptor removes a ChannelAcceptor from this ChainedAcceptor given
|
||||
// an ID.
|
||||
func (c *ChainedAcceptor) RemoveAcceptor(id uint64) {
|
||||
c.acceptorsMtx.Lock()
|
||||
delete(c.acceptors, id)
|
||||
c.acceptorsMtx.Unlock()
|
||||
}
|
||||
|
||||
// Accept evaluates the results of all ChannelAcceptors in the acceptors map
|
||||
// and returns the conjunction of all these predicates.
|
||||
//
|
||||
// NOTE: Part of the ChannelAcceptor interface.
|
||||
func (c *ChainedAcceptor) Accept(req *ChannelAcceptRequest) bool {
|
||||
result := true
|
||||
|
||||
c.acceptorsMtx.RLock()
|
||||
for _, acceptor := range c.acceptors {
|
||||
// We call Accept first in case any acceptor (perhaps an RPCAcceptor)
|
||||
// wishes to be notified about ChannelAcceptRequest.
|
||||
result = acceptor.Accept(req) && result
|
||||
}
|
||||
c.acceptorsMtx.RUnlock()
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// A compile-time constraint to ensure ChainedAcceptor implements the
|
||||
// ChannelAcceptor interface.
|
||||
var _ ChannelAcceptor = (*ChainedAcceptor)(nil)
|
25
chanacceptor/interface.go
Normal file
25
chanacceptor/interface.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package chanacceptor
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
// ChannelAcceptRequest is a struct containing the requesting node's public key
|
||||
// along with the lnwire.OpenChannel message that they sent when requesting an
|
||||
// inbound channel. This information is provided to each acceptor so that they
|
||||
// can each leverage their own decision-making with this information.
|
||||
type ChannelAcceptRequest struct {
|
||||
// Node is the public key of the node requesting to open a channel.
|
||||
Node *btcec.PublicKey
|
||||
|
||||
// OpenChanMsg is the actual OpenChannel protocol message that the peer
|
||||
// sent to us.
|
||||
OpenChanMsg *lnwire.OpenChannel
|
||||
}
|
||||
|
||||
// ChannelAcceptor is an interface that represents a predicate on the data
|
||||
// contained in ChannelAcceptRequest.
|
||||
type ChannelAcceptor interface {
|
||||
Accept(req *ChannelAcceptRequest) bool
|
||||
}
|
27
chanacceptor/rpcacceptor.go
Normal file
27
chanacceptor/rpcacceptor.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package chanacceptor
|
||||
|
||||
// RPCAcceptor represents the RPC-controlled variant of the ChannelAcceptor.
|
||||
// One RPCAcceptor allows one RPC client.
|
||||
type RPCAcceptor struct {
|
||||
acceptClosure func(req *ChannelAcceptRequest) bool
|
||||
}
|
||||
|
||||
// Accept is a predicate on the ChannelAcceptRequest which is sent to the RPC
|
||||
// client who will respond with the ultimate decision. This assumes an accept
|
||||
// closure has been specified during creation.
|
||||
//
|
||||
// NOTE: Part of the ChannelAcceptor interface.
|
||||
func (r *RPCAcceptor) Accept(req *ChannelAcceptRequest) bool {
|
||||
return r.acceptClosure(req)
|
||||
}
|
||||
|
||||
// NewRPCAcceptor creates and returns an instance of the RPCAcceptor.
|
||||
func NewRPCAcceptor(closure func(*ChannelAcceptRequest) bool) *RPCAcceptor {
|
||||
return &RPCAcceptor{
|
||||
acceptClosure: closure,
|
||||
}
|
||||
}
|
||||
|
||||
// A compile-time constraint to ensure RPCAcceptor implements the ChannelAcceptor
|
||||
// interface.
|
||||
var _ ChannelAcceptor = (*RPCAcceptor)(nil)
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/discovery"
|
||||
"github.com/lightningnetwork/lnd/htlcswitch"
|
||||
|
@ -338,6 +339,11 @@ type fundingConfig struct {
|
|||
// NotifyOpenChannelEvent informs the ChannelNotifier when channels
|
||||
// transition from pending open to open.
|
||||
NotifyOpenChannelEvent func(wire.OutPoint)
|
||||
|
||||
// OpenChannelPredicate is a predicate on the lnwire.OpenChannel message
|
||||
// and on the requesting node's public key that returns a bool which tells
|
||||
// the funding manager whether or not to accept the channel.
|
||||
OpenChannelPredicate chanacceptor.ChannelAcceptor
|
||||
}
|
||||
|
||||
// fundingManager acts as an orchestrator/bridge between the wallet's
|
||||
|
@ -1057,7 +1063,23 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
|
|||
if f.cfg.RejectPush && msg.PushAmount > 0 {
|
||||
f.failFundingFlow(
|
||||
fmsg.peer, fmsg.msg.PendingChannelID,
|
||||
lnwallet.ErrNonZeroPushAmount())
|
||||
lnwallet.ErrNonZeroPushAmount(),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// Send the OpenChannel request to the ChannelAcceptor to determine whether
|
||||
// this node will accept the channel.
|
||||
chanReq := &chanacceptor.ChannelAcceptRequest{
|
||||
Node: fmsg.peer.IdentityKey(),
|
||||
OpenChanMsg: fmsg.msg,
|
||||
}
|
||||
|
||||
if !f.cfg.OpenChannelPredicate.Accept(chanReq) {
|
||||
f.failFundingFlow(
|
||||
fmsg.peer, fmsg.msg.PendingChannelID,
|
||||
fmt.Errorf("open channel request rejected"),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/btcsuite/btcutil"
|
||||
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/discovery"
|
||||
"github.com/lightningnetwork/lnd/htlcswitch"
|
||||
|
@ -281,6 +282,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
|
|||
|
||||
var chanIDSeed [32]byte
|
||||
|
||||
chainedAcceptor := chanacceptor.NewChainedAcceptor()
|
||||
|
||||
fundingCfg := fundingConfig{
|
||||
IDKey: privKey.PubKey(),
|
||||
Wallet: lnw,
|
||||
|
@ -364,6 +367,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
|
|||
ReservationTimeout: 1 * time.Nanosecond,
|
||||
MaxPendingChannels: DefaultMaxPendingChannels,
|
||||
NotifyOpenChannelEvent: func(wire.OutPoint) {},
|
||||
OpenChannelPredicate: chainedAcceptor,
|
||||
}
|
||||
|
||||
for _, op := range options {
|
||||
|
@ -414,6 +418,8 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
|||
|
||||
oldCfg := alice.fundingMgr.cfg
|
||||
|
||||
chainedAcceptor := chanacceptor.NewChainedAcceptor()
|
||||
|
||||
f, err := newFundingManager(fundingConfig{
|
||||
IDKey: oldCfg.IDKey,
|
||||
Wallet: oldCfg.Wallet,
|
||||
|
@ -458,6 +464,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
|||
},
|
||||
ZombieSweeperInterval: oldCfg.ZombieSweeperInterval,
|
||||
ReservationTimeout: oldCfg.ReservationTimeout,
|
||||
OpenChannelPredicate: chainedAcceptor,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed recreating aliceFundingManager: %v", err)
|
||||
|
|
8
lnd.go
8
lnd.go
|
@ -43,6 +43,7 @@ import (
|
|||
|
||||
"github.com/lightningnetwork/lnd/autopilot"
|
||||
"github.com/lightningnetwork/lnd/build"
|
||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
"github.com/lightningnetwork/lnd/lncfg"
|
||||
|
@ -488,11 +489,14 @@ func Main(lisCfg ListenerCfg) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Initialize the ChainedAcceptor.
|
||||
chainedAcceptor := chanacceptor.NewChainedAcceptor()
|
||||
|
||||
// Set up the core server which will listen for incoming peer
|
||||
// connections.
|
||||
server, err := newServer(
|
||||
cfg.Listeners, chanDB, towerClientDB, activeChainControl,
|
||||
idPrivKey, walletInitParams.ChansToRestore,
|
||||
idPrivKey, walletInitParams.ChansToRestore, chainedAcceptor,
|
||||
)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("Unable to create server: %v", err)
|
||||
|
@ -547,7 +551,7 @@ func Main(lisCfg ListenerCfg) error {
|
|||
rpcServer, err := newRPCServer(
|
||||
server, macaroonService, cfg.SubRPCServers, restDialOpts,
|
||||
restProxyDest, atplManager, server.invoices, tower, tlsCfg,
|
||||
rpcListeners,
|
||||
rpcListeners, chainedAcceptor,
|
||||
)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("Unable to create RPC server: %v", err)
|
||||
|
|
1587
lnrpc/rpc.pb.go
1587
lnrpc/rpc.pb.go
File diff suppressed because it is too large
Load diff
|
@ -430,6 +430,15 @@ service Lightning {
|
|||
*/
|
||||
rpc OpenChannel (OpenChannelRequest) returns (stream OpenStatusUpdate);
|
||||
|
||||
/**
|
||||
ChannelAcceptor dispatches a bi-directional streaming RPC in which
|
||||
OpenChannel requests are sent to the client and the client responds with
|
||||
a boolean that tells LND whether or not to accept the channel. This allows
|
||||
node operators to specify their own criteria for accepting inbound channels
|
||||
through a single persistent connection.
|
||||
*/
|
||||
rpc ChannelAcceptor (stream ChannelAcceptResponse) returns (stream ChannelAcceptRequest);
|
||||
|
||||
/** lncli: `closechannel`
|
||||
CloseChannel attempts to close an active channel identified by its channel
|
||||
outpoint (ChannelPoint). The actions of this method can additionally be
|
||||
|
@ -912,6 +921,58 @@ message SendToRouteRequest {
|
|||
Route route = 4;
|
||||
}
|
||||
|
||||
message ChannelAcceptRequest {
|
||||
/// The pubkey of the node that wishes to open an inbound channel.
|
||||
bytes node_pubkey = 1;
|
||||
|
||||
/// The hash of the genesis block that the proposed channel resides in.
|
||||
bytes chain_hash = 2;
|
||||
|
||||
/// The pending channel id.
|
||||
bytes pending_chan_id = 3;
|
||||
|
||||
/// The funding amount in satoshis that initiator wishes to use in the channel.
|
||||
uint64 funding_amt = 4;
|
||||
|
||||
/// The push amount of the proposed channel in millisatoshis.
|
||||
uint64 push_amt = 5;
|
||||
|
||||
/// The dust limit of the initiator's commitment tx.
|
||||
uint64 dust_limit = 6;
|
||||
|
||||
/// The maximum amount of coins in millisatoshis that can be pending in this channel.
|
||||
uint64 max_value_in_flight = 7;
|
||||
|
||||
/// The minimum amount of satoshis the initiator requires us to have at all times.
|
||||
uint64 channel_reserve = 8;
|
||||
|
||||
/// The smallest HTLC in millisatoshis that the initiator will accept.
|
||||
uint64 min_htlc = 9;
|
||||
|
||||
/// The initial fee rate that the initiator suggests for both commitment transactions.
|
||||
uint64 fee_per_kw = 10;
|
||||
|
||||
/**
|
||||
The number of blocks to use for the relative time lock in the pay-to-self output
|
||||
of both commitment transactions.
|
||||
*/
|
||||
uint32 csv_delay = 11;
|
||||
|
||||
/// The total number of incoming HTLC's that the initiator will accept.
|
||||
uint32 max_accepted_htlcs = 12;
|
||||
|
||||
/// A bit-field which the initiator uses to specify proposed channel behavior.
|
||||
uint32 channel_flags = 13;
|
||||
}
|
||||
|
||||
message ChannelAcceptResponse {
|
||||
/// Whether or not the client accepts the channel.
|
||||
bool accept = 1;
|
||||
|
||||
/// The pending channel id to which this response applies.
|
||||
bytes pending_chan_id = 2;
|
||||
}
|
||||
|
||||
message ChannelPoint {
|
||||
oneof funding_txid {
|
||||
/// Txid of the funding transaction
|
||||
|
|
|
@ -1657,6 +1657,76 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"lnrpcChannelAcceptRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"node_pubkey": {
|
||||
"type": "string",
|
||||
"format": "byte",
|
||||
"description": "/ The pubkey of the node that wishes to open an inbound channel."
|
||||
},
|
||||
"chain_hash": {
|
||||
"type": "string",
|
||||
"format": "byte",
|
||||
"description": "/ The hash of the genesis block that the proposed channel resides in."
|
||||
},
|
||||
"pending_chan_id": {
|
||||
"type": "string",
|
||||
"format": "byte",
|
||||
"description": "/ The pending channel id."
|
||||
},
|
||||
"funding_amt": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The funding amount in satoshis that initiator wishes to use in the channel."
|
||||
},
|
||||
"push_amt": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The push amount of the proposed channel in millisatoshis."
|
||||
},
|
||||
"dust_limit": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The dust limit of the initiator's commitment tx."
|
||||
},
|
||||
"max_value_in_flight": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The maximum amount of coins in millisatoshis that can be pending in this channel."
|
||||
},
|
||||
"channel_reserve": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The minimum amount of satoshis the initiator requires us to have at all times."
|
||||
},
|
||||
"min_htlc": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The smallest HTLC in millisatoshis that the initiator will accept."
|
||||
},
|
||||
"fee_per_kw": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "/ The initial fee rate that the initiator suggests for both commitment transactions."
|
||||
},
|
||||
"csv_delay": {
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "*\nThe number of blocks to use for the relative time lock in the pay-to-self output\nof both commitment transactions."
|
||||
},
|
||||
"max_accepted_htlcs": {
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "/ The total number of incoming HTLC's that the initiator will accept."
|
||||
},
|
||||
"channel_flags": {
|
||||
"type": "integer",
|
||||
"format": "int64",
|
||||
"description": "/ A bit-field which the initiator uses to specify proposed channel behavior."
|
||||
}
|
||||
}
|
||||
},
|
||||
"lnrpcChannelBackup": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
|
187
rpcserver.go
187
rpcserver.go
|
@ -16,6 +16,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
"github.com/lightningnetwork/lnd/tlv"
|
||||
|
@ -76,6 +77,12 @@ var (
|
|||
// It is set to the value under the Bitcoin chain as default.
|
||||
MaxPaymentMSat = maxBtcPaymentMSat
|
||||
|
||||
// defaultAcceptorTimeout is the time after which an RPCAcceptor will time
|
||||
// out and return false if it hasn't yet received a response.
|
||||
//
|
||||
// TODO: Make this configurable
|
||||
defaultAcceptorTimeout = 15 * time.Second
|
||||
|
||||
// readPermissions is a slice of all entities that allow read
|
||||
// permissions for authorization purposes, all lowercase.
|
||||
readPermissions = []bakery.Op{
|
||||
|
@ -382,6 +389,13 @@ func mainRPCServerPermissions() map[string][]bakery.Op {
|
|||
Entity: "offchain",
|
||||
Action: "read",
|
||||
}},
|
||||
"/lnrpc.Lightning/ChannelAcceptor": {{
|
||||
Entity: "onchain",
|
||||
Action: "write",
|
||||
}, {
|
||||
Entity: "offchain",
|
||||
Action: "write",
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -430,6 +444,10 @@ type rpcServer struct {
|
|||
// rpc sub server.
|
||||
routerBackend *routerrpc.RouterBackend
|
||||
|
||||
// chanPredicate is used in the bidirectional ChannelAcceptor streaming
|
||||
// method.
|
||||
chanPredicate *chanacceptor.ChainedAcceptor
|
||||
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
|
@ -446,7 +464,8 @@ func newRPCServer(s *server, macService *macaroons.Service,
|
|||
subServerCgs *subRPCServerConfigs, restDialOpts []grpc.DialOption,
|
||||
restProxyDest string, atpl *autopilot.Manager,
|
||||
invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone,
|
||||
tlsCfg *tls.Config, getListeners rpcListeners) (*rpcServer, error) {
|
||||
tlsCfg *tls.Config, getListeners rpcListeners,
|
||||
chanPredicate *chanacceptor.ChainedAcceptor) (*rpcServer, error) {
|
||||
|
||||
// Set up router rpc backend.
|
||||
channelGraph := s.chanDB.ChannelGraph()
|
||||
|
@ -601,6 +620,7 @@ func newRPCServer(s *server, macService *macaroons.Service,
|
|||
grpcServer: grpcServer,
|
||||
server: s,
|
||||
routerBackend: routerBackend,
|
||||
chanPredicate: chanPredicate,
|
||||
quit: make(chan struct{}, 1),
|
||||
}
|
||||
lnrpc.RegisterLightningServer(grpcServer, rootRPCServer)
|
||||
|
@ -5051,3 +5071,168 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// chanAcceptInfo is used in the ChannelAcceptor bidirectional stream and
|
||||
// encapsulates the request information sent from the RPCAcceptor to the
|
||||
// RPCServer.
|
||||
type chanAcceptInfo struct {
|
||||
chanReq *chanacceptor.ChannelAcceptRequest
|
||||
responseChan chan bool
|
||||
}
|
||||
|
||||
// ChannelAcceptor dispatches a bi-directional streaming RPC in which
|
||||
// OpenChannel requests are sent to the client and the client responds with
|
||||
// a boolean that tells LND whether or not to accept the channel. This allows
|
||||
// node operators to specify their own criteria for accepting inbound channels
|
||||
// through a single persistent connection.
|
||||
func (r *rpcServer) ChannelAcceptor(stream lnrpc.Lightning_ChannelAcceptorServer) error {
|
||||
chainedAcceptor := r.chanPredicate
|
||||
|
||||
// Create two channels to handle requests and responses respectively.
|
||||
newRequests := make(chan *chanAcceptInfo)
|
||||
responses := make(chan lnrpc.ChannelAcceptResponse)
|
||||
|
||||
// Define a quit channel that will be used to signal to the RPCAcceptor's
|
||||
// closure whether the stream still exists.
|
||||
quit := make(chan struct{})
|
||||
defer close(quit)
|
||||
|
||||
// demultiplexReq is a closure that will be passed to the RPCAcceptor and
|
||||
// acts as an intermediary between the RPCAcceptor and the RPCServer.
|
||||
demultiplexReq := func(req *chanacceptor.ChannelAcceptRequest) bool {
|
||||
respChan := make(chan bool, 1)
|
||||
|
||||
newRequest := &chanAcceptInfo{
|
||||
chanReq: req,
|
||||
responseChan: respChan,
|
||||
}
|
||||
|
||||
// timeout is the time after which ChannelAcceptRequests expire.
|
||||
timeout := time.After(defaultAcceptorTimeout)
|
||||
|
||||
// Send the request to the newRequests channel.
|
||||
select {
|
||||
case newRequests <- newRequest:
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
defaultAcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
|
||||
// Receive the response and return it. If no response has been received
|
||||
// in defaultAcceptorTimeout, then return false.
|
||||
select {
|
||||
case resp := <-respChan:
|
||||
return resp
|
||||
case <-timeout:
|
||||
rpcsLog.Errorf("RPCAcceptor returned false - reached timeout of %d",
|
||||
defaultAcceptorTimeout)
|
||||
return false
|
||||
case <-quit:
|
||||
return false
|
||||
case <-r.quit:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new RPCAcceptor via the NewRPCAcceptor method.
|
||||
rpcAcceptor := chanacceptor.NewRPCAcceptor(demultiplexReq)
|
||||
|
||||
// Add the RPCAcceptor to the ChainedAcceptor and defer its removal.
|
||||
id := chainedAcceptor.AddAcceptor(rpcAcceptor)
|
||||
defer chainedAcceptor.RemoveAcceptor(id)
|
||||
|
||||
// errChan is used by the receive loop to signal any errors that occur
|
||||
// during reading from the stream. This is primarily used to shutdown the
|
||||
// send loop in the case of an RPC client disconnecting.
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
// We need to have the stream.Recv() in a goroutine since the call is
|
||||
// blocking and would prevent us from sending more ChannelAcceptRequests to
|
||||
// the RPC client.
|
||||
go func() {
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
|
||||
openChanResp := lnrpc.ChannelAcceptResponse{
|
||||
Accept: resp.Accept,
|
||||
PendingChanId: pendingID[:],
|
||||
}
|
||||
|
||||
// Now that we have the response from the RPC client, send it to
|
||||
// the responses chan.
|
||||
select {
|
||||
case responses <- openChanResp:
|
||||
case <-quit:
|
||||
return
|
||||
case <-r.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
acceptRequests := make(map[[32]byte]chan bool)
|
||||
|
||||
for {
|
||||
select {
|
||||
case newRequest := <-newRequests:
|
||||
|
||||
req := newRequest.chanReq
|
||||
pendingChanID := req.OpenChanMsg.PendingChannelID
|
||||
|
||||
acceptRequests[pendingChanID] = newRequest.responseChan
|
||||
|
||||
// A ChannelAcceptRequest has been received, send it to the client.
|
||||
chanAcceptReq := &lnrpc.ChannelAcceptRequest{
|
||||
NodePubkey: req.Node.SerializeCompressed(),
|
||||
ChainHash: req.OpenChanMsg.ChainHash[:],
|
||||
PendingChanId: req.OpenChanMsg.PendingChannelID[:],
|
||||
FundingAmt: uint64(req.OpenChanMsg.FundingAmount),
|
||||
PushAmt: uint64(req.OpenChanMsg.PushAmount),
|
||||
DustLimit: uint64(req.OpenChanMsg.DustLimit),
|
||||
MaxValueInFlight: uint64(req.OpenChanMsg.MaxValueInFlight),
|
||||
ChannelReserve: uint64(req.OpenChanMsg.ChannelReserve),
|
||||
MinHtlc: uint64(req.OpenChanMsg.HtlcMinimum),
|
||||
FeePerKw: uint64(req.OpenChanMsg.FeePerKiloWeight),
|
||||
CsvDelay: uint32(req.OpenChanMsg.CsvDelay),
|
||||
MaxAcceptedHtlcs: uint32(req.OpenChanMsg.MaxAcceptedHTLCs),
|
||||
ChannelFlags: uint32(req.OpenChanMsg.ChannelFlags),
|
||||
}
|
||||
|
||||
if err := stream.Send(chanAcceptReq); err != nil {
|
||||
return err
|
||||
}
|
||||
case resp := <-responses:
|
||||
// Look up the appropriate channel to send on given the pending ID.
|
||||
// If a channel is found, send the response over it.
|
||||
var pendingID [32]byte
|
||||
copy(pendingID[:], resp.PendingChanId)
|
||||
respChan, ok := acceptRequests[pendingID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Send the response boolean over the buffered response channel.
|
||||
respChan <- resp.Accept
|
||||
|
||||
// Delete the channel from the acceptRequests map.
|
||||
delete(acceptRequests, pendingID)
|
||||
case err := <-errChan:
|
||||
rpcsLog.Errorf("Received an error: %v, shutting down", err)
|
||||
return err
|
||||
case <-r.quit:
|
||||
return fmt.Errorf("RPC server is shutting down")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
sphinx "github.com/lightningnetwork/lightning-onion"
|
||||
"github.com/lightningnetwork/lnd/autopilot"
|
||||
"github.com/lightningnetwork/lnd/brontide"
|
||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||
"github.com/lightningnetwork/lnd/chanbackup"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||
|
@ -297,7 +298,8 @@ func noiseDial(idPriv *btcec.PrivateKey) func(net.Addr) (net.Conn, error) {
|
|||
func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
||||
towerClientDB *wtdb.ClientDB, cc *chainControl,
|
||||
privKey *btcec.PrivateKey,
|
||||
chansToRestore walletunlocker.ChannelsToRecover) (*server, error) {
|
||||
chansToRestore walletunlocker.ChannelsToRecover,
|
||||
chanPredicate chanacceptor.ChannelAcceptor) (*server, error) {
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -908,6 +910,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
|||
if _, err := rand.Read(chanIDSeed[:]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.fundingMgr, err = newFundingManager(fundingConfig{
|
||||
IDKey: privKey.PubKey(),
|
||||
Wallet: cc.wallet,
|
||||
|
@ -1069,6 +1072,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
|||
MaxPendingChannels: cfg.MaxPendingChannels,
|
||||
RejectPush: cfg.RejectPush,
|
||||
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
|
||||
OpenChannelPredicate: chanPredicate,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Add table
Reference in a new issue