1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 22:58:23 +01:00

Only sync with channel peers (#1587)

This reduces the bandwidth used: it doesn't make sense to sync with every
node that connects to us.

We also better track sync requests, to reject unsolicited sync responses.

To ensure that nodes don't need to explicitly reconnect after creating
their first channel in order to get the routing table, we add a mechanism
to trigger a sync when the first channel is created.
This commit is contained in:
Bastien Teinturier 2021-02-04 15:27:08 +01:00 committed by GitHub
parent 0127ace408
commit ac054a2bb2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 243 additions and 158 deletions

View file

@ -25,6 +25,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel._
@ -32,7 +33,6 @@ import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{wire, _}
import scodec.bits.ByteVector
import java.net.InetSocketAddress
@ -164,6 +164,10 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d: ConnectedData) if d.channels.contains(TemporaryChannelId(temporaryChannelId)) =>
log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
// we have our first channel with that peer: let's sync our routing table
if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) {
d.peerConnection ! PeerConnection.DoSync(replacePrevious = false)
}
// NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151)
// we won't clean it up, but we won't remember the temporary id on channel termination
stay using d.copy(channels = d.channels + (FinalChannelId(channelId) -> channel))
@ -192,6 +196,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
log.info(s"channel closed: channelId=${channelIds.mkString("/")}")
if (d.channels.values.toSet - actor == Set.empty) {
log.info(s"that was the last open channel, closing the connection")
context.system.eventStream.publish(LastChannelClosed(self, remoteNodeId))
d.peerConnection ! PeerConnection.Kill(KillReason.NoRemainingChannel)
}
stay using d.copy(channels = d.channels -- channelIds)

View file

@ -16,8 +16,6 @@
package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
@ -34,6 +32,7 @@ import fr.acinq.eclair.{wire, _}
import scodec.Attempt
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.util.Random
@ -136,30 +135,13 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
} else {
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment()
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected)
def localHasFeature(f: Feature): Boolean = d.localInit.features.hasFeature(f)
def remoteHasFeature(f: Feature): Boolean = remoteInit.features.hasFeature(f)
val canUseChannelRangeQueries = localHasFeature(Features.ChannelRangeQueries) && remoteHasFeature(Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = localHasFeature(Features.ChannelRangeQueriesExtended) && remoteHasFeature(Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
// if they support channel queries we don't send routing info yet, if they want it they will query us
// we will query them, using extended queries if supported
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
if (d.doSync) {
log.info(s"sending sync channel range query with flags_opt=$flags_opt")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, flags_opt = flags_opt)
self ! DoSync(replacePrevious = true)
} else {
log.info("not syncing with this peer")
}
} else if (remoteHasFeature(Features.InitialRoutingSync)) {
// "old" nodes, do as before
log.info("peer requested a full routing table dump")
router ! GetRoutingState
}
// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
val rebroadcastDelay = Random.nextInt(conf.maxRebroadcastDelay.toSeconds.toInt).seconds
@ -358,6 +340,20 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
stay using d.copy(behavior = behavior1)
case Event(DoSync(replacePrevious), d: ConnectedData) =>
val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
log.info(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt)
} else if (d.remoteInit.features.hasFeature(Features.InitialRoutingSync) && replacePrevious) {
// For "old" nodes that don't support channel queries, we send them the full routing table
log.info("peer requested a full routing table dump")
router ! GetRoutingState
}
stay
case Event(ResumeAnnouncements, d: ConnectedData) =>
log.info(s"resuming processing of network announcements for peer")
stay using d.copy(behavior = d.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false))
@ -472,6 +468,7 @@ object PeerConnection {
case object InitTimeout
case object SendPing
case object ResumeAnnouncements
case class DoSync(replacePrevious: Boolean)
// @formatter:on
val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes

View file

@ -16,12 +16,12 @@
package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.UnknownMessage
import fr.acinq.eclair.wire
import fr.acinq.eclair.wire.UnknownMessage
import java.net.InetSocketAddress
sealed trait PeerEvent
@ -31,4 +31,6 @@ case class PeerConnected(peer: ActorRef, nodeId: PublicKey, connectionInfo: Conn
case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
case class LastChannelClosed(peer: ActorRef, nodeId: PublicKey) extends PeerEvent
case class UnknownMessageReceived(peer: ActorRef, nodeId: PublicKey, message: UnknownMessage, connectionInfo: ConnectionInfo) extends PeerEvent

View file

@ -33,8 +33,11 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
import Switchboard._
context.system.eventStream.subscribe(self, classOf[ChannelIdAssigned])
context.system.eventStream.subscribe(self, classOf[LastChannelClosed])
// we load channels from database
{
private def initialPeersWithChannels: Set[PublicKey] = {
// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
// Closed channels will be removed, other channels will be restored.
@ -44,12 +47,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
nodeParams.db.channels.removeChannel(c.channelId)
})
channels
.groupBy(_.commitments.remoteParams.nodeId)
.map { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) }
val peerChannels = channels.groupBy(_.commitments.remoteParams.nodeId)
peerChannels.foreach { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) }
peerChannels.keySet
}
def receive: Receive = {
def receive: Receive = normal(initialPeersWithChannels)
def normal(peersWithChannels: Set[PublicKey]): Receive = {
case Peer.Connect(publicKey, _) if publicKey == nodeParams.nodeId =>
sender ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
@ -75,9 +80,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef,
// if this is an incoming connection, we might not yet have created the peer
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty)
val features = nodeParams.featuresFor(authenticated.remoteNodeId)
val doSync = nodeParams.syncWhitelist.isEmpty || nodeParams.syncWhitelist.contains(authenticated.remoteNodeId)
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && peersWithChannels.contains(authenticated.remoteNodeId))
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync)
case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId))
case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId))
case Symbol("peers") => sender ! context.children
case GetRouterPeerConf => sender ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf)
@ -119,6 +129,7 @@ object Switchboard {
def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"
case object GetRouterPeerConf extends RemoteTypes
case class RouterPeerConf(routerConf: RouterConf, peerConf: PeerConnection.Conf) extends RemoteTypes
}

