Merge pull request #2006 from halseth/autpilot-score-attachement

Scoring based autopilot attachment heuristic
This commit is contained in:
Olaoluwa Osuntokun 2018-12-06 16:59:12 -08:00 committed by GitHub
commit c071a17ac3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 959 additions and 721 deletions

View file

@ -1,9 +1,13 @@
package autopilot
import (
"bytes"
"fmt"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcutil"
@ -51,11 +55,9 @@ type Config struct {
// within the graph.
Graph ChannelGraph
// MaxPendingOpens is the maximum number of pending channel
// establishment goroutines that can be lingering. We cap this value in
// order to control the level of parallelism caused by the autopilot
// agent.
MaxPendingOpens uint16
// Constraints is the set of constraints the autopilot must adhere to
// when opening channels.
Constraints *HeuristicConstraints
// TODO(roasbeef): add additional signals from fee rates and revenue of
// currently opened channels
@ -145,6 +147,22 @@ type Agent struct {
// when the agent receives external balance update signals.
totalBalance btcutil.Amount
// failedNodes lists nodes that we've previously attempted to initiate
// channels with, but didn't succeed.
failedNodes map[NodeID]struct{}
// pendingConns tracks the nodes that we are attempting to make
// connections to. This prevents us from making duplicate connection
// requests to the same node.
pendingConns map[NodeID]struct{}
// pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted
// channel limit, or open multiple channels to the same node.
pendingOpens map[NodeID]Channel
pendingMtx sync.Mutex
quit chan struct{}
wg sync.WaitGroup
}
@ -163,6 +181,9 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
nodeUpdates: make(chan *nodeUpdates, 1),
chanOpenFailures: make(chan *chanOpenFailureUpdate, 1),
pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
failedNodes: make(map[NodeID]struct{}),
pendingConns: make(map[NodeID]struct{}),
pendingOpens: make(map[NodeID]Channel),
}
for _, c := range initialState {
@ -179,6 +200,7 @@ func (a *Agent) Start() error {
return nil
}
rand.Seed(time.Now().Unix())
log.Infof("Autopilot Agent starting")
a.wg.Add(1)
@ -344,6 +366,73 @@ func mergeChanState(pendingChans map[NodeID]Channel,
return totalChans
}
// weightedChoice draws a random index from the map of channel candidates, with
// a probability propotional to their score.
func weightedChoice(s map[NodeID]*AttachmentDirective) (NodeID, error) {
// Calculate the sum of scores found in the map.
var sum float64
for _, v := range s {
sum += v.Score
}
if sum <= 0 {
return NodeID{}, fmt.Errorf("non-positive sum")
}
// Create a map of normalized scores such, that they sum to 1.0.
norm := make(map[NodeID]float64)
for k, v := range s {
norm[k] = v.Score / sum
}
// Pick a random number in the range [0.0, 1.0), and iterate the map
// until the number goes below 0. This means that each index is picked
// with a probablity equal to their normalized score.
//
// Example:
// Items with scores [1, 5, 2, 2]
// Normalized scores [0.1, 0.5, 0.2, 0.2]
// Imagine they each occupy a "range" equal to their normalized score
// in [0, 1.0]:
// [|-0.1-||-----0.5-----||--0.2--||--0.2--|]
// The following loop is now equivalent to "hitting" the intervals.
r := rand.Float64()
for k, v := range norm {
r -= v
if r <= 0 {
return k, nil
}
}
return NodeID{}, fmt.Errorf("no choice made")
}
// chooseN picks at random min[n, len(s)] nodes if from the
// AttachmentDirectives map, with a probability weighted by their score.
func chooseN(n int, s map[NodeID]*AttachmentDirective) (
map[NodeID]*AttachmentDirective, error) {
// Keep a map of nodes not yet choosen.
rem := make(map[NodeID]*AttachmentDirective)
for k, v := range s {
rem[k] = v
}
// Pick a weighted choice from the remaining nodes as long as there are
// nodes left, and we haven't already picked n.
chosen := make(map[NodeID]*AttachmentDirective)
for len(chosen) < n && len(rem) > 0 {
choice, err := weightedChoice(rem)
if err != nil {
return nil, err
}
chosen[choice] = rem[choice]
delete(rem, choice)
}
return chosen, nil
}
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
@ -359,23 +448,6 @@ func (a *Agent) controller() {
// TODO(roasbeef): do we in fact need to maintain order?
// * use sync.Cond if so
// failedNodes lists nodes that we've previously attempted to initiate
// channels with, but didn't succeed.
failedNodes := make(map[NodeID]struct{})
// pendingConns tracks the nodes that we are attempting to make
// connections to. This prevents us from making duplicate connection
// requests to the same node.
pendingConns := make(map[NodeID]struct{})
// pendingOpens tracks the channels that we've requested to be
// initiated, but haven't yet been confirmed as being fully opened.
// This state is required as otherwise, we may go over our allotted
// channel limit, or open multiple channels to the same node.
pendingOpens := make(map[NodeID]Channel)
var pendingMtx sync.Mutex
updateBalance := func() {
newBalance, err := a.cfg.WalletBalance()
if err != nil {
@ -407,9 +479,9 @@ func (a *Agent) controller() {
newChan := update.newChan
a.chanState[newChan.ChanID] = newChan
pendingMtx.Lock()
delete(pendingOpens, newChan.Node)
pendingMtx.Unlock()
a.pendingMtx.Lock()
delete(a.pendingOpens, newChan.Node)
a.pendingMtx.Unlock()
updateBalance()
// A channel has been closed, this may free up an
@ -460,17 +532,17 @@ func (a *Agent) controller() {
return
}
pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(pendingOpens))
pendingMtx.Unlock()
a.pendingMtx.Lock()
log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
a.pendingMtx.Unlock()
// With all the updates applied, we'll obtain a set of the
// current active channels (confirmed channels), and also
// factor in our set of unconfirmed channels.
confirmedChans := a.chanState
pendingMtx.Lock()
totalChans := mergeChanState(pendingOpens, confirmedChans)
pendingMtx.Unlock()
a.pendingMtx.Lock()
totalChans := mergeChanState(a.pendingOpens, confirmedChans)
a.pendingMtx.Unlock()
// Now that we've updated our internal state, we'll consult our
// channel attachment heuristic to determine if we should open
@ -485,211 +557,230 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)
// We're to attempt an attachment so we'll obtain the set of
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(pendingOpens,
pendingConns, connectedNodes, failedNodes,
)
pendingMtx.Unlock()
// If we reach this point, then according to our heuristic we
// should modify our channel state to tend towards what it
// determines to the optimal state. So we'll call Select to get
// a fresh batch of attachment directives, passing in the
// amount of funds available for us to use.
chanCandidates, err := a.cfg.Heuristic.Select(
a.cfg.Self, a.cfg.Graph, availableFunds,
numChans, nodesToSkip,
)
err := a.openChans(availableFunds, numChans, totalChans)
if err != nil {
log.Errorf("Unable to select candidates for "+
"attachment: %v", err)
continue
log.Errorf("Unable to open channels: %v", err)
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
continue
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
pendingMtx.Lock()
if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens {
pendingMtx.Unlock()
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.MaxPendingOpens)
continue
}
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
for _, chanCandidate := range chanCandidates {
// Skip candidates which we are already trying
// to establish a connection with.
nodeID := chanCandidate.NodeID
if _, ok := pendingConns[nodeID]; ok {
continue
}
pendingConns[nodeID] = struct{}{}
go func(directive AttachmentDirective) {
// We'll start out by attempting to connect to
// the peer in order to begin the funding
// workflow.
pub := directive.NodeKey
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
)
if err != nil {
log.Warnf("Unable to connect "+
"to %x: %v",
pub.SerializeCompressed(),
err)
// Since we failed to connect to them,
// we'll mark them as failed so that we
// don't attempt to connect to them
// again.
nodeID := NewNodeID(pub)
pendingMtx.Lock()
delete(pendingConns, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Finally, we'll trigger the agent to
// select new peers to connect to.
a.OnChannelOpenFailure()
return
}
// The connection was successful, though before
// progressing we must check that we have not
// already met our quota for max pending open
// channels. This can happen if multiple
// directives were spawned but fewer slots were
// available, and other successful attempts
// finished first.
pendingMtx.Lock()
if uint16(len(pendingOpens)) >=
a.cfg.MaxPendingOpens {
// Since we've reached our max number of
// pending opens, we'll disconnect this
// peer and exit. However, if we were
// previously connected to them, then
// we'll make sure to maintain the
// connection alive.
if alreadyConnected {
// Since we succeeded in
// connecting, we won't add this
// peer to the failed nodes map,
// but we will remove it from
// pendingConns so that it can
// be retried in the future.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return
}
err = a.cfg.DisconnectPeer(
pub,
)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
// Now that we have disconnected, we can
// remove this node from our pending
// conns map, permitting subsequent
// connection attempts.
delete(pendingConns, nodeID)
pendingMtx.Unlock()
return
}
// If we were successful, we'll track this peer
// in our set of pending opens. We do this here
// to ensure we don't stall on selecting new
// peers if the connection attempt happens to
// take too long.
nodeID := directive.NodeID
delete(pendingConns, nodeID)
pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
pendingMtx.Unlock()
// We can then begin the funding workflow with
// this peer.
err = a.cfg.ChanController.OpenChannel(
pub, directive.ChanAmt,
)
if err != nil {
log.Warnf("Unable to open "+
"channel to %x of %v: %v",
pub.SerializeCompressed(),
directive.ChanAmt, err)
// As the attempt failed, we'll clear
// the peer from the set of pending
// opens and mark them as failed so we
// don't attempt to open a channel to
// them again.
pendingMtx.Lock()
delete(pendingOpens, nodeID)
failedNodes[nodeID] = struct{}{}
pendingMtx.Unlock()
// Trigger the agent to re-evaluate
// everything and possibly retry with a
// different node.
a.OnChannelOpenFailure()
// Finally, we should also disconnect
// the peer if we weren't already
// connected to them beforehand by an
// external subsystem.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to "+
"disconnect peer "+
"%x: %v",
pub.SerializeCompressed(),
err)
}
}
// Since the channel open was successful and is
// currently pending, we'll trigger the
// autopilot agent to query for more peers.
a.OnChannelPendingOpen()
}(chanCandidate)
}
pendingMtx.Unlock()
}
}
// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
totalChans []Channel) error {
// We're to attempt an attachment so we'll obtain the set of
// nodes that we currently have channels with so we avoid
// duplicate edges.
connectedNodes := a.chanState.ConnectedNodes()
a.pendingMtx.Lock()
nodesToSkip := mergeNodeMaps(a.pendingOpens,
a.pendingConns, connectedNodes, a.failedNodes,
)
a.pendingMtx.Unlock()
// Gather the set of all nodes in the graph, except those we
// want to skip.
selfPubBytes := a.cfg.Self.SerializeCompressed()
nodes := make(map[NodeID]struct{})
if err := a.cfg.Graph.ForEachNode(func(node Node) error {
nID := NodeID(node.PubKey())
// If we come across ourselves, them we'll continue in
// order to avoid attempting to make a channel with
// ourselves.
if bytes.Equal(nID[:], selfPubBytes) {
return nil
}
// Additionally, if this node is in the blacklist, then
// we'll skip it.
if _, ok := nodesToSkip[nID]; ok {
return nil
}
nodes[nID] = struct{}{}
return nil
}); err != nil {
return fmt.Errorf("unable to get graph nodes: %v", err)
}
// Use the heuristic to calculate a score for each node in the
// graph.
scores, err := a.cfg.Heuristic.NodeScores(
a.cfg.Graph, totalChans, availableFunds, nodes,
)
if err != nil {
return fmt.Errorf("unable to calculate node scores : %v", err)
}
log.Debugf("Got scores for %d nodes", len(scores))
// Now use the score to make a weighted choice which
// nodes to attempt to open channels to.
chanCandidates, err := chooseN(int(numChans), scores)
if err != nil {
return fmt.Errorf("Unable to make weighted choice: %v",
err)
}
if len(chanCandidates) == 0 {
log.Infof("No eligible candidates to connect to")
return nil
}
log.Infof("Attempting to execute channel attachment "+
"directives: %v", spew.Sdump(chanCandidates))
// Before proceeding, check to see if we have any slots
// available to open channels. If there are any, we will attempt
// to dispatch the retrieved directives since we can't be
// certain which ones may actually succeed. If too many
// connections succeed, we will they will be ignored and made
// available to future heuristic selections.
a.pendingMtx.Lock()
defer a.pendingMtx.Unlock()
if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens {
log.Debugf("Reached cap of %v pending "+
"channel opens, will retry "+
"after success/failure",
a.cfg.Constraints.MaxPendingOpens)
return nil
}
// For each recommended attachment directive, we'll launch a
// new goroutine to attempt to carry out the directive. If any
// of these succeed, then we'll receive a new state update,
// taking us back to the top of our controller loop.
for _, chanCandidate := range chanCandidates {
// Skip candidates which we are already trying
// to establish a connection with.
nodeID := chanCandidate.NodeID
if _, ok := a.pendingConns[nodeID]; ok {
continue
}
a.pendingConns[nodeID] = struct{}{}
a.wg.Add(1)
go a.executeDirective(*chanCandidate)
}
return nil
}
// executeDirective attempts to connect to the channel candidate specified by
// the given attachment directive, and open a channel of the given size.
//
// NOTE: MUST be run as a goroutine.
func (a *Agent) executeDirective(directive AttachmentDirective) {
defer a.wg.Done()
// We'll start out by attempting to connect to the peer in order to
// begin the funding workflow.
nodeID := directive.NodeID
pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256())
if err != nil {
log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
return
}
alreadyConnected, err := a.cfg.ConnectToPeer(pub, directive.Addrs)
if err != nil {
log.Warnf("Unable to connect to %x: %v",
pub.SerializeCompressed(), err)
// Since we failed to connect to them, we'll mark them as
// failed so that we don't attempt to connect to them again.
a.pendingMtx.Lock()
delete(a.pendingConns, nodeID)
a.failedNodes[nodeID] = struct{}{}
a.pendingMtx.Unlock()
// Finally, we'll trigger the agent to select new peers to
// connect to.
a.OnChannelOpenFailure()
return
}
// The connection was successful, though before progressing we must
// check that we have not already met our quota for max pending open
// channels. This can happen if multiple directives were spawned but
// fewer slots were available, and other successful attempts finished
// first.
a.pendingMtx.Lock()
if uint16(len(a.pendingOpens)) >=
a.cfg.Constraints.MaxPendingOpens {
// Since we've reached our max number of pending opens, we'll
// disconnect this peer and exit. However, if we were
// previously connected to them, then we'll make sure to
// maintain the connection alive.
if alreadyConnected {
// Since we succeeded in connecting, we won't add this
// peer to the failed nodes map, but we will remove it
// from a.pendingConns so that it can be retried in the
// future.
delete(a.pendingConns, nodeID)
a.pendingMtx.Unlock()
return
}
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to disconnect peer %x: %v",
pub.SerializeCompressed(), err)
}
// Now that we have disconnected, we can remove this node from
// our pending conns map, permitting subsequent connection
// attempts.
delete(a.pendingConns, nodeID)
a.pendingMtx.Unlock()
return
}
// If we were successful, we'll track this peer in our set of pending
// opens. We do this here to ensure we don't stall on selecting new
// peers if the connection attempt happens to take too long.
delete(a.pendingConns, nodeID)
a.pendingOpens[nodeID] = Channel{
Capacity: directive.ChanAmt,
Node: nodeID,
}
a.pendingMtx.Unlock()
// We can then begin the funding workflow with this peer.
err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
if err != nil {
log.Warnf("Unable to open channel to %x of %v: %v",
pub.SerializeCompressed(), directive.ChanAmt, err)
// As the attempt failed, we'll clear the peer from the set of
// pending opens and mark them as failed so we don't attempt to
// open a channel to them again.
a.pendingMtx.Lock()
delete(a.pendingOpens, nodeID)
a.failedNodes[nodeID] = struct{}{}
a.pendingMtx.Unlock()
// Trigger the agent to re-evaluate everything and possibly
// retry with a different node.
a.OnChannelOpenFailure()
// Finally, we should also disconnect the peer if we weren't
// already connected to them beforehand by an external
// subsystem.
if alreadyConnected {
return
}
err = a.cfg.DisconnectPeer(pub)
if err != nil {
log.Warnf("Unable to disconnect peer %x: %v",
pub.SerializeCompressed(), err)
}
}
// Since the channel open was successful and is currently pending,
// we'll trigger the autopilot agent to query for more peers.
a.OnChannelPendingOpen()
}

