1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-03-13 11:35:47 +01:00

Only sync with top peers

By default we sync with every peer when reconnecting, which can be a lot. We now only sync on reconnection with our top peers (by capacity of our shared channels).
This commit is contained in:
Thomas HUET 2025-01-17 15:59:01 +01:00
parent 1c38591d0b
commit 288e50c2ac
14 changed files with 107 additions and 142 deletions

View file

@ -96,7 +96,6 @@ eclair {
# features { }
# }
]
sync-whitelist = [] // a list of public keys; if non-empty, we will only do the initial sync with those peers
channel {
channel-flags {
@ -393,6 +392,8 @@ eclair {
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know
channel-range-chunk-size = 1500 // max number of short_channel_ids (+ timestamps + checksums) in reply_channel_range *do not change this unless you know what you are doing*
channel-query-chunk-size = 100 // max number of short_channel_ids in query_short_channel_ids *do not change this unless you know what you are doing*
peer-limit = 10 // number of peers to do the initial sync with. We limit the initial sync to the peers that have the largest capacity with us.
whitelist = [] // a list of public keys to do the initial sync with, in addition to the top peers by capacity
}
message-path-finding {

View file

@ -35,7 +35,7 @@ import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, Re
import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf}
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf, Router}
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol._
@ -66,7 +66,6 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
torAddress_opt: Option[NodeAddress],
features: Features[Feature],
private val overrideInitFeatures: Map[PublicKey, Features[InitFeature]],
syncWhitelist: Set[PublicKey],
pluginParams: Seq[PluginParams],
channelConf: ChannelConf,
onChainFeeConf: OnChainFeeConf,
@ -412,8 +411,6 @@ object NodeParams extends Logging {
p -> (f.copy(unknown = f.unknown ++ pluginMessageParams.map(_.pluginFeature)): Features[InitFeature])
}.toMap
val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
val socksProxy_opt = parseSocks5ProxyParams(config)
val publicTorAddress_opt = if (config.getBoolean("tor.publish-onion-address")) torAddress_opt else None
@ -560,7 +557,6 @@ object NodeParams extends Logging {
features = coreAndPluginFeatures,
pluginParams = pluginParams,
overrideInitFeatures = overrideInitFeatures,
syncWhitelist = syncWhitelist,
channelConf = ChannelConf(
channelFlags = channelFlags,
dustLimit = dustLimitSatoshis,
@ -657,10 +653,14 @@ object NodeParams extends Logging {
watchSpentWindow = watchSpentWindow,
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
syncConf = Router.SyncConf(
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
peerLimit = config.getInt("router.sync.peer-limit"),
whitelist = config.getStringList("router.sync.whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
),
pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")),
messageRouteParams = getMessageRouteParams(config.getConfig("router.message-path-finding")),
balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS),

View file

@ -60,7 +60,7 @@ private class IncomingConnectionsTracker(nodeParams: NodeParams, switchboard: Ac
Metrics.IncomingConnectionsNoChannels.withoutTags().update(incomingConnections.size)
Behaviors.receiveMessage {
case TrackIncomingConnection(remoteNodeId) =>
if (nodeParams.syncWhitelist.contains(remoteNodeId)) {
if (nodeParams.routerConf.syncConf.whitelist.contains(remoteNodeId)) {
Behaviors.same
} else {
if (incomingConnections.size >= nodeParams.peerConnectionConf.maxNoChannels) {

View file

@ -474,7 +474,7 @@ class Peer(val nodeParams: NodeParams,
log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
// we have our first channel with that peer: let's sync our routing table
if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) {
d.peerConnection ! PeerConnection.DoSync(replacePrevious = false)
d.peerConnection ! PeerConnection.DoSync(isReconnection = false)
}
// NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151)
// we won't clean it up, but we won't remember the temporary id on channel termination

View file

@ -103,7 +103,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
when(BEFORE_INIT) {
case Event(InitializeConnection(peer, chainHash, localFeatures, doSync, fundingRates_opt), d: BeforeInitData) =>
case Event(InitializeConnection(peer, chainHash, localFeatures, fundingRates_opt), d: BeforeInitData) =>
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.debug(s"using features=$localFeatures")
@ -120,7 +120,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! localInit
startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout)
unstashAll() // unstash remote init if it already arrived
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync, d.isPersistent)
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, d.isPersistent)
case Event(_: protocol.Init, _) =>
log.debug("stashing remote init")
@ -160,11 +160,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected(self, d.peer))
if (d.doSync) {
self ! DoSync(replacePrevious = true)
} else {
log.info("not syncing with this peer")
}
self ! DoSync(isReconnection = true)
// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
val rebroadcastDelay = Random.nextInt(conf.maxRebroadcastDelay.toSeconds.toInt).seconds
@ -399,13 +395,12 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
stay() using d.copy(behavior = behavior1)
case Event(DoSync(replacePrevious), d: ConnectedData) =>
case Event(DoSync(isReconnection), d: ConnectedData) =>
val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
log.debug(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt)
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, isReconnection, flags_opt)
}
stay()
@ -536,7 +531,7 @@ object PeerConnection {
case object SendPing
case object KillIdle
case object ResumeAnnouncements
case class DoSync(replacePrevious: Boolean) extends RemoteTypes
case class DoSync(isReconnection: Boolean) extends RemoteTypes
// @formatter:on
val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes
@ -563,7 +558,7 @@ object PeerConnection {
case object Nothing extends Data
case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean, isPersistent: Boolean) extends Data with HasTransport
case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, isPersistent: Boolean) extends Data with HasTransport
case class ConnectedData(chainHash: BlockHash, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, isPersistent: Boolean) extends Data with HasTransport
case class ExpectedPong(ping: Ping, timestamp: TimestampMilli = TimestampMilli.now())
@ -580,7 +575,7 @@ object PeerConnection {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey, outgoing: Boolean) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], doSync: Boolean, fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes
sealed trait ConnectionResult extends RemoteTypes

View file

@ -110,9 +110,7 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty, pendingOnTheFlyFunding = Map.empty)
val features = nodeParams.initFeaturesFor(authenticated.remoteNodeId)
val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId)
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && hasChannels)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync, nodeParams.willFundRates_opt)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, nodeParams.willFundRates_opt)
if (!hasChannels && !authenticated.outgoing) {
incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId)
}