View file

@ -128,6 +128,7 @@ object EclairInternalsSerializer {
("chainsHash" | bytes32) ::
("remoteNodeId" | publicKey) ::
("to" | actorRefCodec(system)) ::
("replacePrevious" | bool(8)) ::
("flags_opt" | optionQueryChannelRangeTlv)).as[SendChannelQuery]
def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = (

View file

@ -146,6 +146,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.info(s"subscribing listener=$listener to network events")
context.system.eventStream.subscribe(listener, classOf[NetworkEvent])
context.watch(listener)
override def receive: Receive = {
case Terminated(actor) if actor == listener =>
log.warning(s"unsubscribing listener=$listener to network events")
@ -519,7 +520,7 @@ object Router {
// @formatter:on
// @formatter:off
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case object GetNetworkStats
case class GetNetworkStatsResponse(stats: Option[NetworkStats])
case object GetRoutingState
@ -566,7 +567,13 @@ object Router {
case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Long)
case class Syncing(pending: List[RoutingMessage], total: Int)
/**
* @param remainingQueries remaining queries to send, the next one will be popped after we receive a [[ReplyShortChannelIdsEnd]]
* @param totalQueries total number of *queries* (not channels) that will be sent during this syncing session
*/
case class Syncing(remainingQueries: List[RoutingMessage], totalQueries: Int) {
def started: Boolean = totalQueries > 0
}
case class Data(nodes: Map[PublicKey, NodeAnnouncement],
channels: SortedMap[ShortChannelId, PublicChannel],

View file

@ -45,9 +45,12 @@ object Sync {
def handleSendChannelQuery(d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
// we currently send query_channel_range when:
// * we just (re)connected to a peer with whom we have channels
// * we validate our first channel with a peer
// we must ensure we don't send a new query_channel_range while another query is still in progress
if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) {
// ask for everything
// we currently send only one query_channel_range message per peer, when we just (re)connected to it, so we don't
// have to worry about sending a new query_channel_range when another query is still in progress
val query = QueryChannelRange(s.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toList))
log.info("sending query_channel_range={}", query)
s.to ! query
@ -59,9 +62,12 @@ object Sync {
val filter = GossipTimestampFilter(s.chainHash, firstTimestamp = System.currentTimeMillis.milliseconds.toSeconds, timestampRange = Int.MaxValue)
s.to ! filter
// clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and
// will start a new complete sync process
d.copy(sync = d.sync - s.remoteNodeId)
// reset our sync state for this peer: we create an entry to ensure we reject duplicate queries and unsolicited reply_channel_range
d.copy(sync = d.sync + (s.remoteNodeId -> Syncing(Nil, 0)))
} else {
log.info("not sending query_channel_range: sync already in progress")
d
}
}
def handleQueryChannelRange(channels: SortedMap[ShortChannelId, PublicChannel], routerConf: RouterConf, origin: RemoteGossip, q: QueryChannelRange)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
@ -86,6 +92,17 @@ object Sync {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(r)
d.sync.get(origin.nodeId) match {
case None =>
log.info("received unsolicited reply_channel_range with {} channels", r.shortChannelIds.array.size)
d // we didn't request a sync from this node, ignore
case Some(currentSync) if currentSync.remainingQueries.isEmpty && r.shortChannelIds.array.isEmpty =>
// NB: this case deals with peers who don't return any sync data. We're currently not correctly detecting the end
// of a stream of reply_channel_range, but it's not an issue in practice (we instead rely on the remaining query_short_channel_ids).
// We should fix that once https://github.com/lightningnetwork/lightning-rfc/pull/826 is deployed.
log.info("received empty reply_channel_range, sync is complete")
d.copy(sync = d.sync - origin.nodeId)
case Some(currentSync) =>
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size)
@ -117,13 +134,12 @@ object Sync {
def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
// always encode empty lists as UNCOMPRESSED
val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding
QueryShortChannelIds(r.chainHash,
shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)),
if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined)
val flags: TlvStream[QueryShortChannelIdsTlv] = if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined) {
TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag)))
else
} else {
TlvStream.empty
)
}
QueryShortChannelIds(r.chainHash, EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)), flags)
}
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
@ -132,7 +148,7 @@ object Sync {
.map(buildQuery)
.toList
val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies)
val (sync1, replynow_opt) = addToSync(d.sync, currentSync, origin.nodeId, replies)
// we only send a reply right away if there were no pending requests
replynow_opt.foreach(origin.peerConnection ! _)
val progress = syncProgress(sync1)
@ -140,6 +156,7 @@ object Sync {
ctx.self ! progress
d.copy(sync = sync1)
}
}
def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
@ -176,18 +193,18 @@ object Sync {
def handleReplyShortChannelIdsEnd(d: Data, origin: RemoteGossip, r: ReplyShortChannelIdsEnd)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(r)
// have we more channels to ask this peer?
// do we have more channels to request from this peer?
val sync1 = d.sync.get(origin.nodeId) match {
case Some(sync) =>
sync.pending match {
sync.remainingQueries match {
case nextRequest +: rest =>
log.info(s"asking for the next slice of short_channel_ids (remaining=${sync.pending.size}/${sync.total})")
log.info(s"asking for the next slice of short_channel_ids (remaining=${sync.remainingQueries.size}/${sync.totalQueries})")
origin.peerConnection ! nextRequest
d.sync + (origin.nodeId -> sync.copy(pending = rest))
d.sync + (origin.nodeId -> sync.copy(remainingQueries = rest))
case Nil =>
// we received reply_short_channel_ids_end for our last query and have not sent another one, we can now remove
// the remote peer from our map
log.info(s"sync complete (total=${sync.total})")
log.info(s"sync complete (total=${sync.totalQueries})")
d.sync - origin.nodeId
}
case _ => d.sync
@ -343,7 +360,7 @@ object Sync {
def syncProgress(sync: Map[PublicKey, Syncing]): SyncProgress = {
// NB: progress is in terms of requests, not individual channels
val (pending, total) = sync.foldLeft((0, 0)) {
case ((p, t), (_, sync)) => (p + sync.pending.size, t + sync.total)
case ((p, t), (_, sync)) => (p + sync.remainingQueries.size, t + sync.totalQueries)
}
if (total == 0) {
SyncProgress(1)
@ -481,18 +498,17 @@ object Sync {
checksums = checksums)
}
def addToSync(syncMap: Map[PublicKey, Syncing], remoteNodeId: PublicKey, pending: List[RoutingMessage]): (Map[PublicKey, Syncing], Option[RoutingMessage]) = {
def addToSync(syncMap: Map[PublicKey, Syncing], current: Syncing, remoteNodeId: PublicKey, pending: List[QueryShortChannelIds]): (Map[PublicKey, Syncing], Option[QueryShortChannelIds]) = {
pending match {
case head +: rest =>
// they may send back several reply_channel_range messages for a single query_channel_range query, and we must not
// send another query_short_channel_ids query if they're still processing one
syncMap.get(remoteNodeId) match {
case None =>
if (current.started) {
// we already have a pending query with this peer, add missing ids to our "sync" state
(syncMap + (remoteNodeId -> Syncing(current.remainingQueries ++ pending, current.totalQueries + pending.size)), None)
} else {
// we don't have a pending query with this peer, let's send it
(syncMap + (remoteNodeId -> Syncing(rest, pending.size)), Some(head))
case Some(sync) =>
// we already have a pending query with this peer, add missing ids to our "sync" state
(syncMap + (remoteNodeId -> Syncing(sync.pending ++ pending, sync.total + pending.size)), None)
}
case Nil =>
// there is nothing to send

View file

@ -377,14 +377,14 @@ object Validation {
case RemoteGossip(peerConnection, remoteNodeId) =>
val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(routerConf.encodingType, List(u.shortChannelId)), TlvStream.empty)
d.sync.get(remoteNodeId) match {
case Some(sync) =>
case Some(sync) if sync.started =>
// we already have a pending request to that node, let's add this channel to the list and we'll get it later
// TODO: we only request channels with old style channel_query
d.copy(sync = d.sync + (remoteNodeId -> sync.copy(pending = sync.pending :+ query, total = sync.total + 1)))
case None =>
// we send the query right away
d.copy(sync = d.sync + (remoteNodeId -> sync.copy(remainingQueries = sync.remainingQueries :+ query, totalQueries = sync.totalQueries + 1)))
case _ =>
// otherwise we send the query right away
peerConnection ! query
d.copy(sync = d.sync + (remoteNodeId -> Syncing(pending = Nil, total = 1)))
d.copy(sync = d.sync + (remoteNodeId -> Syncing(remainingQueries = Nil, totalQueries = 1)))
}
case _ =>
// we don't know which node this update came from (maybe it was stashed and the channel got pruned in the meantime or some other corner case).

View file

@ -241,8 +241,7 @@ object EncodingType {
}
// @formatter:on
case class EncodedShortChannelIds(encoding: EncodingType,
array: List[ShortChannelId])
case class EncodedShortChannelIds(encoding: EncodingType, array: List[ShortChannelId])
case class QueryShortChannelIds(chainHash: ByteVector32,
shortChannelIds: EncodedShortChannelIds,
@ -250,9 +249,7 @@ case class QueryShortChannelIds(chainHash: ByteVector32,
val queryFlags_opt: Option[QueryShortChannelIdsTlv.EncodedQueryFlags] = tlvStream.get[QueryShortChannelIdsTlv.EncodedQueryFlags]
}
case class ReplyShortChannelIdsEnd(chainHash: ByteVector32,
complete: Byte) extends RoutingMessage with HasChainHash
case class ReplyShortChannelIdsEnd(chainHash: ByteVector32, complete: Byte) extends RoutingMessage with HasChainHash
case class QueryChannelRange(chainHash: ByteVector32,
firstBlockNum: Long,
@ -268,7 +265,6 @@ case class ReplyChannelRange(chainHash: ByteVector32,
shortChannelIds: EncodedShortChannelIds,
tlvStream: TlvStream[ReplyChannelRangeTlv] = TlvStream.empty) extends RoutingMessage {
val timestamps_opt: Option[ReplyChannelRangeTlv.EncodedTimestamps] = tlvStream.get[ReplyChannelRangeTlv.EncodedTimestamps]
val checksums_opt: Option[ReplyChannelRangeTlv.EncodedChecksums] = tlvStream.get[ReplyChannelRangeTlv.EncodedChecksums]
}
@ -279,17 +275,14 @@ object ReplyChannelRange {
complete: Byte,
shortChannelIds: EncodedShortChannelIds,
timestamps: Option[ReplyChannelRangeTlv.EncodedTimestamps],
checksums: Option[ReplyChannelRangeTlv.EncodedChecksums]) = {
checksums: Option[ReplyChannelRangeTlv.EncodedChecksums]): ReplyChannelRange = {
timestamps.foreach(ts => require(ts.timestamps.length == shortChannelIds.array.length))
checksums.foreach(cs => require(cs.checksums.length == shortChannelIds.array.length))
new ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, complete, shortChannelIds, TlvStream(timestamps.toList ::: checksums.toList))
}
}
case class GossipTimestampFilter(chainHash: ByteVector32,
firstTimestamp: Long,
timestampRange: Long) extends RoutingMessage with HasChainHash
case class GossipTimestampFilter(chainHash: ByteVector32, firstTimestamp: Long, timestampRange: Long) extends RoutingMessage with HasChainHash
// NB: blank lines to minimize merge conflicts

View file

@ -6,8 +6,9 @@ import fr.acinq.bitcoin.ByteVector64
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.TestConstants._
import fr.acinq.eclair.blockchain.TestWallet
import fr.acinq.eclair.channel.ChannelIdAssigned
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Features, NodeParams, TestKitBaseClass, randomKey}
import fr.acinq.eclair.{Features, NodeParams, TestKitBaseClass, randomBytes32, randomKey}
import org.scalatest.funsuite.AnyFunSuiteLike
import scodec.bits._
@ -44,27 +45,55 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike {
peer.expectMsg(Peer.Connect(remoteNodeId, None))
}
def sendFeatures(remoteNodeId: PublicKey, features: Features, syncWhitelist: Set[PublicKey], expectedFeatures: Features, expectedSync: Boolean) = {
def sendFeatures(nodeParams: NodeParams, remoteNodeId: PublicKey, expectedFeatures: Features, expectedSync: Boolean) = {
val peer = TestProbe()
val peerConnection = TestProbe()
val nodeParams = Alice.nodeParams.copy(features = features, syncWhitelist = syncWhitelist)
val switchboard = TestActorRef(new TestSwitchboard(nodeParams, remoteNodeId, peer))
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId)
peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, expectedFeatures, doSync = expectedSync))
}
test("sync if no whitelist is defined") {
sendFeatures(randomKey.publicKey, Alice.nodeParams.features, Set.empty, Alice.nodeParams.features, expectedSync = true)
test("sync if no whitelist is defined and peer has channels") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId
nodeParams.db.channels.addOrUpdateChannel(ChannelCodecsSpec.normal)
sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = true)
}
test("sync if no whitelist is defined and peer creates a channel") {
val peer = TestProbe()
val peerConnection = TestProbe()
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId
val switchboard = TestActorRef(new TestSwitchboard(nodeParams, remoteNodeId, peer))
// We have a channel with our peer, so we trigger a sync when connecting.
switchboard ! ChannelIdAssigned(TestProbe().ref, remoteNodeId, randomBytes32, randomBytes32)
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId)
peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
// We don't have channels with our peer, so we won't trigger a sync when connecting.
switchboard ! LastChannelClosed(peer.ref, remoteNodeId)
switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId)
peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = false))
}
test("don't sync if no whitelist is defined and peer does not have channels") {
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty)
sendFeatures(nodeParams, randomKey.publicKey, nodeParams.features, expectedSync = false)
}
test("sync if whitelist contains peer") {
val remoteNodeId = randomKey.publicKey
sendFeatures(remoteNodeId, Alice.nodeParams.features, Set(remoteNodeId, randomKey.publicKey, randomKey.publicKey), Alice.nodeParams.features, expectedSync = true)
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(remoteNodeId, randomKey.publicKey, randomKey.publicKey))
sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = true)
}
test("don't sync if whitelist doesn't contain peer") {
val remoteNodeId = randomKey.publicKey
sendFeatures(remoteNodeId, Alice.nodeParams.features, Set(randomKey.publicKey, randomKey.publicKey, randomKey.publicKey), Alice.nodeParams.features, expectedSync = false)
val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(randomKey.publicKey, randomKey.publicKey, randomKey.publicKey))
val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId
nodeParams.db.channels.addOrUpdateChannel(ChannelCodecsSpec.normal)
sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = false)
}
}