View file

@ -29,8 +29,8 @@ type mockHeuristic struct {
moreChansResps chan moreChansResp
moreChanArgs chan moreChanArg
directiveResps chan []AttachmentDirective
directiveArgs chan directiveArg
nodeScoresResps chan map[NodeID]*AttachmentDirective
nodeScoresArgs chan directiveArg
quit chan struct{}
}
@ -60,33 +60,33 @@ func (m *mockHeuristic) NeedMoreChans(chans []Channel,
}
type directiveArg struct {
self *btcec.PublicKey
graph ChannelGraph
amt btcutil.Amount
skip map[NodeID]struct{}
chans []Channel
nodes map[NodeID]struct{}
}
func (m *mockHeuristic) Select(self *btcec.PublicKey, graph ChannelGraph,
amtToUse btcutil.Amount, numChans uint32,
skipChans map[NodeID]struct{}) ([]AttachmentDirective, error) {
func (m *mockHeuristic) NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error) {
if m.directiveArgs != nil {
if m.nodeScoresArgs != nil {
directive := directiveArg{
self: self,
graph: graph,
amt: amtToUse,
skip: skipChans,
graph: g,
amt: fundsAvailable,
chans: chans,
nodes: nodes,
}
select {
case m.directiveArgs <- directive:
case m.nodeScoresArgs <- directive:
case <-m.quit:
return nil, errors.New("exiting")
}
}
select {
case resp := <-m.directiveResps:
case resp := <-m.nodeScoresResps:
return resp, nil
case <-m.quit:
return nil, errors.New("exiting")
@ -144,8 +144,8 @@ func TestAgentChannelOpenSignal(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent, 10),
@ -167,8 +167,10 @@ func TestAgentChannelOpenSignal(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
@ -224,7 +226,7 @@ func TestAgentChannelOpenSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -267,8 +269,8 @@ func TestAgentChannelFailureSignal(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockFailingChanController{}
memGraph, _, _ := newMemChanGraph()
@ -288,8 +290,10 @@ func TestAgentChannelFailureSignal(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
@ -320,8 +324,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// request attachment directives, return a fake so the agent will
// attempt to open a channel.
var fakeDirective = AttachmentDirective{
NodeKey: self,
var fakeDirective = &AttachmentDirective{
NodeID: NewNodeID(self),
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -329,10 +332,13 @@ func TestAgentChannelFailureSignal(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{fakeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(self): fakeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
@ -347,7 +353,7 @@ func TestAgentChannelFailureSignal(t *testing.T) {
}
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatal("heuristic wasn't queried in time")
}
@ -366,8 +372,8 @@ func TestAgentChannelCloseSignal(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -389,8 +395,10 @@ func TestAgentChannelCloseSignal(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
// We'll start the agent with two channels already being active.
@ -453,7 +461,7 @@ func TestAgentChannelCloseSignal(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -474,8 +482,8 @@ func TestAgentBalanceUpdate(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -503,8 +511,10 @@ func TestAgentBalanceUpdate(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
@ -562,7 +572,7 @@ func TestAgentBalanceUpdate(t *testing.T) {
// If this send success, then Select was erroneously called and the
// test should be failed.
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
// This is the correct path as Select should've be called.
@ -582,8 +592,8 @@ func TestAgentImmediateAttach(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -608,8 +618,10 @@ func TestAgentImmediateAttach(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
@ -650,7 +662,7 @@ func TestAgentImmediateAttach(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop.
directives := make([]AttachmentDirective, numChans)
directives := make(map[NodeID]*AttachmentDirective)
nodeKeys := make(map[NodeID]struct{})
for i := 0; i < numChans; i++ {
pub, err := randKey()
@ -658,8 +670,7 @@ func TestAgentImmediateAttach(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(pub)
directives[i] = AttachmentDirective{
NodeKey: pub,
directives[nodeID] = &AttachmentDirective{
NodeID: nodeID,
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -667,6 +678,7 @@ func TestAgentImmediateAttach(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
nodeKeys[nodeID] = struct{}{}
}
@ -674,7 +686,7 @@ func TestAgentImmediateAttach(t *testing.T) {
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select {
case heuristic.directiveResps <- directives:
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -696,6 +708,7 @@ func TestAgentImmediateAttach(t *testing.T) {
nodeID)
}
delete(nodeKeys, nodeID)
case <-time.After(time.Second * 10):
t.Fatalf("channel not opened in time")
}
@ -714,8 +727,8 @@ func TestAgentPrivateChannels(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
// The chanController should be initialized such that all of its open
// channel requests are for private channels.
@ -743,8 +756,10 @@ func TestAgentPrivateChannels(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
agent, err := New(cfg, nil)
if err != nil {
@ -783,14 +798,13 @@ func TestAgentPrivateChannels(t *testing.T) {
// At this point, the agent should now be querying the heuristic to
// requests attachment directives. We'll generate 5 mock directives so
// it can progress within its loop.
directives := make([]AttachmentDirective, numChans)
directives := make(map[NodeID]*AttachmentDirective)
for i := 0; i < numChans; i++ {
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
directives[i] = AttachmentDirective{
NodeKey: pub,
directives[NewNodeID(pub)] = &AttachmentDirective{
NodeID: NewNodeID(pub),
ChanAmt: btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -798,13 +812,14 @@ func TestAgentPrivateChannels(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
}
// With our fake directives created, we'll now send then to the agent
// as a return value for the Select function.
select {
case heuristic.directiveResps <- directives:
case heuristic.nodeScoresResps <- directives:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -836,8 +851,8 @@ func TestAgentPendingChannelState(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -866,8 +881,10 @@ func TestAgentPendingChannelState(t *testing.T) {
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
@ -910,8 +927,7 @@ func TestAgentPendingChannelState(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
nodeID := NewNodeID(nodeKey)
nodeDirective := AttachmentDirective{
NodeKey: nodeKey,
nodeDirective := &AttachmentDirective{
NodeID: nodeID,
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -919,14 +935,18 @@ func TestAgentPendingChannelState(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
nodeID: nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
heuristic.directiveArgs = make(chan directiveArg)
heuristic.nodeScoresArgs = make(chan directiveArg)
// A request to open the channel should've also been sent.
select {
@ -989,12 +1009,12 @@ func TestAgentPendingChannelState(t *testing.T) {
// Select method. The arguments passed should reflect the fact that the
// node we have a pending channel to, should be ignored.
select {
case req := <-heuristic.directiveArgs:
if len(req.skip) == 0 {
case req := <-heuristic.nodeScoresArgs:
if len(req.chans) == 0 {
t.Fatalf("expected to skip %v nodes, instead "+
"skipping %v", 1, len(req.skip))
"skipping %v", 1, len(req.chans))
}
if _, ok := req.skip[nodeID]; !ok {
if req.chans[0].Node != nodeID {
t.Fatalf("pending node not included in skip arguments")
}
case <-time.After(time.Second * 10):
@ -1015,8 +1035,8 @@ func TestAgentPendingOpenChannel(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1035,8 +1055,10 @@ func TestAgentPendingOpenChannel(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
agent, err := New(cfg, nil)
if err != nil {
@ -1077,7 +1099,7 @@ func TestAgentPendingOpenChannel(t *testing.T) {
// There shouldn't be a call to the Select method as we've returned
// "false" for NeedMoreChans above.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
t.Fatalf("Select was called but shouldn't have been")
default:
}
@ -1098,8 +1120,8 @@ func TestAgentOnNodeUpdates(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1118,8 +1140,10 @@ func TestAgentOnNodeUpdates(t *testing.T) {
WalletBalance: func() (btcutil.Amount, error) {
return walletBalance, nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
agent, err := New(cfg, nil)
if err != nil {
@ -1153,7 +1177,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// Send over an empty list of attachment directives, which should cause
// the agent to return to waiting on a new signal.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1179,7 +1203,7 @@ func TestAgentOnNodeUpdates(t *testing.T) {
// It's not important that this list is also empty, so long as the node
// updates signal is causing the agent to make this attempt.
select {
case heuristic.directiveResps <- []AttachmentDirective{}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{}:
case <-time.After(time.Second * 10):
t.Fatalf("Select was not called but should have been")
}
@ -1201,8 +1225,8 @@ func TestAgentSkipPendingConns(t *testing.T) {
t.Fatalf("unable to generate key: %v", err)
}
heuristic := &mockHeuristic{
moreChansResps: make(chan moreChansResp),
directiveResps: make(chan []AttachmentDirective),
moreChansResps: make(chan moreChansResp),
nodeScoresResps: make(chan map[NodeID]*AttachmentDirective),
}
chanController := &mockChanController{
openChanSignals: make(chan openChanIntent),
@ -1213,6 +1237,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
const walletBalance = btcutil.SatoshiPerBitcoin * 6
connect := make(chan chan error)
quit := make(chan struct{})
// With the dependencies we created, we can now create the initial
// agent itself.
@ -1225,15 +1250,27 @@ func TestAgentSkipPendingConns(t *testing.T) {
},
ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) {
errChan := make(chan error)
connect <- errChan
err := <-errChan
return false, err
select {
case connect <- errChan:
case <-quit:
return false, errors.New("quit")
}
select {
case err := <-errChan:
return false, err
case <-quit:
return false, errors.New("quit")
}
},
DisconnectPeer: func(*btcec.PublicKey) error {
return nil
},
Graph: memGraph,
MaxPendingOpens: 10,
Graph: memGraph,
Constraints: &HeuristicConstraints{
MaxPendingOpens: 10,
},
}
initialChans := []Channel{}
agent, err := New(testCfg, initialChans)
@ -1252,6 +1289,11 @@ func TestAgentSkipPendingConns(t *testing.T) {
}
defer agent.Stop()
// We must defer the closing of quit after the defer agent.Stop(), to
// make sure ConnectToPeer won't block preventing the agent from
// exiting.
defer close(quit)
// We'll send an initial "yes" response to advance the agent past its
// initial check. This will cause it to try to get directives from the
// graph.
@ -1272,8 +1314,7 @@ func TestAgentSkipPendingConns(t *testing.T) {
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodeDirective := AttachmentDirective{
NodeKey: nodeKey,
nodeDirective := &AttachmentDirective{
NodeID: NewNodeID(nodeKey),
ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin,
Addrs: []net.Addr{
@ -1281,9 +1322,13 @@ func TestAgentSkipPendingConns(t *testing.T) {
IP: bytes.Repeat([]byte("a"), 16),
},
},
Score: 0.5,
}
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -1311,7 +1356,9 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}
@ -1348,7 +1395,9 @@ func TestAgentSkipPendingConns(t *testing.T) {
// Send a directive for the same node, which already has a pending conn.
select {
case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}:
case heuristic.nodeScoresResps <- map[NodeID]*AttachmentDirective{
NewNodeID(nodeKey): nodeDirective,
}:
case <-time.After(time.Second * 10):
t.Fatalf("heuristic wasn't queried in time")
}

View file

@ -0,0 +1,76 @@
package autopilot
import (
"github.com/btcsuite/btcutil"
)
// HeuristicConstraints is a struct that indicate the constraints an autopilot
// heuristic must adhere to when opening channels.
type HeuristicConstraints struct {
// MinChanSize is the smallest channel that the autopilot agent should
// create.
MinChanSize btcutil.Amount
// MaxChanSize the largest channel that the autopilot agent should
// create.
MaxChanSize btcutil.Amount
// ChanLimit the maximum number of channels that should be created.
ChanLimit uint16
// Allocation the percentage of total funds that should be committed to
// automatic channel establishment.
Allocation float64
// MaxPendingOpens is the maximum number of pending channel
// establishment goroutines that can be lingering. We cap this value in
// order to control the level of parallelism caused by the autopilot
// agent.
MaxPendingOpens uint16
}
// availableChans returns the funds and number of channels slots the autopilot
// has available towards new channels, and still be within the set constraints.
func (h *HeuristicConstraints) availableChans(channels []Channel,
funds btcutil.Amount) (btcutil.Amount, uint32) {
// If we're already over our maximum allowed number of channels, then
// we'll instruct the controller not to create any more channels.
if len(channels) >= int(h.ChanLimit) {
return 0, 0
}
// The number of additional channels that should be opened is the
// difference between the channel limit, and the number of channels we
// already have open.
numAdditionalChans := uint32(h.ChanLimit) - uint32(len(channels))
// First, we'll tally up the total amount of funds that are currently
// present within the set of active channels.
var totalChanAllocation btcutil.Amount
for _, channel := range channels {
totalChanAllocation += channel.Capacity
}
// With this value known, we'll now compute the total amount of fund
// allocated across regular utxo's and channel utxo's.
totalFunds := funds + totalChanAllocation
// Once the total amount has been computed, we then calculate the
// fraction of funds currently allocated to channels.
fundsFraction := float64(totalChanAllocation) / float64(totalFunds)
// If this fraction is below our threshold, then we'll return true, to
// indicate the controller should call Select to obtain a candidate set
// of channels to attempt to open.
needMore := fundsFraction < h.Allocation
if !needMore {
return 0, 0
}
// Now that we know we need more funds, we'll compute the amount of
// additional funds we should allocate towards channels.
targetAllocation := btcutil.Amount(float64(totalFunds) * h.Allocation)
fundsAvailable := targetAllocation - totalChanAllocation
return fundsAvailable, numAdditionalChans
}

View file

@ -85,12 +85,10 @@ type ChannelGraph interface {
// AttachmentHeuristic. It details to which node a channel should be created
// to, and also the parameters which should be used in the channel creation.
type AttachmentDirective struct {
// NodeKey is the target node for this attachment directive. It can be
// identified by its public key, and therefore can be used along with
// a ChannelOpener implementation to execute the directive.
NodeKey *btcec.PublicKey
// NodeID is the serialized compressed pubkey of the target node.
// NodeID is the serialized compressed pubkey of the target node for
// this attachment directive. It can be identified by its public key,
// and therefore can be used along with a ChannelOpener implementation
// to execute the directive.
NodeID NodeID
// ChanAmt is the size of the channel that should be opened, expressed
@ -100,6 +98,10 @@ type AttachmentDirective struct {
// Addrs is a list of addresses that the target peer may be reachable
// at.
Addrs []net.Addr
// Score is the score given by the heuristic for opening a channel of
// the given size to this node.
Score float64
}
// AttachmentHeuristic is one of the primary interfaces within this package.
@ -119,16 +121,23 @@ type AttachmentHeuristic interface {
// ideal state.
NeedMoreChans(chans []Channel, balance btcutil.Amount) (btcutil.Amount, uint32, bool)
// Select is a method that given the current state of the channel
// graph, a set of nodes to ignore, and an amount of available funds,
// should return a set of attachment directives which describe which
// additional channels should be opened within the graph to push the
// heuristic back towards its equilibrium state. The numNewChans
// argument represents the additional number of channels that should be
// open.
Select(self *btcec.PublicKey, graph ChannelGraph,
amtToUse btcutil.Amount, numNewChans uint32,
skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error)
// NodeScores is a method that given the current channel graph, current
// set of local channels and funds available, scores the given nodes
// according to the preference of opening a channel with them. The
// returned channel candidates maps the NodeID to an attachemnt
// directive containing a score and a channel size.
//
// The scores will be in the range [0, M], where 0 indicates no
// improvement in connectivity if a channel is opened to this node,
// while M is the maximum possible improvement in connectivity. The
// size of M is up to the implementation of this interface, so scores
// must be normalized if compared against other implementations.
//
// NOTE: A NodeID not found in the returned map is implicitly given a
// score of 0.
NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error)
}
// ChannelController is a simple interface that allows an auto-pilot agent to

View file

@ -1,9 +1,8 @@
package autopilot
import (
"bytes"
"fmt"
prand "math/rand"
"net"
"time"
"github.com/btcsuite/btcd/btcec"
@ -22,28 +21,20 @@ import (
//
// TODO(roasbeef): BA, with k=-3
type ConstrainedPrefAttachment struct {
minChanSize btcutil.Amount
maxChanSize btcutil.Amount
chanLimit uint16
threshold float64
constraints *HeuristicConstraints
}
// NewConstrainedPrefAttachment creates a new instance of a
// ConstrainedPrefAttachment heuristics given bounds on allowed channel sizes,
// and an allocation amount which is interpreted as a percentage of funds that
// is to be committed to channels at all times.
func NewConstrainedPrefAttachment(minChanSize, maxChanSize btcutil.Amount,
chanLimit uint16, allocation float64) *ConstrainedPrefAttachment {
func NewConstrainedPrefAttachment(
cfg *HeuristicConstraints) *ConstrainedPrefAttachment {
prand.Seed(time.Now().Unix())
return &ConstrainedPrefAttachment{
minChanSize: minChanSize,
chanLimit: chanLimit,
maxChanSize: maxChanSize,
threshold: allocation,
constraints: cfg,
}
}
@ -61,45 +52,11 @@ var _ AttachmentHeuristic = (*ConstrainedPrefAttachment)(nil)
func (p *ConstrainedPrefAttachment) NeedMoreChans(channels []Channel,
funds btcutil.Amount) (btcutil.Amount, uint32, bool) {
// If we're already over our maximum allowed number of channels, then
// we'll instruct the controller not to create any more channels.
if len(channels) >= int(p.chanLimit) {
return 0, 0, false
}
// The number of additional channels that should be opened is the
// difference between the channel limit, and the number of channels we
// already have open.
numAdditionalChans := uint32(p.chanLimit) - uint32(len(channels))
// First, we'll tally up the total amount of funds that are currently
// present within the set of active channels.
var totalChanAllocation btcutil.Amount
for _, channel := range channels {
totalChanAllocation += channel.Capacity
}
// With this value known, we'll now compute the total amount of fund
// allocated across regular utxo's and channel utxo's.
totalFunds := funds + totalChanAllocation
// Once the total amount has been computed, we then calculate the
// fraction of funds currently allocated to channels.
fundsFraction := float64(totalChanAllocation) / float64(totalFunds)
// If this fraction is below our threshold, then we'll return true, to
// indicate the controller should call Select to obtain a candidate set
// of channels to attempt to open.
needMore := fundsFraction < p.threshold
if !needMore {
return 0, 0, false
}
// Now that we know we need more funds, we'll compute the amount of
// additional funds we should allocate towards channels.
targetAllocation := btcutil.Amount(float64(totalFunds) * p.threshold)
fundsAvailable := targetAllocation - totalChanAllocation
return fundsAvailable, numAdditionalChans, true
// We'll try to open more channels as long as the constraints allow it.
availableFunds, availableChans := p.constraints.availableChans(
channels, funds,
)
return availableFunds, availableChans, availableChans > 0
}
// NodeID is a simple type that holds an EC public key serialized in compressed
@ -113,189 +70,107 @@ func NewNodeID(pub *btcec.PublicKey) NodeID {
return n
}
// shuffleCandidates shuffles the set of candidate nodes for preferential
// attachment in order to break any ordering already enforced by the sorted
// order of the public key for each node. To shuffle the set of candidates, we
// use a version of the FisherYates shuffle algorithm.
func shuffleCandidates(candidates []Node) []Node {
shuffledNodes := make([]Node, len(candidates))
perm := prand.Perm(len(candidates))
for i, v := range perm {
shuffledNodes[v] = candidates[i]
}
return shuffledNodes
}
// Select returns a candidate set of attachment directives that should be
// executed based on the current internal state, the state of the channel
// graph, the set of nodes we should exclude, and the amount of funds
// available. The heuristic employed by this method is one that attempts to
// promote a scale-free network globally, via local attachment preferences for
// new nodes joining the network with an amount of available funds to be
// allocated to channels. Specifically, we consider the degree of each node
// (and the flow in/out of the node available via its open channels) and
// utilize the BarabásiAlbert model to drive our recommended attachment
// heuristics. If implemented globally for each new participant, this results
// in a channel graph that is scale-free and follows a power law distribution
// with k=-3.
// NodeScores is a method that given the current channel graph, current set of
// local channels and funds available, scores the given nodes according the the
// preference of opening a channel with them.
//
// The heuristic employed by this method is one that attempts to promote a
// scale-free network globally, via local attachment preferences for new nodes
// joining the network with an amount of available funds to be allocated to
// channels. Specifically, we consider the degree of each node (and the flow
// in/out of the node available via its open channels) and utilize the
// BarabásiAlbert model to drive our recommended attachment heuristics. If
// implemented globally for each new participant, this results in a channel
// graph that is scale-free and follows a power law distribution with k=-3.
//
// The returned scores will be in the range [0.0, 1.0], where higher scores are
// given to nodes already having high connectivity in the graph.
//
// NOTE: This is a part of the AttachmentHeuristic interface.
func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph,
fundsAvailable btcutil.Amount, numNewChans uint32,
skipNodes map[NodeID]struct{}) ([]AttachmentDirective, error) {
// TODO(roasbeef): rename?
var directives []AttachmentDirective
if fundsAvailable < p.minChanSize {
return directives, nil
}
selfPubBytes := self.SerializeCompressed()
// We'll continue our attachment loop until we've exhausted the current
// amount of available funds.
visited := make(map[NodeID]struct{})
for i := uint32(0); i < numNewChans; i++ {
// selectionSlice will be used to randomly select a node
// according to a power law distribution. For each connected
// edge, we'll add an instance of the node to this slice. Thus,
// for a given node, the probability that we'll attach to it
// is: k_i / sum(k_j), where k_i is the degree of the target
// node, and k_j is the degree of all other nodes i != j. This
// implements the classic BarabásiAlbert model for
// preferential attachment.
var selectionSlice []Node
// For each node, and each channel that the node has, we'll add
// an instance of that node to the selection slice above.
// This'll slice where the frequency of each node is equivalent
// to the number of channels that connect to it.
//
// TODO(roasbeef): add noise to make adversarially resistant?
if err := g.ForEachNode(func(node Node) error {
nID := NodeID(node.PubKey())
// Once a node has already been attached to, we'll
// ensure that it isn't factored into any further
// decisions within this round.
if _, ok := visited[nID]; ok {
return nil
}
// If we come across ourselves, them we'll continue in
// order to avoid attempting to make a channel with
// ourselves.
if bytes.Equal(nID[:], selfPubBytes) {
return nil
}
// Additionally, if this node is in the blacklist, then
// we'll skip it.
if _, ok := skipNodes[nID]; ok {
return nil
}
// For initial bootstrap purposes, if a node doesn't
// have any channels, then we'll ensure that it has at
// least one item in the selection slice.
//
// TODO(roasbeef): make conditional?
selectionSlice = append(selectionSlice, node)
// For each active channel the node has, we'll add an
// additional channel to the selection slice to
// increase their weight.
if err := node.ForEachChannel(func(channel ChannelEdge) error {
selectionSlice = append(selectionSlice, node)
return nil
}); err != nil {
return err
}
func (p *ConstrainedPrefAttachment) NodeScores(g ChannelGraph, chans []Channel,
fundsAvailable btcutil.Amount, nodes map[NodeID]struct{}) (
map[NodeID]*AttachmentDirective, error) {
// Count the number of channels in the graph. We'll also count the
// number of channels as we go for the nodes we are interested in, and
// record their addresses found in the db.
var graphChans int
nodeChanNum := make(map[NodeID]int)
addresses := make(map[NodeID][]net.Addr)
if err := g.ForEachNode(func(n Node) error {
var nodeChans int
err := n.ForEachChannel(func(_ ChannelEdge) error {
nodeChans++
graphChans++
return nil
}); err != nil {
return nil, err
}
// If no nodes at all were accumulated, then we'll exit early
// as there are no eligible candidates.
if len(selectionSlice) == 0 {
break
}
// Given our selection slice, we'll now generate a random index
// into this slice. The node we select will be recommended by
// us to create a channel to.
candidates := shuffleCandidates(selectionSlice)
selectedIndex := prand.Int31n(int32(len(candidates)))
selectedNode := candidates[selectedIndex]
// TODO(roasbeef): cap on num channels to same participant?
// With the node selected, we'll add this (node, amount) tuple
// to out set of recommended directives.
pubBytes := selectedNode.PubKey()
pub, err := btcec.ParsePubKey(pubBytes[:], btcec.S256())
if err != nil {
return nil, err
}
directives = append(directives, AttachmentDirective{
// TODO(roasbeef): need curve?
NodeKey: &btcec.PublicKey{
X: pub.X,
Y: pub.Y,
},
NodeID: NewNodeID(pub),
Addrs: selectedNode.Addrs(),
})
// With the node selected, we'll add it to the set of visited
// nodes to avoid attaching to it again.
visited[NodeID(pubBytes)] = struct{}{}
}
numSelectedNodes := int64(len(directives))
switch {
// If we have enough available funds to distribute the maximum channel
// size for each of the selected peers to attach to, then we'll
// allocate the maximum amount to each peer.
case int64(fundsAvailable) >= numSelectedNodes*int64(p.maxChanSize):
for i := 0; i < int(numSelectedNodes); i++ {
directives[i].ChanAmt = p.maxChanSize
if err != nil {
return err
}
return directives, nil
// Otherwise, we'll greedily allocate our funds to the channels
// successively until we run out of available funds, or can't create a
// channel above the min channel size.
case int64(fundsAvailable) < numSelectedNodes*int64(p.maxChanSize):
i := 0
for fundsAvailable > p.minChanSize {
// We'll attempt to allocate the max channel size
// initially. If we don't have enough funds to do this,
// then we'll allocate the remainder of the funds
// available to the channel.
delta := p.maxChanSize
if fundsAvailable-delta < 0 {
delta = fundsAvailable
}
directives[i].ChanAmt = delta
fundsAvailable -= delta
i++
// If this node is not among our nodes to score, we can return
// early.
nID := NodeID(n.PubKey())
if _, ok := nodes[nID]; !ok {
return nil
}
// We'll slice the initial set of directives to properly
// reflect the amount of funds we were able to allocate.
return directives[:i:i], nil
// Otherwise we'll record the number of channels, and also
// populate the address in our channel candidates map.
nodeChanNum[nID] = nodeChans
addresses[nID] = n.Addrs()
default:
return nil, fmt.Errorf("err")
return nil
}); err != nil {
return nil, err
}
// If there are no channels in the graph we cannot determine any
// preferences, so we return, indicating all candidates get a score of
// zero.
if graphChans == 0 {
return nil, nil
}
existingPeers := make(map[NodeID]struct{})
for _, c := range chans {
existingPeers[c.Node] = struct{}{}
}
// For each node in the set of nodes, count their fraction of channels
// in the graph, and use that as the score.
candidates := make(map[NodeID]*AttachmentDirective)
for nID, nodeChans := range nodeChanNum {
// As channel size we'll use the maximum channel size available.
chanSize := p.constraints.MaxChanSize
if fundsAvailable-chanSize < 0 {
chanSize = fundsAvailable
}
_, ok := existingPeers[nID]
switch {
// If the node is among or existing channel peers, we don't
// need another channel.
case ok:
continue
// If the amount is too small, we don't want to attempt opening
// another channel.
case chanSize == 0 || chanSize < p.constraints.MinChanSize:
continue
}
// Otherwise we score the node according to its fraction of
// channels in the graph.
score := float64(nodeChans) / float64(graphChans)
candidates[nID] = &AttachmentDirective{
NodeID: nID,
ChanAmt: chanSize,
Score: score,
}
}
return candidates, nil
}

View file

@ -29,6 +29,13 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) {
threshold = 0.5
)
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
randChanID := func() lnwire.ShortChannelID {
return lnwire.NewShortChanIDFromInt(uint64(prand.Int63()))
}
@ -146,8 +153,7 @@ func TestConstrainedPrefAttachmentNeedMoreChan(t *testing.T) {
},
}
prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
prefAttach := NewConstrainedPrefAttachment(constraints)
for i, testCase := range testCases {
amtToAllocate, numMore, needMore := prefAttach.NeedMoreChans(
@ -224,10 +230,8 @@ var chanGraphs = []struct {
},
}
// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed en
// empty graph, the Select function always detects the state, and returns nil.
// Otherwise, it would be possible for the main Select loop to entire an
// infinite loop.
// TestConstrainedPrefAttachmentSelectEmptyGraph ensures that when passed an
// empty graph, the NodeSores function always returns a score of 0.
func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) {
const (
minChanSize = 0
@ -236,16 +240,25 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) {
threshold = 0.5
)
// First, we'll generate a random key that represents "us", and create
// a new instance of the heuristic with our set parameters.
self, err := randKey()
if err != nil {
t.Fatalf("unable to generate self key: %v", err)
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
prefAttach := NewConstrainedPrefAttachment(constraints)
// Create a random public key, which we will query to get a score for.
pub, err := randKey()
if err != nil {
t.Fatalf("unable to generate key: %v", err)
}
nodes := map[NodeID]struct{}{
NewNodeID(pub): {},
}
prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
skipNodes := make(map[NodeID]struct{})
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
@ -256,23 +269,21 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) {
defer cleanup()
}
// With the necessary state initialized, we'll not
// attempt to select a set of candidates channel for
// creation given the current state of the graph.
// With the necessary state initialized, we'll now
// attempt to get the score for this one node.
const walletFunds = btcutil.SatoshiPerBitcoin
directives, err := prefAttach.Select(self, graph,
walletFunds, 5, skipNodes)
scores, err := prefAttach.NodeScores(graph, nil,
walletFunds, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// We shouldn't have selected any new directives as we
// started with an empty graph.
if len(directives) != 0 {
t1.Fatalf("zero attachment directives "+
"should have been returned instead %v were",
len(directives))
// Since the graph is empty, we expect the score to be
// 0, giving an empty return map.
if len(scores) != 0 {
t1.Fatalf("expected empty score map, "+
"instead got %v ", len(scores))
}
})
if !success {
@ -281,9 +292,50 @@ func TestConstrainedPrefAttachmentSelectEmptyGraph(t *testing.T) {
}
}
// completeGraph is a helper method that adds numNodes fully connected nodes to
// the graph.
func completeGraph(t *testing.T, g testGraph, numNodes int) {
const chanCapacity = btcutil.SatoshiPerBitcoin
nodes := make(map[int]*btcec.PublicKey)
for i := 0; i < numNodes; i++ {
for j := i + 1; j < numNodes; j++ {
node1 := nodes[i]
node2 := nodes[j]
edge1, edge2, err := g.addRandChannel(
node1, node2, chanCapacity)
if err != nil {
t.Fatalf("unable to generate channel: %v", err)
}
if node1 == nil {
pubKeyBytes := edge1.Peer.PubKey()
nodes[i], err = btcec.ParsePubKey(
pubKeyBytes[:], btcec.S256(),
)
if err != nil {
t.Fatalf("unable to parse pubkey: %v",
err)
}
}
if node2 == nil {
pubKeyBytes := edge2.Peer.PubKey()
nodes[j], err = btcec.ParsePubKey(
pubKeyBytes[:], btcec.S256(),
)
if err != nil {
t.Fatalf("unable to parse pubkey: %v",
err)
}
}
}
}
}
// TestConstrainedPrefAttachmentSelectTwoVertexes ensures that when passed a
// graph with only two eligible vertexes, then both are selected (without any
// repeats), and the funds are appropriately allocated across each peer.
// graph with only two eligible vertexes, then both are given the same score,
// and the funds are appropriately allocated across each peer.
func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
t.Parallel()
@ -296,7 +348,12 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
@ -307,15 +364,7 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttach := NewConstrainedPrefAttachment(minChanSize, maxChanSize,
chanLimit, threshold)
prefAttach := NewConstrainedPrefAttachment(constraints)
// For this set, we'll load the memory graph with two
// nodes, and a random channel connecting them.
@ -325,43 +374,67 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) {
t1.Fatalf("unable to generate channel: %v", err)
}
// With the necessary state initialized, we'll not
// attempt to select a set of candidates channel for
// creation given the current state of the graph.
// Get the score for all nodes found in the graph at
// this point.
nodes := make(map[NodeID]struct{})
if err := graph.ForEachNode(func(n Node) error {
nodes[n.PubKey()] = struct{}{}
return nil
}); err != nil {
t1.Fatalf("unable to traverse graph: %v", err)
}
if len(nodes) != 2 {
t1.Fatalf("expected 2 nodes, found %d", len(nodes))
}
// With the necessary state initialized, we'll now
// attempt to get our candidates channel score given
// the current state of the graph.
const walletFunds = btcutil.SatoshiPerBitcoin * 10
directives, err := prefAttach.Select(self, graph,
walletFunds, 2, skipNodes)
candidates, err := prefAttach.NodeScores(graph, nil,
walletFunds, nodes)
if err != nil {
t1.Fatalf("unable to select attachment directives: %v", err)
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// Two new directives should have been selected, one
// for each node already present within the graph.
if len(directives) != 2 {
t1.Fatalf("two attachment directives should have been "+
"returned instead %v were", len(directives))
if len(candidates) != len(nodes) {
t1.Fatalf("all nodes should be scored, "+
"instead %v were", len(candidates))
}
// The node attached to should be amongst the two edges
// The candidates should be amongst the two edges
// created above.
for _, directive := range directives {
for nodeID, candidate := range candidates {
edge1Pub := edge1.Peer.PubKey()
edge2Pub := edge2.Peer.PubKey()
switch {
case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]):
case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]):
case bytes.Equal(nodeID[:], edge1Pub[:]):
case bytes.Equal(nodeID[:], edge2Pub[:]):
default:
t1.Fatalf("attached to unknown node: %x",
directive.NodeKey.SerializeCompressed())
nodeID[:])
}
// As the number of funds available exceed the
// max channel size, both edges should consume
// the maximum channel size.
if directive.ChanAmt != maxChanSize {
t1.Fatalf("max channel size should be allocated, "+
"instead %v was: ", maxChanSize)
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("max channel size should be "+
"allocated, instead %v was: ",
maxChanSize)
}
// Since each of the nodes has 1 channel, out
// of only one channel in the graph, we expect
// their score to be 0.5.
expScore := float64(0.5)
if candidate.Score != expScore {
t1.Fatalf("expected candidate score "+
"to be %v, instead was %v",
expScore, candidate.Score)
}
}
})
@ -386,7 +459,13 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) {
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
@ -397,30 +476,36 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttach := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
// Add 10 nodes to the graph, with channels between
// them.
completeGraph(t, graph, 10)
// Next, we'll attempt to select a set of candidates,
prefAttach := NewConstrainedPrefAttachment(constraints)
nodes := make(map[NodeID]struct{})
if err := graph.ForEachNode(func(n Node) error {
nodes[n.PubKey()] = struct{}{}
return nil
}); err != nil {
t1.Fatalf("unable to traverse graph: %v", err)
}
// With the necessary state initialized, we'll now
// attempt to get the score for our list of nodes,
// passing zero for the amount of wallet funds. This
// should return an empty slice of directives.
directives, err := prefAttach.Select(self, graph, 0,
0, skipNodes)
// should return an all-zero score set.
scores, err := prefAttach.NodeScores(graph, nil,
0, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
if len(directives) != 0 {
t1.Fatalf("zero attachment directives "+
"should have been returned instead %v were",
len(directives))
// Since all should be given a score of 0, the map
// should be empty.
if len(scores) != 0 {
t1.Fatalf("expected empty score map, "+
"instead got %v ", len(scores))
}
})
if !success {
@ -430,9 +515,8 @@ func TestConstrainedPrefAttachmentSelectInsufficientFunds(t *testing.T) {
}
// TestConstrainedPrefAttachmentSelectGreedyAllocation tests that if upon
// deciding a set of candidates, we're unable to evenly split our funds, then
// we attempt to greedily allocate all funds to each selected vertex (up to the
// max channel size).
// returning node scores, the NodeScores method will attempt to greedily
// allocate all funds to each vertex (up to the max channel size).
func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
t.Parallel()
@ -445,7 +529,13 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
threshold = 0.5
)
skipNodes := make(map[NodeID]struct{})
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
graph, cleanup, err := graph.genFunc()
@ -456,16 +546,7 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttach := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
prefAttach := NewConstrainedPrefAttachment(constraints)
const chanCapacity = btcutil.SatoshiPerBitcoin
@ -494,9 +575,10 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
// graph, with node node having two edges.
numNodes := 0
twoChans := false
nodes := make(map[NodeID]struct{})
if err := graph.ForEachNode(func(n Node) error {
numNodes++
nodes[n.PubKey()] = struct{}{}
numChans := 0
err := n.ForEachChannel(func(c ChannelEdge) error {
numChans++
@ -526,38 +608,61 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
// result, the heuristic should try to greedily
// allocate funds to channels.
const availableBalance = btcutil.SatoshiPerBitcoin * 2.5
directives, err := prefAttach.Select(self, graph,
availableBalance, 5, skipNodes)
scores, err := prefAttach.NodeScores(graph, nil,
availableBalance, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// Three directives should have been returned.
if len(directives) != 3 {
t1.Fatalf("expected 3 directives, instead "+
"got: %v", len(directives))
if len(scores) != len(nodes) {
t1.Fatalf("all nodes should be scored, "+
"instead %v were", len(scores))
}
// The two directive should have the max channel amount
// allocated.
if directives[0].ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[0].ChanAmt)
}
if directives[1].ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[1].ChanAmt)
// The candidates should have a non-zero score, and
// have the max chan size funds recommended channel
// size.
for _, candidate := range scores {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
maxChanSize, candidate.ChanAmt)
}
}
// The third channel should have been allocated the
// remainder, or 0.5 BTC.
if directives[2].ChanAmt != (btcutil.SatoshiPerBitcoin * 0.5) {
t1.Fatalf("expected recommendation of %v, "+
"instead got %v", maxChanSize,
directives[2].ChanAmt)
// Imagine a few channels are being opened, and there's
// only 0.5 BTC left. That should leave us with channel
// candidates of that size.
const remBalance = btcutil.SatoshiPerBitcoin * 0.5
scores, err = prefAttach.NodeScores(graph, nil,
remBalance, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
if len(scores) != len(nodes) {
t1.Fatalf("all nodes should be scored, "+
"instead %v were", len(scores))
}
// Check that the recommended channel sizes are now the
// remaining channel balance.
for _, candidate := range scores {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != remBalance {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
remBalance, candidate.ChanAmt)
}
}
})
if !success {
@ -567,8 +672,8 @@ func TestConstrainedPrefAttachmentSelectGreedyAllocation(t *testing.T) {
}
// TestConstrainedPrefAttachmentSelectSkipNodes ensures that if a node was
// already select for attachment, then that node is excluded from the set of
// candidate nodes.
// already selected as a channel counterparty, then that node will get a score
// of zero during scoring.
func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
t.Parallel()
@ -581,10 +686,15 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
threshold = 0.5
)
constraints := &HeuristicConstraints{
MinChanSize: minChanSize,
MaxChanSize: maxChanSize,
ChanLimit: chanLimit,
Allocation: threshold,
}
for _, graph := range chanGraphs {
success := t.Run(graph.name, func(t1 *testing.T) {
skipNodes := make(map[NodeID]struct{})
graph, cleanup, err := graph.genFunc()
if err != nil {
t1.Fatalf("unable to create graph: %v", err)
@ -593,16 +703,7 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
defer cleanup()
}
// First, we'll generate a random key that represents
// "us", and create a new instance of the heuristic
// with our set parameters.
self, err := randKey()
if err != nil {
t1.Fatalf("unable to generate self key: %v", err)
}
prefAttach := NewConstrainedPrefAttachment(
minChanSize, maxChanSize, chanLimit, threshold,
)
prefAttach := NewConstrainedPrefAttachment(constraints)
// Next, we'll create a simple topology of two nodes,
// with a single channel connecting them.
@ -613,44 +714,74 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) {
t1.Fatalf("unable to create channel: %v", err)
}
// With our graph created, we'll now execute the Select
// function to recommend potential attachment
// candidates.
nodes := make(map[NodeID]struct{})
if err := graph.ForEachNode(func(n Node) error {
nodes[n.PubKey()] = struct{}{}
return nil
}); err != nil {
t1.Fatalf("unable to traverse graph: %v", err)
}
if len(nodes) != 2 {
t1.Fatalf("expected 2 nodes, found %d", len(nodes))
}
// With our graph created, we'll now get the scores for
// all nodes in the graph.
const availableBalance = btcutil.SatoshiPerBitcoin * 2.5
directives, err := prefAttach.Select(self, graph,
availableBalance, 2, skipNodes)
scores, err := prefAttach.NodeScores(graph, nil,
availableBalance, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
// As the channel limit is three, and two nodes are
// present in the graph, both should be selected.
if len(directives) != 2 {
t1.Fatalf("expected two directives, instead "+
"got %v", len(directives))
if len(scores) != len(nodes) {
t1.Fatalf("all nodes should be scored, "+
"instead %v were", len(scores))
}
// THey should all have a score, and a maxChanSize
// channel size recommendation.
for _, candidate := range scores {
if candidate.Score == 0 {
t1.Fatalf("Expected non-zero score")
}
if candidate.ChanAmt != maxChanSize {
t1.Fatalf("expected recommendation "+
"of %v, instead got %v",
maxChanSize, candidate.ChanAmt)
}
}
// We'll simulate a channel update by adding the nodes
// we just establish channel with the to set of nodes
// to be skipped.
skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{}
skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{}
// to our set of channels.
var chans []Channel
for _, candidate := range scores {
chans = append(chans,
Channel{
Node: candidate.NodeID,
},
)
}
// If we attempt to make a call to the Select function,
// without providing any new information, then we
// should get no new directives as both nodes has
// already been attached to.
directives, err = prefAttach.Select(self, graph,
availableBalance, 2, skipNodes)
// If we attempt to make a call to the NodeScores
// function, without providing any new information,
// then all nodes should have a score of zero, since we
// already got channels to them.
scores, err = prefAttach.NodeScores(graph, chans,
availableBalance, nodes)
if err != nil {
t1.Fatalf("unable to select attachment "+
"directives: %v", err)
}
if len(directives) != 0 {
t1.Fatalf("zero new directives should have been "+
"selected, but %v were", len(directives))
// Since all should be given a score of 0, the map
// should be empty.
if len(scores) != 0 {
t1.Fatalf("expected empty score map, "+
"instead got %v ", len(scores))
}
})
if !success {

View file

@ -85,12 +85,19 @@ var _ autopilot.ChannelController = (*chanController)(nil)
func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error) {
atplLog.Infof("Instantiating autopilot with cfg: %v", spew.Sdump(cfg))
// Set up the constraints the autopilot heuristics must adhere to.
atplConstraints := &autopilot.HeuristicConstraints{
MinChanSize: btcutil.Amount(cfg.MinChannelSize),
MaxChanSize: btcutil.Amount(cfg.MaxChannelSize),
ChanLimit: uint16(cfg.MaxChannels),
Allocation: cfg.Allocation,
MaxPendingOpens: 10,
}
// First, we'll create the preferential attachment heuristic,
// initialized with the passed auto pilot configuration parameters.
prefAttachment := autopilot.NewConstrainedPrefAttachment(
btcutil.Amount(cfg.MinChannelSize),
btcutil.Amount(cfg.MaxChannelSize),
uint16(cfg.MaxChannels), cfg.Allocation,
atplConstraints,
)
// With the heuristic itself created, we can now populate the remainder
@ -107,8 +114,8 @@ func initAutoPilot(svr *server, cfg *autoPilotConfig) (*autopilot.Agent, error)
WalletBalance: func() (btcutil.Amount, error) {
return svr.cc.wallet.ConfirmedBalance(cfg.MinConfs)
},
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
MaxPendingOpens: 10,
Graph: autopilot.ChannelGraphFromDatabase(svr.chanDB.ChannelGraph()),
Constraints: atplConstraints,
ConnectToPeer: func(target *btcec.PublicKey, addrs []net.Addr) (bool, error) {
// First, we'll check if we're already connected to the
// target peer. If we are, we can exit early. Otherwise,