View file

@ -97,16 +97,21 @@ object EclairInternalsSerializer {
("ageFactor" | double) ::
("capacityFactor" | double)).as[Graph.MessageWeightRatios]).as[MessageRouteParams]
val syncConfCodec: Codec[Router.SyncConf] = (
("requestNodeAnnouncements" | bool(8)) ::
("channelRangeChunkSize" | int32) ::
("channelQueryChunkSize" | int32) ::
("peerLimit" | int32) ::
("whitelist" | listOfN(uint16, publicKey).xmap[Set[PublicKey]](_.toSet, _.toList))).as[Router.SyncConf]
val routerConfCodec: Codec[RouterConf] = (
("watchSpentWindow" | finiteDurationCodec) ::
("channelExcludeDuration" | finiteDurationCodec) ::
("routerBroadcastInterval" | finiteDurationCodec) ::
("requestNodeAnnouncements" | bool(8)) ::
("encodingType" | discriminated[EncodingType].by(uint8)
.typecase(0, provide(EncodingType.UNCOMPRESSED))
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
("channelRangeChunkSize" | int32) ::
("channelQueryChunkSize" | int32) ::
("syncConf" | syncConfCodec) ::
("pathFindingExperimentConf" | pathFindingExperimentConfCodec) ::
("messageRouteParams" | messageRouteParamsCodec) ::
("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf]
@ -156,7 +161,6 @@ object EclairInternalsSerializer {
("peer" | actorRefCodec(system)) ::
("chainHash" | blockHash) ::
("features" | variableSizeBytes(uint16, initFeaturesCodec)) ::
("doSync" | bool(8)) ::
("fundingRates" | optional(bool(8), LiquidityAds.Codecs.willFundRates))).as[PeerConnection.InitializeConnection]
def connectionReadyCodec(system: ExtendedActorSystem): Codec[PeerConnection.ConnectionReady] = (

View file

@ -87,6 +87,16 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
context.system.eventStream.publish(ChannelUpdatesReceived(channels.values.flatMap(pc => pc.update_1_opt ++ pc.update_2_opt ++ Nil)))
context.system.eventStream.publish(NodesDiscovered(nodes))
val peerCapacities = channels.values.map(pc =>
if (pc.nodeId1 == nodeParams.nodeId) {
Some((pc.nodeId2, pc.capacity))
} else if (pc.nodeId2 == nodeParams.nodeId) {
Some((pc.nodeId1, pc.capacity))
} else {
None
}).flatten.groupMapReduce(_._1)(_._2)(_ + _)
val topCapacityPeers = peerCapacities.toSeq.sortWith { case ((_, c1), (_, c2)) => c1 > c2 }.take(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet
// watch the funding tx of all these channels
// note: some of them may already have been spent, in that case we will receive the watch event immediately
(channels.values ++ pruned.values).foreach { pc =>
@ -115,7 +125,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty,
spentChannels = Map.empty)
spentChannels = Map.empty,
topCapacityPeers = topCapacityPeers)
startWith(NORMAL, data)
}
@ -298,7 +309,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using Validation.handleAvailableBalanceChanged(d, e)
case Event(s: SendChannelQuery, d) =>
stay() using Sync.handleSendChannelQuery(d, s)
stay() using Sync.handleSendChannelQuery(nodeParams.routerConf.syncConf, d, s)
case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) =>
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
@ -366,20 +377,24 @@ object Router {
)
}
case class RouterConf(watchSpentWindow: FiniteDuration,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
pathFindingExperimentConf: PathFindingExperimentConf,
messageRouteParams: MessageRouteParams,
balanceEstimateHalfLife: FiniteDuration) {
case class SyncConf(requestNodeAnnouncements: Boolean,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
peerLimit: Int,
whitelist: Set[PublicKey]) {
require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message")
require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message")
}
case class RouterConf(watchSpentWindow: FiniteDuration,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
encodingType: EncodingType,
syncConf: SyncConf,
pathFindingExperimentConf: PathFindingExperimentConf,
messageRouteParams: MessageRouteParams,
balanceEstimateHalfLife: FiniteDuration)
// @formatter:off
case class ChannelDesc private(shortChannelId: ShortChannelId, a: PublicKey, b: PublicKey){
def reversed: ChannelDesc = ChannelDesc(shortChannelId, b, a)
@ -698,7 +713,7 @@ object Router {
// @formatter:on
// @formatter:off
case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, isReconnection: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case object GetRoutingState
case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement])
case object GetRoutingStateStreaming extends RemoteTypes
@ -767,6 +782,7 @@ object Router {
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
topCapacityPeers: Set[PublicKey],
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid

View file

@ -42,13 +42,21 @@ object Sync {
// block almost never exceeds 2800 so this should very rarely be limiting
val MAXIMUM_CHUNK_SIZE = 2700
def handleSendChannelQuery(d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleSendChannelQuery(conf: SyncConf, d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
// TODO: check that s.remoteNodeId is eligible for sync
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
// we currently send query_channel_range when:
// * we just (re)connected to a peer with whom we have channels
// * we just reconnected to a peer with whom we have channels
// * we validate our first channel with a peer
// we must ensure we don't send a new query_channel_range while another query is still in progress
if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) {
val shouldSync = if (s.isReconnection) {
conf.whitelist.contains(s.remoteNodeId) || d.topCapacityPeers.contains(s.remoteNodeId)
} else if (d.sync.contains(s.remoteNodeId)) { // we must ensure we don't send a new query_channel_range while another query is still in progress
log.debug("not sending query_channel_range: sync already in progress")
false
} else {
true
}
if (shouldSync) {
// ask for everything
val query = QueryChannelRange(s.chainHash, firstBlock = BlockHeight(0), numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toSet))
log.debug("sending query_channel_range={}", query)
@ -64,7 +72,6 @@ object Sync {
// reset our sync state for this peer: we create an entry to ensure we reject duplicate queries and unsolicited reply_channel_range
d.copy(sync = d.sync + (s.remoteNodeId -> Syncing(Nil, 0)))
} else {
log.debug("not sending query_channel_range: sync already in progress")
d
}
}
@ -77,7 +84,7 @@ object Sync {
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
val shortChannelIds: SortedSet[RealShortChannelId] = channels.keySet.filter(keep(q.firstBlock, q.numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlock, q.numberOfBlocks)
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.channelRangeChunkSize)
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.syncConf.channelRangeChunkSize)
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
chunks.zipWithIndex.foreach { case (chunk, i) =>
val syncComplete = i == chunks.size - 1
@ -111,7 +118,7 @@ object Sync {
ids match {
case Nil => acc.reverse
case head :: tail =>
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.syncConf.requestNodeAnnouncements)
// 0 means nothing to query, just don't include it
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
@ -144,7 +151,7 @@ object Sync {
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(routerConf.channelQueryChunkSize)
.grouped(routerConf.syncConf.channelQueryChunkSize)
.map(buildQuery)
.toList

View file

@ -375,7 +375,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
shortIds4.real.toOption.get.toLong -> channelId4,
)
val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour)
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty)
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty, Set.empty)
eclair.findRoute(c, 250_000 msat, None)
val routeRequest1 = router.expectMsgType[RouteRequest]

View file

@ -29,7 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
import fr.acinq.eclair.router.Graph.{MessageWeightRatios, PaymentWeightRatios}
import fr.acinq.eclair.router.PathFindingExperimentConf
import fr.acinq.eclair.router.{PathFindingExperimentConf, Router}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol._
import org.scalatest.Tag
@ -114,7 +114,6 @@ object TestConstants {
),
pluginParams = List(pluginParams),
overrideInitFeatures = Map.empty,
syncWhitelist = Set.empty,
channelConf = ChannelConf(
dustLimit = 1100 sat,
maxRemoteDustLimit = 1500 sat,
@ -196,10 +195,14 @@ object TestConstants {
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
syncConf = Router.SyncConf(
requestNodeAnnouncements = true,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
peerLimit = 10,
whitelist = Set.empty
),
pathFindingExperimentConf = PathFindingExperimentConf(Map("alice-test-experiment" -> PathFindingConf(
randomize = false,
boundaries = SearchBoundaries(
@ -292,7 +295,6 @@ object TestConstants {
),
pluginParams = Nil,
overrideInitFeatures = Map.empty,
syncWhitelist = Set.empty,
channelConf = ChannelConf(
dustLimit = 1000 sat,
maxRemoteDustLimit = 1500 sat,
@ -374,10 +376,14 @@ object TestConstants {
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
syncConf = Router.SyncConf(
requestNodeAnnouncements = true,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
peerLimit = 20,
whitelist = Set.empty
),
pathFindingExperimentConf = PathFindingExperimentConf(Map("bob-test-experiment" -> PathFindingConf(
randomize = false,
boundaries = SearchBoundaries(

View file

@ -68,23 +68,19 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
withFixture(test.toNoArgTest(FixtureParam(aliceParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer)))
}
def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), doSync: Boolean = false, isPersistent: Boolean = true): Unit = {
def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), isPersistent: Boolean = true): Unit = {
// let's simulate a connection
val probe = TestProbe()
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = isPersistent))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId, outgoing = true))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features.initFeatures(), doSync, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
val localInit = transport.expectMsgType[protocol.Init]
assert(localInit.networks == List(Block.RegtestGenesisBlock.hash))
transport.send(peerConnection, remoteInit)
transport.expectMsgType[TransportHandler.ReadAck]
if (doSync) {
router.expectMsgType[SendChannelQuery]
} else {
router.expectNoMessage(1 second)
}
router.expectMsgType[SendChannelQuery]
peer.expectMsg(PeerConnection.ConnectionReady(peerConnection, remoteNodeId, address, outgoing = true, localInit, remoteInit))
assert(peerConnection.stateName == PeerConnection.CONNECTED)
}
@ -102,7 +98,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.send(peerConnection, incomingConnection)
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId, outgoing = false))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = false, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
val localInit = transport.expectMsgType[protocol.Init]
assert(localInit.remoteAddress_opt == Some(fakeIPAddress))
@ -134,7 +130,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
origin.expectMsg(PeerConnection.ConnectionResult.InitializationFailed("initialization timed out"))
peer.expectMsg(ConnectionDown(peerConnection))
@ -147,7 +143,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
transport.expectMsgType[protocol.Init]
transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"0000 00050100000000".bits).require.value)
@ -164,7 +160,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
transport.expectMsgType[protocol.Init]
transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"00050100000000 0000".bits).require.value)
@ -181,7 +177,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
transport.expectMsgType[protocol.Init]
// remote activated MPP but forgot payment secret
@ -199,7 +195,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None))
transport.expectMsgType[TransportHandler.Listener]
transport.expectMsgType[protocol.Init]
transport.send(peerConnection, protocol.Init(Bob.nodeParams.features.initFeatures(), TlvStream(InitTlv.Networks(Block.LivenetGenesisBlock.hash :: Block.SignetGenesisBlock.hash :: Nil))))
@ -212,7 +208,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
test("sync when requested") { f =>
import f._
val remoteInit = protocol.Init(Features(ChannelRangeQueries -> Optional, VariableLengthOnion -> Mandatory, PaymentSecret -> Mandatory, StaticRemoteKey -> Mandatory))
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, remoteInit, doSync = true)
connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, remoteInit)
}
test("reply to ping") { f =>

View file

@ -104,64 +104,6 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike {
peer.expectMsg(Peer.Disconnect(remoteNodeId))
}
def sendFeatures(nodeParams: NodeParams, channels: Seq[PersistentChannelData], remoteNodeId: PublicKey, expectedFeatures: Features[InitFeature], expectedSync: Boolean): Unit = {
val (probe, peer, peerConnection) = (TestProbe(), TestProbe(), TestProbe())
val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer)))
switchboard ! Switchboard.Init(channels)
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true)
val initConnection = peerConnection.expectMsgType[PeerConnection.InitializeConnection]
assert(initConnection.chainHash == nodeParams.chainHash)
assert(initConnection.features == expectedFeatures)
assert(initConnection.doSync == expectedSync)
}
test("sync if no whitelist is defined and peer has channels") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId
sendFeatures(nodeParams, List(ChannelCodecsSpec.normal), remoteNodeId, nodeParams.features.initFeatures(), expectedSync = true)
}
test("sync if no whitelist is defined and peer creates a channel") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
val (probe, peer, peerConnection) = (TestProbe(), TestProbe(), TestProbe())
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId
val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer)))
switchboard ! Switchboard.Init(Nil)
// We have a channel with our peer, so we trigger a sync when connecting.
switchboard ! ChannelIdAssigned(TestProbe().ref, remoteNodeId, randomBytes32(), randomBytes32())
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true)
val initConnection1 = peerConnection.expectMsgType[PeerConnection.InitializeConnection]
assert(initConnection1.chainHash == nodeParams.chainHash)
assert(initConnection1.features == nodeParams.features.initFeatures())
assert(initConnection1.doSync)
// We don't have channels with our peer, so we won't trigger a sync when connecting.
switchboard ! LastChannelClosed(peer.ref, remoteNodeId)
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true)
val initConnection2 = peerConnection.expectMsgType[PeerConnection.InitializeConnection]
assert(initConnection2.chainHash == nodeParams.chainHash)
assert(initConnection2.features == nodeParams.features.initFeatures())
assert(!initConnection2.doSync)
}
test("don't sync if no whitelist is defined and peer does not have channels") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
sendFeatures(nodeParams, Nil, randomKey().publicKey, nodeParams.features.initFeatures(), expectedSync = false)
}
test("sync if whitelist contains peer") {
val remoteNodeId = randomKey().publicKey
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(remoteNodeId, randomKey().publicKey, randomKey().publicKey))
sendFeatures(nodeParams, Nil, remoteNodeId, nodeParams.features.initFeatures(), expectedSync = true)
}
test("don't sync if whitelist doesn't contain peer") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(randomKey().publicKey, randomKey().publicKey, randomKey().publicKey))
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId
sendFeatures(nodeParams, List(ChannelCodecsSpec.normal), remoteNodeId, nodeParams.features.initFeatures(), expectedSync = false)
}
test("get peer info") {
val (probe, peer) = (TestProbe(), TestProbe())
val switchboard = TestActorRef(new Switchboard(Alice.nodeParams, FakePeerFactory(probe, peer)))

View file

@ -84,7 +84,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
}
val srcId = src.underlyingActor.nodeParams.nodeId
val tgtId = tgt.underlyingActor.nodeParams.nodeId
sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, replacePrevious = true, extendedQueryFlags_opt))
sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, isReconnection = true, extendedQueryFlags_opt))
// src sends a query_channel_range to bob
val qcr = pipe.expectMsgType[QueryChannelRange]
pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr))
@ -134,7 +134,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
test("sync with standard channel queries") {
val watcher = system.actorOf(Props(new YesWatcher()))
val alice = TestFSMRef(new Router(Alice.nodeParams, watcher))
val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(Bob.nodeParams.nodeId)))), watcher))
val bob = TestFSMRef(new Router(Bob.nodeParams, watcher))
val charlieId = randomKey().publicKey
val sender = TestProbe()
@ -183,7 +183,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
def syncWithExtendedQueries(requestNodeAnnouncements: Boolean): Unit = {
val watcher = system.actorOf(Props(new YesWatcher()))
val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(requestNodeAnnouncements = requestNodeAnnouncements)), watcher))
val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(requestNodeAnnouncements = requestNodeAnnouncements, whitelist = Set(Bob.nodeParams.nodeId)))), watcher))
val bob = TestFSMRef(new Router(Bob.nodeParams, watcher))
val charlieId = randomKey().publicKey
val sender = TestProbe()
@ -251,7 +251,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
}
test("reset sync state on reconnection") {
val params = TestConstants.Alice.nodeParams
val params = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(Bob.nodeParams.nodeId))))
val router = TestFSMRef(new Router(params, TestProbe().ref))
val peerConnection = TestProbe()
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
@ -261,17 +261,17 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
assert(!router.stateData.sync.contains(remoteNodeId))
// ask router to send a channel range query
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None))
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = true, None))
val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange]
sender.expectMsgType[GossipTimestampFilter]
assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0)))
// ask router to send another channel range query
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = false, None))
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = false, None))
sender.expectNoMessage(100 millis) // it's a duplicate and should be ignored
assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0)))
val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None)
val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.syncConf.channelQueryChunkSize).keys.toList), None, None)
// send first block
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, block1))
@ -284,7 +284,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
assert(sync.totalQueries == 1)
// simulate a re-connection
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None))
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = true, None))
sender.expectMsgType[QueryChannelRange]
sender.expectMsgType[GossipTimestampFilter]
assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0)))