View file

@ -16,8 +16,8 @@
package fr.acinq.eclair.router
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import akka.actor.{Actor, Props}
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{Block, ByteVector32, Satoshi, Script, Transaction, TxIn, TxOut}
import fr.acinq.eclair.TestConstants.{Alice, Bob}
@ -27,20 +27,17 @@ import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.router.Announcements.{makeChannelUpdate, makeNodeAnnouncement}
import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement
import fr.acinq.eclair.router.Router.{Data, GossipDecision, PublicChannel, SendChannelQuery, State}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.Sync._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._
import org.scalatest.ParallelTestExecution
import org.scalatest.funsuite.AnyFunSuiteLike
import scodec.bits.HexStringSyntax
import scala.collection.immutable.TreeMap
import scala.collection.{SortedSet, mutable}
import scala.compat.Platform
import scala.concurrent.duration._
class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with ParallelTestExecution {
import RoutingSyncSpec._
@ -71,7 +68,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
case class BasicSyncResult(ranges: Int, queries: Int, channels: Int, updates: Int, nodes: Int)
case class SyncResult(ranges: Seq[ReplyChannelRange], queries: Seq[QueryShortChannelIds], channels: Seq[ChannelAnnouncement], updates: Seq[ChannelUpdate], nodes: Seq[NodeAnnouncement]) {
def counts = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size, nodes.size)
def counts: BasicSyncResult = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size, nodes.size)
}
def sync(src: TestFSMRef[State, Data, Router], tgt: TestFSMRef[State, Data, Router], extendedQueryFlags_opt: Option[QueryChannelRangeTlv]): SyncResult = {
@ -85,7 +82,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
}
val srcId = src.underlyingActor.nodeParams.nodeId
val tgtId = tgt.underlyingActor.nodeParams.nodeId
sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, extendedQueryFlags_opt))
sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, replacePrevious = true, extendedQueryFlags_opt))
// src sends a query_channel_range to bob
val qcr = pipe.expectMsgType[QueryChannelRange]
pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr))
@ -127,7 +124,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
SyncResult(rcrs, queries, channels, updates, nodes)
}
def countUpdates(channels: Map[ShortChannelId, PublicChannel]) = channels.values.foldLeft(0) {
def countUpdates(channels: Map[ShortChannelId, PublicChannel]): Int = channels.values.foldLeft(0) {
case (count, pc) => count + pc.update_1_opt.map(_ => 1).getOrElse(0) + pc.update_2_opt.map(_ => 1).getOrElse(0)
}
@ -257,11 +254,18 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
val sender = TestProbe()
sender.ignoreMsg { case _: TransportHandler.ReadAck => true }
val remoteNodeId = TestConstants.Bob.nodeParams.nodeId
assert(!router.stateData.sync.contains(remoteNodeId))
// ask router to send a channel range query
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None))
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None))
val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange]
sender.expectMsgType[GossipTimestampFilter]
assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0)))
// ask router to send another channel range query
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = false, None))
sender.expectNoMsg(100 millis) // it's a duplicate and should be ignored
assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0)))
val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None)
@ -272,12 +276,32 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
assert(peerConnection.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty))
// router should think that it is missing 100 channels, in one request
val Some(sync) = router.stateData.sync.get(remoteNodeId)
assert(sync.total == 1)
assert(sync.remainingQueries.isEmpty) // the request was sent already
assert(sync.totalQueries == 1)
// simulate a re-connection
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None))
sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None))
sender.expectMsgType[QueryChannelRange]
sender.expectMsgType[GossipTimestampFilter]
assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0)))
}
test("reject unsolicited sync") {
val params = TestConstants.Alice.nodeParams
val router = TestFSMRef(new Router(params, TestProbe().ref))
val peerConnection = TestProbe()
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
val sender = TestProbe()
sender.ignoreMsg { case _: TransportHandler.ReadAck => true }
val remoteNodeId = TestConstants.Bob.nodeParams.nodeId
assert(!router.stateData.sync.contains(remoteNodeId))
// we didn't send a corresponding query_channel_range, but peer sends us a reply_channel_range
val unsolicitedBlocks = ReplyChannelRange(params.chainHash, 10, 5, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(5).keys.toList), None, None)
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, unsolicitedBlocks))
// it will be simply ignored
peerConnection.expectNoMsg(100 millis)
assert(!router.stateData.sync.contains(remoteNodeId))
}
@ -285,19 +309,19 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle
def req = QueryShortChannelIds(Block.RegtestGenesisBlock.hash, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, List(ShortChannelId(42))), TlvStream.empty)
val nodeidA = randomKey.publicKey
val nodeidB = randomKey.publicKey
val nodeIdA = randomKey.publicKey
val nodeIdB = randomKey.publicKey
val (sync1, _) = addToSync(Map.empty, nodeidA, List(req, req, req, req))
val sync1 = Map(nodeIdA -> Syncing(List(req, req, req), 4))
assert(syncProgress(sync1) == SyncProgress(0.25D))
val (sync2, _) = addToSync(sync1, nodeidB, List(req, req, req, req, req, req, req, req, req, req, req, req))
val sync2 = sync1.updated(nodeIdB, Syncing(List(req, req, req, req, req, req, req, req, req, req, req), 12))
assert(syncProgress(sync2) == SyncProgress(0.125D))
// let's assume we made some progress
val sync3 = sync2
.updated(nodeidA, sync2(nodeidA).copy(pending = List(req)))
.updated(nodeidB, sync2(nodeidB).copy(pending = List(req)))
.updated(nodeIdA, sync2(nodeIdA).copy(remainingQueries = List(req)))
.updated(nodeIdB, sync2(nodeIdB).copy(remainingQueries = List(req)))
assert(syncProgress(sync3) == SyncProgress(0.875D))
}
}