From e98d85988236515cdeba184274f183c6ab7313f4 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:08 +0100 Subject: [PATCH 01/12] autopilot: define HeuristicConstraints This commit defines a new struct HeuristicConstraints that will be used to keep track of the initial constraints the autopilot agent needs to adhere to. This is currently done in the ConstrainedPrefAttachement heuristic itself, but this lets us share these constraints and common method netween several heuristics. --- autopilot/heuristic_constraints.go | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 autopilot/heuristic_constraints.go diff --git a/autopilot/heuristic_constraints.go b/autopilot/heuristic_constraints.go new file mode 100644 index 000000000..916f85071 --- /dev/null +++ b/autopilot/heuristic_constraints.go @@ -0,0 +1,76 @@ +package autopilot + +import ( + "github.com/btcsuite/btcutil" +) + +// HeuristicConstraints is a struct that indicate the constraints an autopilot +// heuristic must adhere to when opening channels. +type HeuristicConstraints struct { + // MinChanSize is the smallest channel that the autopilot agent should + // create. + MinChanSize btcutil.Amount + + // MaxChanSize the largest channel that the autopilot agent should + // create. + MaxChanSize btcutil.Amount + + // ChanLimit the maximum number of channels that should be created. + ChanLimit uint16 + + // Allocation the percentage of total funds that should be committed to + // automatic channel establishment. + Allocation float64 + + // MaxPendingOpens is the maximum number of pending channel + // establishment goroutines that can be lingering. We cap this value in + // order to control the level of parallelism caused by the autopilot + // agent. + MaxPendingOpens uint16 +} + +// availableChans returns the funds and number of channels slots the autopilot +// has available towards new channels, and still be within the set constraints. +func (h *HeuristicConstraints) availableChans(channels []Channel, + funds btcutil.Amount) (btcutil.Amount, uint32) { + + // If we're already over our maximum allowed number of channels, then + // we'll instruct the controller not to create any more channels. + if len(channels) >= int(h.ChanLimit) { + return 0, 0 + } + + // The number of additional channels that should be opened is the + // difference between the channel limit, and the number of channels we + // already have open. + numAdditionalChans := uint32(h.ChanLimit) - uint32(len(channels)) + + // First, we'll tally up the total amount of funds that are currently + // present within the set of active channels. + var totalChanAllocation btcutil.Amount + for _, channel := range channels { + totalChanAllocation += channel.Capacity + } + + // With this value known, we'll now compute the total amount of fund + // allocated across regular utxo's and channel utxo's. + totalFunds := funds + totalChanAllocation + + // Once the total amount has been computed, we then calculate the + // fraction of funds currently allocated to channels. + fundsFraction := float64(totalChanAllocation) / float64(totalFunds) + + // If this fraction is below our threshold, then we'll return true, to + // indicate the controller should call Select to obtain a candidate set + // of channels to attempt to open. + needMore := fundsFraction < h.Allocation + if !needMore { + return 0, 0 + } + + // Now that we know we need more funds, we'll compute the amount of + // additional funds we should allocate towards channels. + targetAllocation := btcutil.Amount(float64(totalFunds) * h.Allocation) + fundsAvailable := targetAllocation - totalChanAllocation + return fundsAvailable, numAdditionalChans +} From 35f4ec84d15a14b3cdf97f4d298b40da38c3fc85 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:08 +0100 Subject: [PATCH 02/12] autopilot/prefattach: use HeuristicConstraints This commit makes the pref attach heuristic and the agent use the HeuristicConstraints internally. --- autopilot/agent.go | 14 +++---- autopilot/agent_test.go | 60 ++++++++++++++++++++---------- autopilot/prefattach.go | 72 ++++++++---------------------------- autopilot/prefattach_test.go | 62 +++++++++++++++++++++++-------- pilot.go | 17 ++++++--- 5 files changed, 120 insertions(+), 105 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 6d888b356..12761db30 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -51,11 +51,9 @@ type Config struct { // within the graph. Graph ChannelGraph - // MaxPendingOpens is the maximum number of pending channel - // establishment goroutines that can be lingering. We cap this value in - // order to control the level of parallelism caused by the autopilot - // agent. - MaxPendingOpens uint16 + // Constraints is the set of constraints the autopilot must adhere to + // when opening channels. + Constraints *HeuristicConstraints // TODO(roasbeef): add additional signals from fee rates and revenue of // currently opened channels @@ -525,12 +523,12 @@ func (a *Agent) controller() { // connections succeed, we will they will be ignored and made // available to future heuristic selections. pendingMtx.Lock() - if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens { + if uint16(len(pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { pendingMtx.Unlock() log.Debugf("Reached cap of %v pending "+ "channel opens, will retry "+ "after success/failure", - a.cfg.MaxPendingOpens) + a.cfg.Constraints.MaxPendingOpens) continue } @@ -587,7 +585,7 @@ func (a *Agent) controller() { // finished first. pendingMtx.Lock() if uint16(len(pendingOpens)) >= - a.cfg.MaxPendingOpens { + a.cfg.Constraints.MaxPendingOpens { // Since we've reached our max number of // pending opens, we'll disconnect this // peer and exit. However, if we were diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 16099eeb3..3330ffd3b 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -167,8 +167,10 @@ func TestAgentChannelOpenSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -288,8 +290,10 @@ func TestAgentChannelFailureSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} @@ -389,8 +393,10 @@ func TestAgentChannelCloseSignal(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } // We'll start the agent with two channels already being active. @@ -503,8 +509,10 @@ func TestAgentBalanceUpdate(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -608,8 +616,10 @@ func TestAgentImmediateAttach(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -743,8 +753,10 @@ func TestAgentPrivateChannels(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -866,8 +878,10 @@ func TestAgentPendingChannelState(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) @@ -1035,8 +1049,10 @@ func TestAgentPendingOpenChannel(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -1118,8 +1134,10 @@ func TestAgentOnNodeUpdates(t *testing.T) { WalletBalance: func() (btcutil.Amount, error) { return walletBalance, nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } agent, err := New(cfg, nil) if err != nil { @@ -1232,8 +1250,10 @@ func TestAgentSkipPendingConns(t *testing.T) { DisconnectPeer: func(*btcec.PublicKey) error { return nil }, - Graph: memGraph, - MaxPendingOpens: 10, + Graph: memGraph, + Constraints: &HeuristicConstraints{ + MaxPendingOpens: 10, + }, } initialChans := []Channel{} agent, err := New(testCfg, initialChans) diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index 44c428fe5..85969992c 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -22,28 +22,20 @@ import ( // // TODO(roasbeef): BA, with k=-3 type ConstrainedPrefAttachment struct { - minChanSize btcutil.Amount - maxChanSize btcutil.Amount - - chanLimit uint16 - - threshold float64 + constraints *HeuristicConstraints } // NewConstrainedPrefAttachment creates a new instance of a // ConstrainedPrefAttachment heuristics given bounds on allowed channel sizes, // and an allocation amount which is interpreted as a percentage of funds that // is to be committed to channels at all times. -func NewConstrainedPrefAttachment(minChanSize, maxChanSize btcutil.Amount, - chanLimit uint16, allocation float64) *ConstrainedPrefAttachment { +func NewConstrainedPrefAttachment( + cfg *HeuristicConstraints) *ConstrainedPrefAttachment { prand.Seed(time.Now().Unix()) return &ConstrainedPrefAttachment{ - minChanSize: minChanSize, - chanLimit: chanLimit, - maxChanSize: maxChanSize, - threshold: allocation, + constraints: cfg, } } @@ -61,45 +53,11 @@ var _ AttachmentHeuristic = (*ConstrainedPrefAttachment)(nil) func (p *ConstrainedPrefAttachment) NeedMoreChans(channels []Channel, funds btcutil.Amount) (btcutil.Amount, uint32, bool) { - // If we're already over our maximum allowed number of channels, then - // we'll instruct the controller not to create any more channels. - if len(channels) >= int(p.chanLimit) { - return 0, 0, false - } - - // The number of additional channels that should be opened is the - // difference between the channel limit, and the number of channels we - // already have open. - numAdditionalChans := uint32(p.chanLimit) - uint32(len(channels)) - - // First, we'll tally up the total amount of funds that are currently - // present within the set of active channels. - var totalChanAllocation btcutil.Amount - for _, channel := range channels { - totalChanAllocation += channel.Capacity - } - - // With this value known, we'll now compute the total amount of fund - // allocated across regular utxo's and channel utxo's. - totalFunds := funds + totalChanAllocation - - // Once the total amount has been computed, we then calculate the - // fraction of funds currently allocated to channels. - fundsFraction := float64(totalChanAllocation) / float64(totalFunds) - - // If this fraction is below our threshold, then we'll return true, to - // indicate the controller should call Select to obtain a candidate set - // of channels to attempt to open. - needMore := fundsFraction < p.threshold - if !needMore { - return 0, 0, false - } - - // Now that we know we need more funds, we'll compute the amount of - // additional funds we should allocate towards channels. - targetAllocation := btcutil.Amount(float64(totalFunds) * p.threshold) - fundsAvailable := targetAllocation - totalChanAllocation - return fundsAvailable, numAdditionalChans, true + // We'll try to open more channels as long as the constraints allow it. + availableFunds, availableChans := p.constraints.availableChans( + channels, funds, + ) + return availableFunds, availableChans, availableChans > 0 } // NodeID is a simple type that holds an EC public key serialized in compressed @@ -150,7 +108,7 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph var directives []AttachmentDirective - if fundsAvailable < p.minChanSize { + if fundsAvailable < p.constraints.MinChanSize { return directives, nil } @@ -263,9 +221,9 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph // If we have enough available funds to distribute the maximum channel // size for each of the selected peers to attach to, then we'll // allocate the maximum amount to each peer. - case int64(fundsAvailable) >= numSelectedNodes*int64(p.maxChanSize): + case int64(fundsAvailable) >= numSelectedNodes*int64(p.constraints.MaxChanSize): for i := 0; i < int(numSelectedNodes); i++ { - directives[i].ChanAmt = p.maxChanSize + directives[i].ChanAmt = p.constraints.MaxChanSize } return directives, nil @@ -273,14 +231,14 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph // Otherwise, we'll greedily allocate our funds to the channels // successively until we run out of available funds, or can't create a // channel above the min channel size. - case int64(fundsAvailable) < numSelectedNodes*int64(p.maxChanSize): + case int64(fundsAvailable) < numSelectedNodes*int64(p.constraints.MaxChanSize): i := 0 - for fundsAvailable > p.minChanSize { + for fundsAvailable > p.constraints.MinChanSize { // We'll attempt to allocate the max channel size // initially. If we don't have enough funds to do this, // then we'll allocate the remainder of the funds // available to the channel. - delta := p.maxChanSize + delta := p.constraints.MaxChanSize if fundsAvailable-delta < 0 { delta = fundsAvailable } diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index d34402776..6a6dc44c9 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -29,6 +29,13 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + randChanID := func() lnwire.ShortChannelID { return lnwire.NewShortChanIDFromInt(uint64(prand.Int63())) } @@ -146,8 +153,7 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) { }, } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) + prefAttach := NewConstrainedPrefAttachment(constraints) for i, testCase := range testCases { amtToAllocate, numMore, needMore := prefAttach.NeedMoreChans( @@ -236,14 +242,20 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + // First, we'll generate a random key that represents "us", and create // a new instance of the heuristic with our set parameters. self, err := randKey() if err != nil { t.Fatalf("unable to generate self key: %v", err) } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) + prefAttach := NewConstrainedPrefAttachment(constraints) skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { @@ -296,6 +308,12 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { @@ -314,8 +332,7 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { if err != nil { t1.Fatalf("unable to generate self key: %v", err) } - prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize, - chanLimit, threshold) + prefAttach := NewConstrainedPrefAttachment(constraints) // For this set, we'll load the memory graph with two // nodes, and a random channel connecting them. @@ -386,6 +403,13 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { @@ -404,9 +428,7 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { if err != nil { t1.Fatalf("unable to generate self key: %v", err) } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + prefAttach := NewConstrainedPrefAttachment(constraints) // Next, we'll attempt to select a set of candidates, // passing zero for the amount of wallet funds. This @@ -445,6 +467,13 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { @@ -463,9 +492,7 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { if err != nil { t1.Fatalf("unable to generate self key: %v", err) } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + prefAttach := NewConstrainedPrefAttachment(constraints) const chanCapacity = btcutil.SatoshiPerBitcoin @@ -581,6 +608,13 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { threshold = 0.5 ) + constraints := &HeuristicConstraints{ + MinChanSize: minChanSize, + MaxChanSize: maxChanSize, + ChanLimit: chanLimit, + Allocation: threshold, + } + for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { skipNodes := make(map[NodeID]struct{}) @@ -600,9 +634,7 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { if err != nil { t1.Fatalf("unable to generate self key: %v", err) } - prefAttach := NewConstrainedPrefAttachment( - minChanSize, maxChanSize, chanLimit, threshold, - ) + prefAttach := NewConstrainedPrefAttachment(constraints) // Next, we'll create a simple topology of two nodes, // with a single channel connecting them. diff --git a/pilot.go b/pilot.go index c745082c2..174452d72 100644 --- a/pilot.go +++ b/pilot.go @@ -85,12 +85,19 @@ var _ autopilot.ChannelController = (*chanController)(nil) func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) { atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg)) + // Set up the constraints the autopilot heuristics must adhere to. + atplConstraints := &autopilot.HeuristicConstraints{ + MinChanSize: btcutil.Amount(cfg.MinChannelSize), + MaxChanSize: btcutil.Amount(cfg.MaxChannelSize), + ChanLimit: uint16(cfg.MaxChannels), + Allocation: cfg.Allocation, + MaxPendingOpens: 10, + } + // First, we'll create the preferential attachment heuristic, // initialized with the passed auto pilot configuration parameters. prefAttachment := autopilot.NewConstrainedPrefAttachment( - btcutil.Amount(cfg.MinChannelSize), - btcutil.Amount(cfg.MaxChannelSize), - uint16(cfg.MaxChannels), cfg.Allocation, + atplConstraints, ) // With the heuristic itself created, we can now populate the remainder @@ -107,8 +114,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) WalletBalance: func() (btcutil.Amount, error) { return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs) }, - Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), - MaxPendingOpens: 10, + Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()), + Constraints: atplConstraints, ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) { // First, we'll check if we're already connected to the // target peer. If we are, we can exit early. Otherwise, From 86e6d230f2cb22d817da5258f265e5044900f1b9 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 6 Dec 2018 14:24:15 +0100 Subject: [PATCH 03/12] autopilot/agent: add attachment directive goroutine to wait group --- autopilot/agent.go | 3 +++ autopilot/agent_test.go | 22 +++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 12761db30..a59602c0e 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -545,7 +545,10 @@ func (a *Agent) controller() { } pendingConns[nodeID] = struct{}{} + a.wg.Add(1) go func(directive AttachmentDirective) { + defer a.wg.Done() + // We'll start out by attempting to connect to // the peer in order to begin the funding // workflow. diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 3330ffd3b..98b03a571 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -1231,6 +1231,7 @@ func TestAgentSkipPendingConns(t *testing.T) { const walletBalance = btcutil.SatoshiPerBitcoin * 6 connect := make(chan chan error) + quit := make(chan struct{}) // With the dependencies we created, we can now create the initial // agent itself. @@ -1243,9 +1244,19 @@ func TestAgentSkipPendingConns(t *testing.T) { }, ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { errChan := make(chan error) - connect <- errChan - err := <-errChan - return false, err + + select { + case connect <- errChan: + case <-quit: + return false, errors.New("quit") + } + + select { + case err := <-errChan: + return false, err + case <-quit: + return false, errors.New("quit") + } }, DisconnectPeer: func(*btcec.PublicKey) error { return nil @@ -1272,6 +1283,11 @@ func TestAgentSkipPendingConns(t *testing.T) { } defer agent.Stop() + // We must defer the closing of quit after the defer agent.Stop(), to + // make sure ConnectToPeer won't block preventing the agent from + // exiting. + defer close(quit) + // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from the // graph. From fb10175ea504db80915d18a891eb88ba60f0b48e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 04/12] autopilot/agent: add maps to agent struct This commit moves the maps used by the controller loop to the Agent struct, in preparation for breaking it up into smaller parts. --- autopilot/agent.go | 110 +++++++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index a59602c0e..10be87b9a 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -143,6 +143,22 @@ type Agent struct { // when the agent receives external balance update signals. totalBalance btcutil.Amount + // failedNodes lists nodes that we've previously attempted to initiate + // channels with, but didn't succeed. + failedNodes map[NodeID]struct{} + + // pendingConns tracks the nodes that we are attempting to make + // connections to. This prevents us from making duplicate connection + // requests to the same node. + pendingConns map[NodeID]struct{} + + // pendingOpens tracks the channels that we've requested to be + // initiated, but haven't yet been confirmed as being fully opened. + // This state is required as otherwise, we may go over our allotted + // channel limit, or open multiple channels to the same node. + pendingOpens map[NodeID]Channel + pendingMtx sync.Mutex + quit chan struct{} wg sync.WaitGroup } @@ -161,6 +177,9 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { nodeUpdates: make(chan *nodeUpdates, 1), chanOpenFailures: make(chan *chanOpenFailureUpdate, 1), pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1), + failedNodes: make(map[NodeID]struct{}), + pendingConns: make(map[NodeID]struct{}), + pendingOpens: make(map[NodeID]Channel), } for _, c := range initialState { @@ -357,23 +376,6 @@ func (a *Agent) controller() { // TODO(roasbeef): do we in fact need to maintain order? // * use sync.Cond if so - - // failedNodes lists nodes that we've previously attempted to initiate - // channels with, but didn't succeed. - failedNodes := make(map[NodeID]struct{}) - - // pendingConns tracks the nodes that we are attempting to make - // connections to. This prevents us from making duplicate connection - // requests to the same node. - pendingConns := make(map[NodeID]struct{}) - - // pendingOpens tracks the channels that we've requested to be - // initiated, but haven't yet been confirmed as being fully opened. - // This state is required as otherwise, we may go over our allotted - // channel limit, or open multiple channels to the same node. - pendingOpens := make(map[NodeID]Channel) - var pendingMtx sync.Mutex - updateBalance := func() { newBalance, err := a.cfg.WalletBalance() if err != nil { @@ -405,9 +407,9 @@ func (a *Agent) controller() { newChan := update.newChan a.chanState[newChan.ChanID] = newChan - pendingMtx.Lock() - delete(pendingOpens, newChan.Node) - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingOpens, newChan.Node) + a.pendingMtx.Unlock() updateBalance() // A channel has been closed, this may free up an @@ -458,17 +460,17 @@ func (a *Agent) controller() { return } - pendingMtx.Lock() - log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens)) - pendingMtx.Unlock() + a.pendingMtx.Lock() + log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens)) + a.pendingMtx.Unlock() // With all the updates applied, we'll obtain a set of the // current active channels (confirmed channels), and also // factor in our set of unconfirmed channels. confirmedChans := a.chanState - pendingMtx.Lock() - totalChans := mergeChanState(pendingOpens, confirmedChans) - pendingMtx.Unlock() + a.pendingMtx.Lock() + totalChans := mergeChanState(a.pendingOpens, confirmedChans) + a.pendingMtx.Unlock() // Now that we've updated our internal state, we'll consult our // channel attachment heuristic to determine if we should open @@ -487,11 +489,11 @@ func (a *Agent) controller() { // nodes that we currently have channels with so we avoid // duplicate edges. connectedNodes := a.chanState.ConnectedNodes() - pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(pendingOpens, - pendingConns, connectedNodes, failedNodes, + a.pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(a.pendingOpens, + a.pendingConns, connectedNodes, a.failedNodes, ) - pendingMtx.Unlock() + a.pendingMtx.Unlock() // If we reach this point, then according to our heuristic we // should modify our channel state to tend towards what it @@ -522,9 +524,9 @@ func (a *Agent) controller() { // certain which ones may actually succeed. If too many // connections succeed, we will they will be ignored and made // available to future heuristic selections. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { - pendingMtx.Unlock() + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { + a.pendingMtx.Unlock() log.Debugf("Reached cap of %v pending "+ "channel opens, will retry "+ "after success/failure", @@ -540,10 +542,10 @@ func (a *Agent) controller() { // Skip candidates which we are already trying // to establish a connection with. nodeID := chanCandidate.NodeID - if _, ok := pendingConns[nodeID]; ok { + if _, ok := a.pendingConns[nodeID]; ok { continue } - pendingConns[nodeID] = struct{}{} + a.pendingConns[nodeID] = struct{}{} a.wg.Add(1) go func(directive AttachmentDirective) { @@ -567,10 +569,10 @@ func (a *Agent) controller() { // don't attempt to connect to them // again. nodeID := NewNodeID(pub) - pendingMtx.Lock() - delete(pendingConns, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingConns, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() // Finally, we'll trigger the agent to // select new peers to connect to. @@ -586,8 +588,8 @@ func (a *Agent) controller() { // directives were spawned but fewer slots were // available, and other successful attempts // finished first. - pendingMtx.Lock() - if uint16(len(pendingOpens)) >= + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { // Since we've reached our max number of // pending opens, we'll disconnect this @@ -600,10 +602,10 @@ func (a *Agent) controller() { // connecting, we won't add this // peer to the failed nodes map, // but we will remove it from - // pendingConns so that it can + // a.pendingConns so that it can // be retried in the future. - delete(pendingConns, nodeID) - pendingMtx.Unlock() + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() return } @@ -622,8 +624,8 @@ func (a *Agent) controller() { // remove this node from our pending // conns map, permitting subsequent // connection attempts. - delete(pendingConns, nodeID) - pendingMtx.Unlock() + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() return } @@ -633,12 +635,12 @@ func (a *Agent) controller() { // peers if the connection attempt happens to // take too long. nodeID := directive.NodeID - delete(pendingConns, nodeID) - pendingOpens[nodeID] = Channel{ + delete(a.pendingConns, nodeID) + a.pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID, } - pendingMtx.Unlock() + a.pendingMtx.Unlock() // We can then begin the funding workflow with // this peer. @@ -656,10 +658,10 @@ func (a *Agent) controller() { // opens and mark them as failed so we // don't attempt to open a channel to // them again. - pendingMtx.Lock() - delete(pendingOpens, nodeID) - failedNodes[nodeID] = struct{}{} - pendingMtx.Unlock() + a.pendingMtx.Lock() + delete(a.pendingOpens, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() // Trigger the agent to re-evaluate // everything and possibly retry with a @@ -690,7 +692,7 @@ func (a *Agent) controller() { a.OnChannelPendingOpen() }(chanCandidate) } - pendingMtx.Unlock() + a.pendingMtx.Unlock() } } From 26810fe928762c83300b3fd4b1698ab51ba90c0c Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 6 Dec 2018 13:03:10 +0100 Subject: [PATCH 05/12] autopilot/agent: split attachement directive attempts into method --- autopilot/agent.go | 258 ++++++++++++++++++++------------------------- 1 file changed, 114 insertions(+), 144 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 10be87b9a..42370fd0c 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -548,151 +548,121 @@ func (a *Agent) controller() { a.pendingConns[nodeID] = struct{}{} a.wg.Add(1) - go func(directive AttachmentDirective) { - defer a.wg.Done() - - // We'll start out by attempting to connect to - // the peer in order to begin the funding - // workflow. - pub := directive.NodeKey - alreadyConnected, err := a.cfg.ConnectToPeer( - pub, directive.Addrs, - ) - if err != nil { - log.Warnf("Unable to connect "+ - "to %x: %v", - pub.SerializeCompressed(), - err) - - // Since we failed to connect to them, - // we'll mark them as failed so that we - // don't attempt to connect to them - // again. - nodeID := NewNodeID(pub) - a.pendingMtx.Lock() - delete(a.pendingConns, nodeID) - a.failedNodes[nodeID] = struct{}{} - a.pendingMtx.Unlock() - - // Finally, we'll trigger the agent to - // select new peers to connect to. - a.OnChannelOpenFailure() - - return - } - - // The connection was successful, though before - // progressing we must check that we have not - // already met our quota for max pending open - // channels. This can happen if multiple - // directives were spawned but fewer slots were - // available, and other successful attempts - // finished first. - a.pendingMtx.Lock() - if uint16(len(a.pendingOpens)) >= - a.cfg.Constraints.MaxPendingOpens { - // Since we've reached our max number of - // pending opens, we'll disconnect this - // peer and exit. However, if we were - // previously connected to them, then - // we'll make sure to maintain the - // connection alive. - if alreadyConnected { - // Since we succeeded in - // connecting, we won't add this - // peer to the failed nodes map, - // but we will remove it from - // a.pendingConns so that it can - // be retried in the future. - delete(a.pendingConns, nodeID) - a.pendingMtx.Unlock() - return - } - - err = a.cfg.DisconnectPeer( - pub, - ) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - - // Now that we have disconnected, we can - // remove this node from our pending - // conns map, permitting subsequent - // connection attempts. - delete(a.pendingConns, nodeID) - a.pendingMtx.Unlock() - return - } - - // If we were successful, we'll track this peer - // in our set of pending opens. We do this here - // to ensure we don't stall on selecting new - // peers if the connection attempt happens to - // take too long. - nodeID := directive.NodeID - delete(a.pendingConns, nodeID) - a.pendingOpens[nodeID] = Channel{ - Capacity: directive.ChanAmt, - Node: nodeID, - } - a.pendingMtx.Unlock() - - // We can then begin the funding workflow with - // this peer. - err = a.cfg.ChanController.OpenChannel( - pub, directive.ChanAmt, - ) - if err != nil { - log.Warnf("Unable to open "+ - "channel to %x of %v: %v", - pub.SerializeCompressed(), - directive.ChanAmt, err) - - // As the attempt failed, we'll clear - // the peer from the set of pending - // opens and mark them as failed so we - // don't attempt to open a channel to - // them again. - a.pendingMtx.Lock() - delete(a.pendingOpens, nodeID) - a.failedNodes[nodeID] = struct{}{} - a.pendingMtx.Unlock() - - // Trigger the agent to re-evaluate - // everything and possibly retry with a - // different node. - a.OnChannelOpenFailure() - - // Finally, we should also disconnect - // the peer if we weren't already - // connected to them beforehand by an - // external subsystem. - if alreadyConnected { - return - } - - err = a.cfg.DisconnectPeer(pub) - if err != nil { - log.Warnf("Unable to "+ - "disconnect peer "+ - "%x: %v", - pub.SerializeCompressed(), - err) - } - } - - // Since the channel open was successful and is - // currently pending, we'll trigger the - // autopilot agent to query for more peers. - a.OnChannelPendingOpen() - }(chanCandidate) + go a.executeDirective(chanCandidate) } a.pendingMtx.Unlock() - } } + +// executeDirective attempts to connect to the channel candidate specified by +// the given attachment directive, and open a channel of the given size. +// +// NOTE: MUST be run as a goroutine. +func (a *Agent) executeDirective(directive AttachmentDirective) { + defer a.wg.Done() + + // We'll start out by attempting to connect to the peer in order to + // begin the funding workflow. + pub := directive.NodeKey + nodeID := directive.NodeID + alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs) + if err != nil { + log.Warnf("Unable to connect to %x: %v", + pub.SerializeCompressed(), err) + + // Since we failed to connect to them, we'll mark them as + // failed so that we don't attempt to connect to them again. + a.pendingMtx.Lock() + delete(a.pendingConns, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() + + // Finally, we'll trigger the agent to select new peers to + // connect to. + a.OnChannelOpenFailure() + + return + } + + // The connection was successful, though before progressing we must + // check that we have not already met our quota for max pending open + // channels. This can happen if multiple directives were spawned but + // fewer slots were available, and other successful attempts finished + // first. + a.pendingMtx.Lock() + if uint16(len(a.pendingOpens)) >= + a.cfg.Constraints.MaxPendingOpens { + // Since we've reached our max number of pending opens, we'll + // disconnect this peer and exit. However, if we were + // previously connected to them, then we'll make sure to + // maintain the connection alive. + if alreadyConnected { + // Since we succeeded in connecting, we won't add this + // peer to the failed nodes map, but we will remove it + // from a.pendingConns so that it can be retried in the + // future. + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to disconnect peer %x: %v", + pub.SerializeCompressed(), err) + } + + // Now that we have disconnected, we can remove this node from + // our pending conns map, permitting subsequent connection + // attempts. + delete(a.pendingConns, nodeID) + a.pendingMtx.Unlock() + return + } + + // If we were successful, we'll track this peer in our set of pending + // opens. We do this here to ensure we don't stall on selecting new + // peers if the connection attempt happens to take too long. + delete(a.pendingConns, nodeID) + a.pendingOpens[nodeID] = Channel{ + Capacity: directive.ChanAmt, + Node: nodeID, + } + a.pendingMtx.Unlock() + + // We can then begin the funding workflow with this peer. + err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt) + if err != nil { + log.Warnf("Unable to open channel to %x of %v: %v", + pub.SerializeCompressed(), directive.ChanAmt, err) + + // As the attempt failed, we'll clear the peer from the set of + // pending opens and mark them as failed so we don't attempt to + // open a channel to them again. + a.pendingMtx.Lock() + delete(a.pendingOpens, nodeID) + a.failedNodes[nodeID] = struct{}{} + a.pendingMtx.Unlock() + + // Trigger the agent to re-evaluate everything and possibly + // retry with a different node. + a.OnChannelOpenFailure() + + // Finally, we should also disconnect the peer if we weren't + // already connected to them beforehand by an external + // subsystem. + if alreadyConnected { + return + } + + err = a.cfg.DisconnectPeer(pub) + if err != nil { + log.Warnf("Unable to disconnect peer %x: %v", + pub.SerializeCompressed(), err) + } + } + + // Since the channel open was successful and is currently pending, + // we'll trigger the autopilot agent to query for more peers. + a.OnChannelPendingOpen() +} From 89c3c5319f85af1025eb1aaaa08a0790617b5705 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 06/12] autopilot/agent: split opening logic into own method This commit takes the logic after the autopilot agent has decided that it needs to open more channels, and moves it into a new method openChan. --- autopilot/agent.go | 139 ++++++++++++++++++++++++--------------------- 1 file changed, 75 insertions(+), 64 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 42370fd0c..e984c8fde 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,6 +1,7 @@ package autopilot import ( + "fmt" "net" "sync" "sync/atomic" @@ -485,75 +486,85 @@ func (a *Agent) controller() { log.Infof("Triggering attachment directive dispatch, "+ "total_funds=%v", a.totalBalance) - // We're to attempt an attachment so we'll obtain the set of - // nodes that we currently have channels with so we avoid - // duplicate edges. - connectedNodes := a.chanState.ConnectedNodes() - a.pendingMtx.Lock() - nodesToSkip := mergeNodeMaps(a.pendingOpens, - a.pendingConns, connectedNodes, a.failedNodes, - ) - a.pendingMtx.Unlock() - - // If we reach this point, then according to our heuristic we - // should modify our channel state to tend towards what it - // determines to the optimal state. So we'll call Select to get - // a fresh batch of attachment directives, passing in the - // amount of funds available for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, - ) + err := a.openChans(availableFunds, numChans, totalChans) if err != nil { - log.Errorf("Unable to select candidates for "+ - "attachment: %v", err) - continue + log.Errorf("Unable to open channels: %v", err) } - - if len(chanCandidates) == 0 { - log.Infof("No eligible candidates to connect to") - continue - } - - log.Infof("Attempting to execute channel attachment "+ - "directives: %v", spew.Sdump(chanCandidates)) - - // Before proceeding, check to see if we have any slots - // available to open channels. If there are any, we will attempt - // to dispatch the retrieved directives since we can't be - // certain which ones may actually succeed. If too many - // connections succeed, we will they will be ignored and made - // available to future heuristic selections. - a.pendingMtx.Lock() - if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { - a.pendingMtx.Unlock() - log.Debugf("Reached cap of %v pending "+ - "channel opens, will retry "+ - "after success/failure", - a.cfg.Constraints.MaxPendingOpens) - continue - } - - // For each recommended attachment directive, we'll launch a - // new goroutine to attempt to carry out the directive. If any - // of these succeed, then we'll receive a new state update, - // taking us back to the top of our controller loop. - for _, chanCandidate := range chanCandidates { - // Skip candidates which we are already trying - // to establish a connection with. - nodeID := chanCandidate.NodeID - if _, ok := a.pendingConns[nodeID]; ok { - continue - } - a.pendingConns[nodeID] = struct{}{} - - a.wg.Add(1) - go a.executeDirective(chanCandidate) - } - a.pendingMtx.Unlock() } } +// openChans queries the agent's heuristic for a set of channel candidates, and +// attempts to open channels to them. +func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, + totalChans []Channel) error { + + // We're to attempt an attachment so we'll obtain the set of + // nodes that we currently have channels with so we avoid + // duplicate edges. + connectedNodes := a.chanState.ConnectedNodes() + a.pendingMtx.Lock() + nodesToSkip := mergeNodeMaps(a.pendingOpens, + a.pendingConns, connectedNodes, a.failedNodes, + ) + a.pendingMtx.Unlock() + + // If we reach this point, then according to our heuristic we + // should modify our channel state to tend towards what it + // determines to the optimal state. So we'll call Select to get + // a fresh batch of attachment directives, passing in the + // amount of funds available for us to use. + chanCandidates, err := a.cfg.Heuristic.Select( + a.cfg.Self, a.cfg.Graph, availableFunds, + numChans, nodesToSkip, + ) + if err != nil { + return fmt.Errorf("Unable to select candidates for "+ + "attachment: %v", err) + } + + if len(chanCandidates) == 0 { + log.Infof("No eligible candidates to connect to") + return nil + } + + log.Infof("Attempting to execute channel attachment "+ + "directives: %v", spew.Sdump(chanCandidates)) + + // Before proceeding, check to see if we have any slots + // available to open channels. If there are any, we will attempt + // to dispatch the retrieved directives since we can't be + // certain which ones may actually succeed. If too many + // connections succeed, we will they will be ignored and made + // available to future heuristic selections. + a.pendingMtx.Lock() + defer a.pendingMtx.Unlock() + if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens { + log.Debugf("Reached cap of %v pending "+ + "channel opens, will retry "+ + "after success/failure", + a.cfg.Constraints.MaxPendingOpens) + return nil + } + + // For each recommended attachment directive, we'll launch a + // new goroutine to attempt to carry out the directive. If any + // of these succeed, then we'll receive a new state update, + // taking us back to the top of our controller loop. + for _, chanCandidate := range chanCandidates { + // Skip candidates which we are already trying + // to establish a connection with. + nodeID := chanCandidate.NodeID + if _, ok := a.pendingConns[nodeID]; ok { + continue + } + a.pendingConns[nodeID] = struct{}{} + + a.wg.Add(1) + go a.executeDirective(chanCandidate) + } + return nil +} + // executeDirective attempts to connect to the channel candidate specified by // the given attachment directive, and open a channel of the given size. // From 5ecc209c419681139bb4c9e8a839f45ea6a903a0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 6 Dec 2018 13:59:46 +0100 Subject: [PATCH 07/12] autopilot/interface+agent: remove NodeKey from AttachmentDirective Instead parse the pubkey bytes only when needed. --- autopilot/agent.go | 7 ++++++- autopilot/agent_test.go | 5 ----- autopilot/interface.go | 10 ++++------ autopilot/prefattach.go | 14 +++----------- autopilot/prefattach_test.go | 10 +++++----- 5 files changed, 18 insertions(+), 28 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index e984c8fde..f4b25b9ac 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -574,8 +574,13 @@ func (a *Agent) executeDirective(directive AttachmentDirective) { // We'll start out by attempting to connect to the peer in order to // begin the funding workflow. - pub := directive.NodeKey nodeID := directive.NodeID + pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256()) + if err != nil { + log.Errorf("Unable to parse pubkey %x: %v", nodeID, err) + return + } + alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs) if err != nil { log.Warnf("Unable to connect to %x: %v", diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 98b03a571..c5c07de4a 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -325,7 +325,6 @@ func TestAgentChannelFailureSignal(t *testing.T) { // request attachment directives, return a fake so the agent will // attempt to open a channel. var fakeDirective = AttachmentDirective{ - NodeKey: self, NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -669,7 +668,6 @@ func TestAgentImmediateAttach(t *testing.T) { } nodeID := NewNodeID(pub) directives[i] = AttachmentDirective{ - NodeKey: pub, NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -802,7 +800,6 @@ func TestAgentPrivateChannels(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } directives[i] = AttachmentDirective{ - NodeKey: pub, NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -925,7 +922,6 @@ func TestAgentPendingChannelState(t *testing.T) { } nodeID := NewNodeID(nodeKey) nodeDirective := AttachmentDirective{ - NodeKey: nodeKey, NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -1309,7 +1305,6 @@ func TestAgentSkipPendingConns(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeDirective := AttachmentDirective{ - NodeKey: nodeKey, NodeID: NewNodeID(nodeKey), ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ diff --git a/autopilot/interface.go b/autopilot/interface.go index efa992985..35e447e7c 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -85,12 +85,10 @@ type ChannelGraph interface { // AttachmentHeuristic. It details to which node a channel should be created // to, and also the parameters which should be used in the channel creation. type AttachmentDirective struct { - // NodeKey is the target node for this attachment directive. It can be - // identified by its public key, and therefore can be used along with - // a ChannelOpener implementation to execute the directive. - NodeKey *btcec.PublicKey - - // NodeID is the serialized compressed pubkey of the target node. + // NodeID is the serialized compressed pubkey of the target node for + // this attachment directive. It can be identified by its public key, + // and therefore can be used along with a ChannelOpener implementation + // to execute the directive. NodeID NodeID // ChanAmt is the size of the channel that should be opened, expressed diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index 85969992c..d675ac981 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -197,23 +197,15 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph // With the node selected, we'll add this (node, amount) tuple // to out set of recommended directives. pubBytes := selectedNode.PubKey() - pub, err := btcec.ParsePubKey(pubBytes[:], btcec.S256()) - if err != nil { - return nil, err - } + nID := NodeID(pubBytes) directives = append(directives, AttachmentDirective{ - // TODO(roasbeef): need curve? - NodeKey: &btcec.PublicKey{ - X: pub.X, - Y: pub.Y, - }, - NodeID: NewNodeID(pub), + NodeID: nID, Addrs: selectedNode.Addrs(), }) // With the node selected, we'll add it to the set of visited // nodes to avoid attaching to it again. - visited[NodeID(pubBytes)] = struct{}{} + visited[nID] = struct{}{} } numSelectedNodes := int64(len(directives)) diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index 6a6dc44c9..f3e6daf80 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -366,11 +366,11 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { edge2Pub := edge2.Peer.PubKey() switch { - case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]): - case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]): + case bytes.Equal(directive.NodeID[:], edge1Pub[:]): + case bytes.Equal(directive.NodeID[:], edge2Pub[:]): default: t1.Fatalf("attached to unknown node: %x", - directive.NodeKey.SerializeCompressed()) + directive.NodeID[:]) } // As the number of funds available exceed the @@ -666,8 +666,8 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { // We'll simulate a channel update by adding the nodes // we just establish channel with the to set of nodes // to be skipped. - skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{} - skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{} + skipNodes[directives[0].NodeID] = struct{}{} + skipNodes[directives[1].NodeID] = struct{}{} // If we attempt to make a call to the Select function, // without providing any new information, then we From 5e8e54083fb0b7ad1ffa2027e6b9b496aadb4a66 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 08/12] autopilot/prefattach+interface: add API NodeScores This commit adds a new method NodeScores to the AttachementHeuristic interface. Its intended use is to score a set of nodes according to their preference as channel counterparties. The PrefAttach heuristic gets a NodeScores method that will score the ndoes according to their number of already existing channels, similar to what is done already in Select. --- autopilot/agent_test.go | 7 +++ autopilot/interface.go | 22 +++++++++ autopilot/prefattach.go | 106 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index c5c07de4a..0017eb459 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -93,6 +93,13 @@ func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, } } +func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) { + + return nil, nil +} + var _ AttachmentHeuristic = (*mockHeuristic)(nil) type openChanIntent struct { diff --git a/autopilot/interface.go b/autopilot/interface.go index 35e447e7c..492a2ba3f 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -98,6 +98,10 @@ type AttachmentDirective struct { // Addrs is a list of addresses that the target peer may be reachable // at. Addrs []net.Addr + + // Score is the score given by the heuristic for opening a channel of + // the given size to this node. + Score float64 } // AttachmentHeuristic is one of the primary interfaces within this package. @@ -127,6 +131,24 @@ type AttachmentHeuristic interface { Select(self *btcec.PublicKey, graph ChannelGraph, amtToUse btcutil.Amount, numNewChans uint32, skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) + + // NodeScores is a method that given the current channel graph, current + // set of local channels and funds available, scores the given nodes + // according to the preference of opening a channel with them. The + // returned channel candidates maps the NodeID to an attachemnt + // directive containing a score and a channel size. + // + // The scores will be in the range [0, M], where 0 indicates no + // improvement in connectivity if a channel is opened to this node, + // while M is the maximum possible improvement in connectivity. The + // size of M is up to the implementation of this interface, so scores + // must be normalized if compared against other implementations. + // + // NOTE: A NodeID not found in the returned map is implicitly given a + // score of 0. + NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) } // ChannelController is a simple interface that allows an auto-pilot agent to diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index d675ac981..59f89bb26 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" prand "math/rand" + "net" "time" "github.com/btcsuite/btcd/btcec" @@ -249,3 +250,108 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph return nil, fmt.Errorf("err") } } + +// NodeScores is a method that given the current channel graph, current set of +// local channels and funds available, scores the given nodes according the the +// preference of opening a channel with them. +// +// The heuristic employed by this method is one that attempts to promote a +// scale-free network globally, via local attachment preferences for new nodes +// joining the network with an amount of available funds to be allocated to +// channels. Specifically, we consider the degree of each node (and the flow +// in/out of the node available via its open channels) and utilize the +// Barabási–Albert model to drive our recommended attachment heuristics. If +// implemented globally for each new participant, this results in a channel +// graph that is scale-free and follows a power law distribution with k=-3. +// +// The returned scores will be in the range [0.0, 1.0], where higher scores are +// given to nodes already having high connectivity in the graph. +// +// NOTE: This is a part of the AttachmentHeuristic interface. +func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel, + fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( + map[NodeID]*AttachmentDirective, error) { + + // Count the number of channels in the graph. We'll also count the + // number of channels as we go for the nodes we are interested in, and + // record their addresses found in the db. + var graphChans int + nodeChanNum := make(map[NodeID]int) + addresses := make(map[NodeID][]net.Addr) + if err := g.ForEachNode(func(n Node) error { + var nodeChans int + err := n.ForEachChannel(func(_ ChannelEdge) error { + nodeChans++ + graphChans++ + return nil + }) + if err != nil { + return err + } + + // If this node is not among our nodes to score, we can return + // early. + nID := NodeID(n.PubKey()) + if _, ok := nodes[nID]; !ok { + return nil + } + + // Otherwise we'll record the number of channels, and also + // populate the address in our channel candidates map. + nodeChanNum[nID] = nodeChans + addresses[nID] = n.Addrs() + + return nil + }); err != nil { + return nil, err + } + + // If there are no channels in the graph we cannot determine any + // preferences, so we return, indicating all candidates get a score of + // zero. + if graphChans == 0 { + return nil, nil + } + + existingPeers := make(map[NodeID]struct{}) + for _, c := range chans { + existingPeers[c.Node] = struct{}{} + } + + // For each node in the set of nodes, count their fraction of channels + // in the graph, and use that as the score. + candidates := make(map[NodeID]*AttachmentDirective) + for nID, nodeChans := range nodeChanNum { + // As channel size we'll use the maximum channel size available. + chanSize := p.constraints.MaxChanSize + if fundsAvailable-chanSize < 0 { + chanSize = fundsAvailable + } + + _, ok := existingPeers[nID] + + switch { + + // If the node is among or existing channel peers, we don't + // need another channel. + case ok: + continue + + // If the amount is too small, we don't want to attempt opening + // another channel. + case chanSize == 0 || chanSize < p.constraints.MinChanSize: + continue + } + + // Otherwise we score the node according to its fraction of + // channels in the graph. + score := float64(nodeChans) / float64(graphChans) + candidates[nID] = &AttachmentDirective{ + NodeID: nID, + ChanAmt: chanSize, + Score: score, + } + } + + return candidates, nil +} From be45697c6d1bbc344bd577e959bbdb9da587f575 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 09/12] autopilot/prefattach_test: use NodeScores API for prefAttach tests This commit converts the existing unit tests of Select into tests of NodeScores. --- autopilot/prefattach_test.go | 371 ++++++++++++++++++++++------------- 1 file changed, 235 insertions(+), 136 deletions(-) diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index f3e6daf80..adaee08b7 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -230,10 +230,8 @@ var chanGraphs = []struct { }, } -// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed en -// empty graph, the Select function always detects the state, and returns nil. -// Otherwise, it would be possible for the main Select loop to entire an -// infinite loop. +// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed an +// empty graph, the NodeSores function always returns a score of 0. func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { const ( minChanSize = 0 @@ -249,15 +247,18 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { Allocation: threshold, } - // First, we'll generate a random key that represents "us", and create - // a new instance of the heuristic with our set parameters. - self, err := randKey() - if err != nil { - t.Fatalf("unable to generate self key: %v", err) - } prefAttach := NewConstrainedPrefAttachment(constraints) - skipNodes := make(map[NodeID]struct{}) + // Create a random public key, which we will query to get a score for. + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + + nodes := map[NodeID]struct{}{ + NewNodeID(pub): {}, + } + for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -268,23 +269,21 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { defer cleanup() } - // With the necessary state initialized, we'll not - // attempt to select a set of candidates channel for - // creation given the current state of the graph. + // With the necessary state initialized, we'll now + // attempt to get the score for this one node. const walletFunds = btcutil.SatoshiPerBitcoin - directives, err := prefAttach.Select(self, graph, - walletFunds, 5, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + walletFunds, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // We shouldn't have selected any new directives as we - // started with an empty graph. - if len(directives) != 0 { - t1.Fatalf("zero attachment directives "+ - "should have been returned instead %v were", - len(directives)) + // Since the graph is empty, we expect the score to be + // 0, giving an empty return map. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { @@ -293,9 +292,50 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) { } } +// completeGraph is a helper method that adds numNodes fully connected nodes to +// the graph. +func completeGraph(t *testing.T, g testGraph, numNodes int) { + const chanCapacity = btcutil.SatoshiPerBitcoin + nodes := make(map[int]*btcec.PublicKey) + for i := 0; i < numNodes; i++ { + for j := i + 1; j < numNodes; j++ { + + node1 := nodes[i] + node2 := nodes[j] + edge1, edge2, err := g.addRandChannel( + node1, node2, chanCapacity) + if err != nil { + t.Fatalf("unable to generate channel: %v", err) + } + + if node1 == nil { + pubKeyBytes := edge1.Peer.PubKey() + nodes[i], err = btcec.ParsePubKey( + pubKeyBytes[:], btcec.S256(), + ) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", + err) + } + } + + if node2 == nil { + pubKeyBytes := edge2.Peer.PubKey() + nodes[j], err = btcec.ParsePubKey( + pubKeyBytes[:], btcec.S256(), + ) + if err != nil { + t.Fatalf("unable to parse pubkey: %v", + err) + } + } + } + } +} + // TestConstrainedPrefAttachmentSelectTwoVertexes ensures that when passed a -// graph with only two eligible vertexes, then both are selected (without any -// repeats), and the funds are appropriately allocated across each peer. +// graph with only two eligible vertexes, then both are given the same score, +// and the funds are appropriately allocated across each peer. func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { t.Parallel() @@ -314,7 +354,6 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { ChanLimit: chanLimit, Allocation: threshold, } - skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -325,13 +364,6 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } prefAttach := NewConstrainedPrefAttachment(constraints) // For this set, we'll load the memory graph with two @@ -342,43 +374,67 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { t1.Fatalf("unable to generate channel: %v", err) } - // With the necessary state initialized, we'll not - // attempt to select a set of candidates channel for - // creation given the current state of the graph. + // Get the score for all nodes found in the graph at + // this point. + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + if len(nodes) != 2 { + t1.Fatalf("expected 2 nodes, found %d", len(nodes)) + } + + // With the necessary state initialized, we'll now + // attempt to get our candidates channel score given + // the current state of the graph. const walletFunds = btcutil.SatoshiPerBitcoin * 10 - directives, err := prefAttach.Select(self, graph, - walletFunds, 2, skipNodes) + candidates, err := prefAttach.NodeScores(graph, nil, + walletFunds, nodes) if err != nil { - t1.Fatalf("unable to select attachment directives: %v", err) + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) } - // Two new directives should have been selected, one - // for each node already present within the graph. - if len(directives) != 2 { - t1.Fatalf("two attachment directives should have been "+ - "returned instead %v were", len(directives)) + if len(candidates) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(candidates)) } - // The node attached to should be amongst the two edges + // The candidates should be amongst the two edges // created above. - for _, directive := range directives { + for nodeID, candidate := range candidates { edge1Pub := edge1.Peer.PubKey() edge2Pub := edge2.Peer.PubKey() switch { - case bytes.Equal(directive.NodeID[:], edge1Pub[:]): - case bytes.Equal(directive.NodeID[:], edge2Pub[:]): + case bytes.Equal(nodeID[:], edge1Pub[:]): + case bytes.Equal(nodeID[:], edge2Pub[:]): default: t1.Fatalf("attached to unknown node: %x", - directive.NodeID[:]) + nodeID[:]) } // As the number of funds available exceed the // max channel size, both edges should consume // the maximum channel size. - if directive.ChanAmt != maxChanSize { - t1.Fatalf("max channel size should be allocated, "+ - "instead %v was: ", maxChanSize) + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("max channel size should be "+ + "allocated, instead %v was: ", + maxChanSize) + } + + // Since each of the nodes has 1 channel, out + // of only one channel in the graph, we expect + // their score to be 0.5. + expScore := float64(0.5) + if candidate.Score != expScore { + t1.Fatalf("expected candidate score "+ + "to be %v, instead was %v", + expScore, candidate.Score) } } }) @@ -410,7 +466,6 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { Allocation: threshold, } - skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -421,28 +476,36 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } + // Add 10 nodes to the graph, with channels between + // them. + completeGraph(t, graph, 10) + prefAttach := NewConstrainedPrefAttachment(constraints) - // Next, we'll attempt to select a set of candidates, + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + // With the necessary state initialized, we'll now + // attempt to get the score for our list of nodes, // passing zero for the amount of wallet funds. This - // should return an empty slice of directives. - directives, err := prefAttach.Select(self, graph, 0, - 0, skipNodes) + // should return an all-zero score set. + scores, err := prefAttach.NodeScores(graph, nil, + 0, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - if len(directives) != 0 { - t1.Fatalf("zero attachment directives "+ - "should have been returned instead %v were", - len(directives)) + + // Since all should be given a score of 0, the map + // should be empty. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { @@ -452,9 +515,8 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) { } // TestConstrainedPrefAttachmentSelectGreedyAllocation tests that if upon -// deciding a set of candidates, we're unable to evenly split our funds, then -// we attempt to greedily allocate all funds to each selected vertex (up to the -// max channel size). +// returning node scores, the NodeScores method will attempt to greedily +// allocate all funds to each vertex (up to the max channel size). func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { t.Parallel() @@ -474,7 +536,6 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { Allocation: threshold, } - skipNodes := make(map[NodeID]struct{}) for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { graph, cleanup, err := graph.genFunc() @@ -485,13 +546,6 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } prefAttach := NewConstrainedPrefAttachment(constraints) const chanCapacity = btcutil.SatoshiPerBitcoin @@ -521,9 +575,10 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { // graph, with node node having two edges. numNodes := 0 twoChans := false + nodes := make(map[NodeID]struct{}) if err := graph.ForEachNode(func(n Node) error { numNodes++ - + nodes[n.PubKey()] = struct{}{} numChans := 0 err := n.ForEachChannel(func(c ChannelEdge) error { numChans++ @@ -553,38 +608,61 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { // result, the heuristic should try to greedily // allocate funds to channels. const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 - directives, err := prefAttach.Select(self, graph, - availableBalance, 5, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // Three directives should have been returned. - if len(directives) != 3 { - t1.Fatalf("expected 3 directives, instead "+ - "got: %v", len(directives)) + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) } - // The two directive should have the max channel amount - // allocated. - if directives[0].ChanAmt != maxChanSize { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[0].ChanAmt) - } - if directives[1].ChanAmt != maxChanSize { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[1].ChanAmt) + // The candidates should have a non-zero score, and + // have the max chan size funds recommended channel + // size. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + maxChanSize, candidate.ChanAmt) + } } - // The third channel should have been allocated the - // remainder, or 0.5 BTC. - if directives[2].ChanAmt != (btcutil.SatoshiPerBitcoin * 0.5) { - t1.Fatalf("expected recommendation of %v, "+ - "instead got %v", maxChanSize, - directives[2].ChanAmt) + // Imagine a few channels are being opened, and there's + // only 0.5 BTC left. That should leave us with channel + // candidates of that size. + const remBalance = btcutil.SatoshiPerBitcoin * 0.5 + scores, err = prefAttach.NodeScores(graph, nil, + remBalance, nodes) + if err != nil { + t1.Fatalf("unable to select attachment "+ + "directives: %v", err) + } + + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) + } + + // Check that the recommended channel sizes are now the + // remaining channel balance. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != remBalance { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + remBalance, candidate.ChanAmt) + } } }) if !success { @@ -594,8 +672,8 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) { } // TestConstrainedPrefAttachmentSelectSkipNodes ensures that if a node was -// already select for attachment, then that node is excluded from the set of -// candidate nodes. +// already selected as a channel counterparty, then that node will get a score +// of zero during scoring. func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { t.Parallel() @@ -617,8 +695,6 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { for _, graph := range chanGraphs { success := t.Run(graph.name, func(t1 *testing.T) { - skipNodes := make(map[NodeID]struct{}) - graph, cleanup, err := graph.genFunc() if err != nil { t1.Fatalf("unable to create graph: %v", err) @@ -627,13 +703,6 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { defer cleanup() } - // First, we'll generate a random key that represents - // "us", and create a new instance of the heuristic - // with our set parameters. - self, err := randKey() - if err != nil { - t1.Fatalf("unable to generate self key: %v", err) - } prefAttach := NewConstrainedPrefAttachment(constraints) // Next, we'll create a simple topology of two nodes, @@ -645,44 +714,74 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { t1.Fatalf("unable to create channel: %v", err) } - // With our graph created, we'll now execute the Select - // function to recommend potential attachment - // candidates. + nodes := make(map[NodeID]struct{}) + if err := graph.ForEachNode(func(n Node) error { + nodes[n.PubKey()] = struct{}{} + return nil + }); err != nil { + t1.Fatalf("unable to traverse graph: %v", err) + } + + if len(nodes) != 2 { + t1.Fatalf("expected 2 nodes, found %d", len(nodes)) + } + + // With our graph created, we'll now get the scores for + // all nodes in the graph. const availableBalance = btcutil.SatoshiPerBitcoin * 2.5 - directives, err := prefAttach.Select(self, graph, - availableBalance, 2, skipNodes) + scores, err := prefAttach.NodeScores(graph, nil, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - // As the channel limit is three, and two nodes are - // present in the graph, both should be selected. - if len(directives) != 2 { - t1.Fatalf("expected two directives, instead "+ - "got %v", len(directives)) + if len(scores) != len(nodes) { + t1.Fatalf("all nodes should be scored, "+ + "instead %v were", len(scores)) + } + + // THey should all have a score, and a maxChanSize + // channel size recommendation. + for _, candidate := range scores { + if candidate.Score == 0 { + t1.Fatalf("Expected non-zero score") + } + + if candidate.ChanAmt != maxChanSize { + t1.Fatalf("expected recommendation "+ + "of %v, instead got %v", + maxChanSize, candidate.ChanAmt) + } } // We'll simulate a channel update by adding the nodes - // we just establish channel with the to set of nodes - // to be skipped. - skipNodes[directives[0].NodeID] = struct{}{} - skipNodes[directives[1].NodeID] = struct{}{} + // to our set of channels. + var chans []Channel + for _, candidate := range scores { + chans = append(chans, + Channel{ + Node: candidate.NodeID, + }, + ) + } - // If we attempt to make a call to the Select function, - // without providing any new information, then we - // should get no new directives as both nodes has - // already been attached to. - directives, err = prefAttach.Select(self, graph, - availableBalance, 2, skipNodes) + // If we attempt to make a call to the NodeScores + // function, without providing any new information, + // then all nodes should have a score of zero, since we + // already got channels to them. + scores, err = prefAttach.NodeScores(graph, chans, + availableBalance, nodes) if err != nil { t1.Fatalf("unable to select attachment "+ "directives: %v", err) } - if len(directives) != 0 { - t1.Fatalf("zero new directives should have been "+ - "selected, but %v were", len(directives)) + // Since all should be given a score of 0, the map + // should be empty. + if len(scores) != 0 { + t1.Fatalf("expected empty score map, "+ + "instead got %v ", len(scores)) } }) if !success { From e84bd298369bd2d983fea8b0e9af106dd482c588 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 10/12] autopilot/agent: add weightedChoice and chooseN algorithm The algorithms will be used to select nodes at random from the weighted distribution set by the node's scores given by the heuristic. --- autopilot/agent.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/autopilot/agent.go b/autopilot/agent.go index f4b25b9ac..26859a7f3 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -2,9 +2,11 @@ package autopilot import ( "fmt" + "math/rand" "net" "sync" "sync/atomic" + "time" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcutil" @@ -197,6 +199,7 @@ func (a *Agent) Start() error { return nil } + rand.Seed(time.Now().Unix()) log.Infof("Autopilot Agent starting") a.wg.Add(1) @@ -362,6 +365,73 @@ func mergeChanState(pendingChans map[NodeID]Channel, return totalChans } +// weightedChoice draws a random index from the map of channel candidates, with +// a probability propotional to their score. +func weightedChoice(s map[NodeID]*AttachmentDirective) (NodeID, error) { + // Calculate the sum of scores found in the map. + var sum float64 + for _, v := range s { + sum += v.Score + } + + if sum <= 0 { + return NodeID{}, fmt.Errorf("non-positive sum") + } + + // Create a map of normalized scores such, that they sum to 1.0. + norm := make(map[NodeID]float64) + for k, v := range s { + norm[k] = v.Score / sum + } + + // Pick a random number in the range [0.0, 1.0), and iterate the map + // until the number goes below 0. This means that each index is picked + // with a probablity equal to their normalized score. + // + // Example: + // Items with scores [1, 5, 2, 2] + // Normalized scores [0.1, 0.5, 0.2, 0.2] + // Imagine they each occupy a "range" equal to their normalized score + // in [0, 1.0]: + // [|-0.1-||-----0.5-----||--0.2--||--0.2--|] + // The following loop is now equivalent to "hitting" the intervals. + r := rand.Float64() + for k, v := range norm { + r -= v + if r <= 0 { + return k, nil + } + } + return NodeID{}, fmt.Errorf("no choice made") +} + +// chooseN picks at random min[n, len(s)] nodes if from the +// AttachmentDirectives map, with a probability weighted by their score. +func chooseN(n int, s map[NodeID]*AttachmentDirective) ( + map[NodeID]*AttachmentDirective, error) { + + // Keep a map of nodes not yet choosen. + rem := make(map[NodeID]*AttachmentDirective) + for k, v := range s { + rem[k] = v + } + + // Pick a weighted choice from the remaining nodes as long as there are + // nodes left, and we haven't already picked n. + chosen := make(map[NodeID]*AttachmentDirective) + for len(chosen) < n && len(rem) > 0 { + choice, err := weightedChoice(rem) + if err != nil { + return nil, err + } + + chosen[choice] = rem[choice] + delete(rem, choice) + } + + return chosen, nil +} + // controller implements the closed-loop control system of the Agent. The // controller will make a decision w.r.t channel placement within the graph // based on: its current internal state of the set of active channels open, From b3d315298c6c7307a76e9c2638baf61fdd65eee5 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 11/12] autopilot/agent: use NodeScores to select channel candidates This commit makes the autopilot agent use the new NodeScores heuristic API to select channel candiates, instead of the Select API. The result will be similar, but instead of selecting a set of nodes to open channels to, we get a score based results which can later be used together with other heuristics to choose nodes to open channels to. This commit also makes the existing autopilot agent tests compatible with the new NodeScores API. --- autopilot/agent.go | 54 ++++++++++--- autopilot/agent_test.go | 163 ++++++++++++++++++++++------------------ 2 files changed, 133 insertions(+), 84 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 26859a7f3..be6480c4e 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -1,6 +1,7 @@ package autopilot import ( + "bytes" "fmt" "math/rand" "net" @@ -578,18 +579,49 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, ) a.pendingMtx.Unlock() - // If we reach this point, then according to our heuristic we - // should modify our channel state to tend towards what it - // determines to the optimal state. So we'll call Select to get - // a fresh batch of attachment directives, passing in the - // amount of funds available for us to use. - chanCandidates, err := a.cfg.Heuristic.Select( - a.cfg.Self, a.cfg.Graph, availableFunds, - numChans, nodesToSkip, + // Gather the set of all nodes in the graph, except those we + // want to skip. + selfPubBytes := a.cfg.Self.SerializeCompressed() + nodes := make(map[NodeID]struct{}) + if err := a.cfg.Graph.ForEachNode(func(node Node) error { + nID := NodeID(node.PubKey()) + + // If we come across ourselves, them we'll continue in + // order to avoid attempting to make a channel with + // ourselves. + if bytes.Equal(nID[:], selfPubBytes) { + return nil + } + + // Additionally, if this node is in the blacklist, then + // we'll skip it. + if _, ok := nodesToSkip[nID]; ok { + return nil + } + + nodes[nID] = struct{}{} + return nil + }); err != nil { + return fmt.Errorf("unable to get graph nodes: %v", err) + } + + // Use the heuristic to calculate a score for each node in the + // graph. + scores, err := a.cfg.Heuristic.NodeScores( + a.cfg.Graph, totalChans, availableFunds, nodes, ) if err != nil { - return fmt.Errorf("Unable to select candidates for "+ - "attachment: %v", err) + return fmt.Errorf("unable to calculate node scores : %v", err) + } + + log.Debugf("Got scores for %d nodes", len(scores)) + + // Now use the score to make a weighted choice which + // nodes to attempt to open channels to. + chanCandidates, err := chooseN(int(numChans), scores) + if err != nil { + return fmt.Errorf("Unable to make weighted choice: %v", + err) } if len(chanCandidates) == 0 { @@ -630,7 +662,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32, a.pendingConns[nodeID] = struct{}{} a.wg.Add(1) - go a.executeDirective(chanCandidate) + go a.executeDirective(*chanCandidate) } return nil } diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 0017eb459..9cf8aae28 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -29,8 +29,8 @@ type mockHeuristic struct { moreChansResps chan moreChansResp moreChanArgs chan moreChanArg - directiveResps chan []AttachmentDirective - directiveArgs chan directiveArg + nodeScoresResps chan map[NodeID]*AttachmentDirective + nodeScoresArgs chan directiveArg quit chan struct{} } @@ -60,44 +60,43 @@ func (m *mockHeuristic) NeedMoreChans(chans []Channel, } type directiveArg struct { - self *btcec.PublicKey graph ChannelGraph amt btcutil.Amount - skip map[NodeID]struct{} + chans []Channel + nodes map[NodeID]struct{} } func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, amtToUse btcutil.Amount, numChans uint32, skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { - - if m.directiveArgs != nil { - directive := directiveArg{ - self: self, - graph: graph, - amt: amtToUse, - skip: skipChans, - } - - select { - case m.directiveArgs <- directive: - case <-m.quit: - return nil, errors.New("exiting") - } - } - - select { - case resp := <-m.directiveResps: - return resp, nil - case <-m.quit: - return nil, errors.New("exiting") - } + return nil, nil } func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( map[NodeID]*AttachmentDirective, error) { - return nil, nil + if m.nodeScoresArgs != nil { + directive := directiveArg{ + graph: g, + amt: fundsAvailable, + chans: chans, + nodes: nodes, + } + + select { + case m.nodeScoresArgs <- directive: + case <-m.quit: + return nil, errors.New("exiting") + } + } + + select { + case resp := <-m.nodeScoresResps: + return resp, nil + case <-m.quit: + return nil, errors.New("exiting") + } } var _ AttachmentHeuristic = (*mockHeuristic)(nil) @@ -151,8 +150,8 @@ func TestAgentChannelOpenSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent, 10), @@ -233,7 +232,7 @@ func TestAgentChannelOpenSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -276,8 +275,8 @@ func TestAgentChannelFailureSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockFailingChanController{} memGraph, _, _ := newMemChanGraph() @@ -331,7 +330,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { // At this point, the agent should now be querying the heuristic to // request attachment directives, return a fake so the agent will // attempt to open a channel. - var fakeDirective = AttachmentDirective{ + var fakeDirective = &AttachmentDirective{ NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -339,10 +338,13 @@ func TestAgentChannelFailureSignal(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } select { - case heuristic.directiveResps <- []AttachmentDirective{fakeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(self): fakeDirective, + }: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -357,7 +359,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { } select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatal("heuristic wasn't queried in time") } @@ -376,8 +378,8 @@ func TestAgentChannelCloseSignal(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -465,7 +467,7 @@ func TestAgentChannelCloseSignal(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -486,8 +488,8 @@ func TestAgentBalanceUpdate(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -576,7 +578,7 @@ func TestAgentBalanceUpdate(t *testing.T) { // If this send success, then Select was erroneously called and the // test should be failed. - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") // This is the correct path as Select should've be called. @@ -596,8 +598,8 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -666,7 +668,7 @@ func TestAgentImmediateAttach(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { pub, err := randKey() @@ -674,7 +676,7 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(pub) - directives[i] = AttachmentDirective{ + directives[nodeID] = &AttachmentDirective{ NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -682,6 +684,7 @@ func TestAgentImmediateAttach(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } nodeKeys[nodeID] = struct{}{} } @@ -689,7 +692,7 @@ func TestAgentImmediateAttach(t *testing.T) { // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -711,6 +714,7 @@ func TestAgentImmediateAttach(t *testing.T) { nodeID) } delete(nodeKeys, nodeID) + case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -729,8 +733,8 @@ func TestAgentPrivateChannels(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } // The chanController should be initialized such that all of its open // channel requests are for private channels. @@ -800,13 +804,13 @@ func TestAgentPrivateChannels(t *testing.T) { // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. - directives := make([]AttachmentDirective, numChans) + directives := make(map[NodeID]*AttachmentDirective) for i := 0; i < numChans; i++ { pub, err := randKey() if err != nil { t.Fatalf("unable to generate key: %v", err) } - directives[i] = AttachmentDirective{ + directives[NewNodeID(pub)] = &AttachmentDirective{ NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -814,13 +818,14 @@ func TestAgentPrivateChannels(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } } // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. select { - case heuristic.directiveResps <- directives: + case heuristic.nodeScoresResps <- directives: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -852,8 +857,8 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -928,7 +933,7 @@ func TestAgentPendingChannelState(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeID := NewNodeID(nodeKey) - nodeDirective := AttachmentDirective{ + nodeDirective := &AttachmentDirective{ NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -936,14 +941,18 @@ func TestAgentPendingChannelState(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + nodeID: nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } - heuristic.directiveArgs = make(chan directiveArg) + heuristic.nodeScoresArgs = make(chan directiveArg) // A request to open the channel should've also been sent. select { @@ -1006,12 +1015,12 @@ func TestAgentPendingChannelState(t *testing.T) { // Select method. The arguments passed should reflect the fact that the // node we have a pending channel to, should be ignored. select { - case req := <-heuristic.directiveArgs: - if len(req.skip) == 0 { + case req := <-heuristic.nodeScoresArgs: + if len(req.chans) == 0 { t.Fatalf("expected to skip %v nodes, instead "+ - "skipping %v", 1, len(req.skip)) + "skipping %v", 1, len(req.chans)) } - if _, ok := req.skip[nodeID]; !ok { + if req.chans[0].Node != nodeID { t.Fatalf("pending node not included in skip arguments") } case <-time.After(time.Second * 10): @@ -1032,8 +1041,8 @@ func TestAgentPendingOpenChannel(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1096,7 +1105,7 @@ func TestAgentPendingOpenChannel(t *testing.T) { // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: t.Fatalf("Select was called but shouldn't have been") default: } @@ -1117,8 +1126,8 @@ func TestAgentOnNodeUpdates(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1174,7 +1183,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1200,7 +1209,7 @@ func TestAgentOnNodeUpdates(t *testing.T) { // It's not important that this list is also empty, so long as the node // updates signal is causing the agent to make this attempt. select { - case heuristic.directiveResps <- []AttachmentDirective{}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}: case <-time.After(time.Second * 10): t.Fatalf("Select was not called but should have been") } @@ -1222,8 +1231,8 @@ func TestAgentSkipPendingConns(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } heuristic := &mockHeuristic{ - moreChansResps: make(chan moreChansResp), - directiveResps: make(chan []AttachmentDirective), + moreChansResps: make(chan moreChansResp), + nodeScoresResps: make(chan map[NodeID]*AttachmentDirective), } chanController := &mockChanController{ openChanSignals: make(chan openChanIntent), @@ -1311,7 +1320,7 @@ func TestAgentSkipPendingConns(t *testing.T) { if err != nil { t.Fatalf("unable to generate key: %v", err) } - nodeDirective := AttachmentDirective{ + nodeDirective := &AttachmentDirective{ NodeID: NewNodeID(nodeKey), ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -1319,9 +1328,13 @@ func TestAgentSkipPendingConns(t *testing.T) { IP: bytes.Repeat([]byte("a"), 16), }, }, + Score: 0.5, } + select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1349,7 +1362,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } @@ -1386,7 +1401,9 @@ func TestAgentSkipPendingConns(t *testing.T) { // Send a directive for the same node, which already has a pending conn. select { - case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{ + NewNodeID(nodeKey): nodeDirective, + }: case <-time.After(time.Second * 10): t.Fatalf("heuristic wasn't queried in time") } From 6130189d95fa5cd396f58b84012ca1d2f35c3e6e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 22 Nov 2018 23:18:09 +0100 Subject: [PATCH 12/12] autopilot/interface+agent: remove Select --- autopilot/agent_test.go | 6 -- autopilot/interface.go | 11 --- autopilot/prefattach.go | 181 ---------------------------------------- 3 files changed, 198 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 9cf8aae28..e3aee2642 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -66,12 +66,6 @@ type directiveArg struct { nodes map[NodeID]struct{} } -func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph, - amtToUse btcutil.Amount, numChans uint32, - skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) { - return nil, nil -} - func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel, fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) ( map[NodeID]*AttachmentDirective, error) { diff --git a/autopilot/interface.go b/autopilot/interface.go index 492a2ba3f..e0d0cb90d 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -121,17 +121,6 @@ type AttachmentHeuristic interface { // ideal state. NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, uint32, bool) - // Select is a method that given the current state of the channel - // graph, a set of nodes to ignore, and an amount of available funds, - // should return a set of attachment directives which describe which - // additional channels should be opened within the graph to push the - // heuristic back towards its equilibrium state. The numNewChans - // argument represents the additional number of channels that should be - // open. - Select(self *btcec.PublicKey, graph ChannelGraph, - amtToUse btcutil.Amount, numNewChans uint32, - skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) - // NodeScores is a method that given the current channel graph, current // set of local channels and funds available, scores the given nodes // according to the preference of opening a channel with them. The diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index 59f89bb26..d90437b89 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -1,8 +1,6 @@ package autopilot import ( - "bytes" - "fmt" prand "math/rand" "net" "time" @@ -72,185 +70,6 @@ func NewNodeID(pub *btcec.PublicKey) NodeID { return n } -// shuffleCandidates shuffles the set of candidate nodes for preferential -// attachment in order to break any ordering already enforced by the sorted -// order of the public key for each node. To shuffle the set of candidates, we -// use a version of the Fisher–Yates shuffle algorithm. -func shuffleCandidates(candidates []Node) []Node { - shuffledNodes := make([]Node, len(candidates)) - perm := prand.Perm(len(candidates)) - - for i, v := range perm { - shuffledNodes[v] = candidates[i] - } - - return shuffledNodes -} - -// Select returns a candidate set of attachment directives that should be -// executed based on the current internal state, the state of the channel -// graph, the set of nodes we should exclude, and the amount of funds -// available. The heuristic employed by this method is one that attempts to -// promote a scale-free network globally, via local attachment preferences for -// new nodes joining the network with an amount of available funds to be -// allocated to channels. Specifically, we consider the degree of each node -// (and the flow in/out of the node available via its open channels) and -// utilize the Barabási–Albert model to drive our recommended attachment -// heuristics. If implemented globally for each new participant, this results -// in a channel graph that is scale-free and follows a power law distribution -// with k=-3. -// -// NOTE: This is a part of the AttachmentHeuristic interface. -func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph, - fundsAvailable btcutil.Amount, numNewChans uint32, - skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) { - - // TODO(roasbeef): rename? - - var directives []AttachmentDirective - - if fundsAvailable < p.constraints.MinChanSize { - return directives, nil - } - - selfPubBytes := self.SerializeCompressed() - - // We'll continue our attachment loop until we've exhausted the current - // amount of available funds. - visited := make(map[NodeID]struct{}) - for i := uint32(0); i < numNewChans; i++ { - // selectionSlice will be used to randomly select a node - // according to a power law distribution. For each connected - // edge, we'll add an instance of the node to this slice. Thus, - // for a given node, the probability that we'll attach to it - // is: k_i / sum(k_j), where k_i is the degree of the target - // node, and k_j is the degree of all other nodes i != j. This - // implements the classic Barabási–Albert model for - // preferential attachment. - var selectionSlice []Node - - // For each node, and each channel that the node has, we'll add - // an instance of that node to the selection slice above. - // This'll slice where the frequency of each node is equivalent - // to the number of channels that connect to it. - // - // TODO(roasbeef): add noise to make adversarially resistant? - if err := g.ForEachNode(func(node Node) error { - nID := NodeID(node.PubKey()) - - // Once a node has already been attached to, we'll - // ensure that it isn't factored into any further - // decisions within this round. - if _, ok := visited[nID]; ok { - return nil - } - - // If we come across ourselves, them we'll continue in - // order to avoid attempting to make a channel with - // ourselves. - if bytes.Equal(nID[:], selfPubBytes) { - return nil - } - - // Additionally, if this node is in the blacklist, then - // we'll skip it. - if _, ok := skipNodes[nID]; ok { - return nil - } - - // For initial bootstrap purposes, if a node doesn't - // have any channels, then we'll ensure that it has at - // least one item in the selection slice. - // - // TODO(roasbeef): make conditional? - selectionSlice = append(selectionSlice, node) - - // For each active channel the node has, we'll add an - // additional channel to the selection slice to - // increase their weight. - if err := node.ForEachChannel(func(channel ChannelEdge) error { - selectionSlice = append(selectionSlice, node) - return nil - }); err != nil { - return err - } - - return nil - }); err != nil { - return nil, err - } - - // If no nodes at all were accumulated, then we'll exit early - // as there are no eligible candidates. - if len(selectionSlice) == 0 { - break - } - - // Given our selection slice, we'll now generate a random index - // into this slice. The node we select will be recommended by - // us to create a channel to. - candidates := shuffleCandidates(selectionSlice) - selectedIndex := prand.Int31n(int32(len(candidates))) - selectedNode := candidates[selectedIndex] - - // TODO(roasbeef): cap on num channels to same participant? - - // With the node selected, we'll add this (node, amount) tuple - // to out set of recommended directives. - pubBytes := selectedNode.PubKey() - nID := NodeID(pubBytes) - directives = append(directives, AttachmentDirective{ - NodeID: nID, - Addrs: selectedNode.Addrs(), - }) - - // With the node selected, we'll add it to the set of visited - // nodes to avoid attaching to it again. - visited[nID] = struct{}{} - } - - numSelectedNodes := int64(len(directives)) - switch { - // If we have enough available funds to distribute the maximum channel - // size for each of the selected peers to attach to, then we'll - // allocate the maximum amount to each peer. - case int64(fundsAvailable) >= numSelectedNodes*int64(p.constraints.MaxChanSize): - for i := 0; i < int(numSelectedNodes); i++ { - directives[i].ChanAmt = p.constraints.MaxChanSize - } - - return directives, nil - - // Otherwise, we'll greedily allocate our funds to the channels - // successively until we run out of available funds, or can't create a - // channel above the min channel size. - case int64(fundsAvailable) < numSelectedNodes*int64(p.constraints.MaxChanSize): - i := 0 - for fundsAvailable > p.constraints.MinChanSize { - // We'll attempt to allocate the max channel size - // initially. If we don't have enough funds to do this, - // then we'll allocate the remainder of the funds - // available to the channel. - delta := p.constraints.MaxChanSize - if fundsAvailable-delta < 0 { - delta = fundsAvailable - } - - directives[i].ChanAmt = delta - - fundsAvailable -= delta - i++ - } - - // We'll slice the initial set of directives to properly - // reflect the amount of funds we were able to allocate. - return directives[:i:i], nil - - default: - return nil, fmt.Errorf("err") - } -} - // NodeScores is a method that given the current channel graph, current set of // local channels and funds available, scores the given nodes according the the // preference of opening a channel with them.