diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index e45e3915c..c60e2f23d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -25,6 +25,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, Satoshi, SatoshiLong, Script} import fr.acinq.eclair.Features.Wumbo import fr.acinq.eclair.Logs.LogCategory +import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.EclairWallet import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel._ @@ -32,7 +33,6 @@ import fr.acinq.eclair.io.Monitoring.Metrics import fr.acinq.eclair.io.PeerConnection.KillReason import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{wire, _} import scodec.bits.ByteVector import java.net.InetSocketAddress @@ -164,6 +164,10 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d: ConnectedData) if d.channels.contains(TemporaryChannelId(temporaryChannelId)) => log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId") + // we have our first channel with that peer: let's sync our routing table + if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) { + d.peerConnection ! PeerConnection.DoSync(replacePrevious = false) + } // NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151) // we won't clean it up, but we won't remember the temporary id on channel termination stay using d.copy(channels = d.channels + (FinalChannelId(channelId) -> channel)) @@ -192,6 +196,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe log.info(s"channel closed: channelId=${channelIds.mkString("/")}") if (d.channels.values.toSet - actor == Set.empty) { log.info(s"that was the last open channel, closing the connection") + context.system.eventStream.publish(LastChannelClosed(self, remoteNodeId)) d.peerConnection ! PeerConnection.Kill(KillReason.NoRemainingChannel) } stay using d.copy(channels = d.channels -- channelIds) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index d35d316eb..9b25200aa 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -16,8 +16,6 @@ package fr.acinq.eclair.io -import java.net.InetSocketAddress - import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated} import akka.event.Logging.MDC import fr.acinq.bitcoin.ByteVector32 @@ -34,6 +32,7 @@ import fr.acinq.eclair.{wire, _} import scodec.Attempt import scodec.bits.ByteVector +import java.net.InetSocketAddress import scala.concurrent.duration._ import scala.util.Random @@ -136,29 +135,12 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } else { Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment() d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit) - d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected) - def localHasFeature(f: Feature): Boolean = d.localInit.features.hasFeature(f) - - def remoteHasFeature(f: Feature): Boolean = remoteInit.features.hasFeature(f) - - val canUseChannelRangeQueries = localHasFeature(Features.ChannelRangeQueries) && remoteHasFeature(Features.ChannelRangeQueries) - val canUseChannelRangeQueriesEx = localHasFeature(Features.ChannelRangeQueriesExtended) && remoteHasFeature(Features.ChannelRangeQueriesExtended) - if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) { - // if they support channel queries we don't send routing info yet, if they want it they will query us - // we will query them, using extended queries if supported - val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None - if (d.doSync) { - log.info(s"sending sync channel range query with flags_opt=$flags_opt") - router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, flags_opt = flags_opt) - } else { - log.info("not syncing with this peer") - } - } else if (remoteHasFeature(Features.InitialRoutingSync)) { - // "old" nodes, do as before - log.info("peer requested a full routing table dump") - router ! GetRoutingState + if (d.doSync) { + self ! DoSync(replacePrevious = true) + } else { + log.info("not syncing with this peer") } // we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay) @@ -358,6 +340,20 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } stay using d.copy(behavior = behavior1) + case Event(DoSync(replacePrevious), d: ConnectedData) => + val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries) + val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended) + if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) { + val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None + log.info(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious") + router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt) + } else if (d.remoteInit.features.hasFeature(Features.InitialRoutingSync) && replacePrevious) { + // For "old" nodes that don't support channel queries, we send them the full routing table + log.info("peer requested a full routing table dump") + router ! GetRoutingState + } + stay + case Event(ResumeAnnouncements, d: ConnectedData) => log.info(s"resuming processing of network announcements for peer") stay using d.copy(behavior = d.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false)) @@ -472,6 +468,7 @@ object PeerConnection { case object InitTimeout case object SendPing case object ResumeAnnouncements + case class DoSync(replacePrevious: Boolean) // @formatter:on val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala index 8c3af51d5..39226c90e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerEvents.scala @@ -16,12 +16,12 @@ package fr.acinq.eclair.io -import java.net.InetSocketAddress - import akka.actor.ActorRef import fr.acinq.bitcoin.Crypto.PublicKey -import fr.acinq.eclair.wire.UnknownMessage import fr.acinq.eclair.wire +import fr.acinq.eclair.wire.UnknownMessage + +import java.net.InetSocketAddress sealed trait PeerEvent @@ -31,4 +31,6 @@ case class PeerConnected(peer: ActorRef, nodeId: PublicKey, connectionInfo: Conn case class PeerDisconnected(peer: ActorRef, nodeId: PublicKey) extends PeerEvent +case class LastChannelClosed(peer: ActorRef, nodeId: PublicKey) extends PeerEvent + case class UnknownMessageReceived(peer: ActorRef, nodeId: PublicKey, message: UnknownMessage, connectionInfo: ConnectionInfo) extends PeerEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index d00675668..7d5744e02 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -33,8 +33,11 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef, import Switchboard._ + context.system.eventStream.subscribe(self, classOf[ChannelIdAssigned]) + context.system.eventStream.subscribe(self, classOf[LastChannelClosed]) + // we load channels from database - { + private def initialPeersWithChannels: Set[PublicKey] = { // Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped // just after a channel state has transitioned to CLOSED and before it has effectively been removed. // Closed channels will be removed, other channels will be restored. @@ -44,12 +47,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef, nodeParams.db.channels.removeChannel(c.channelId) }) - channels - .groupBy(_.commitments.remoteParams.nodeId) - .map { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) } + val peerChannels = channels.groupBy(_.commitments.remoteParams.nodeId) + peerChannels.foreach { case (remoteNodeId, states) => createOrGetPeer(remoteNodeId, offlineChannels = states.toSet) } + peerChannels.keySet } - def receive: Receive = { + def receive: Receive = normal(initialPeersWithChannels) + + def normal(peersWithChannels: Set[PublicKey]): Receive = { case Peer.Connect(publicKey, _) if publicKey == nodeParams.nodeId => sender ! Status.Failure(new RuntimeException("cannot open connection with oneself")) @@ -75,9 +80,14 @@ class Switchboard(nodeParams: NodeParams, watcher: ActorRef, relayer: ActorRef, // if this is an incoming connection, we might not yet have created the peer val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty) val features = nodeParams.featuresFor(authenticated.remoteNodeId) - val doSync = nodeParams.syncWhitelist.isEmpty || nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) + // if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel + val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && peersWithChannels.contains(authenticated.remoteNodeId)) authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync) + case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId)) + + case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId)) + case Symbol("peers") => sender ! context.children case GetRouterPeerConf => sender ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf) @@ -119,6 +129,7 @@ object Switchboard { def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId" case object GetRouterPeerConf extends RemoteTypes + case class RouterPeerConf(routerConf: RouterConf, peerConf: PeerConnection.Conf) extends RemoteTypes } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala index 5fae8c217..74ab3ece0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -128,6 +128,7 @@ object EclairInternalsSerializer { ("chainsHash" | bytes32) :: ("remoteNodeId" | publicKey) :: ("to" | actorRefCodec(system)) :: + ("replacePrevious" | bool(8)) :: ("flags_opt" | optionQueryChannelRangeTlv)).as[SendChannelQuery] def peerRoutingMessageCodec(system: ExtendedActorSystem): Codec[PeerRoutingMessage] = ( diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 97a9f8948..13c6f50c9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -146,8 +146,9 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[ log.info(s"subscribing listener=$listener to network events") context.system.eventStream.subscribe(listener, classOf[NetworkEvent]) context.watch(listener) + override def receive: Receive = { - case Terminated(actor) if actor == listener=> + case Terminated(actor) if actor == listener => log.warning(s"unsubscribing listener=$listener to network events") context.system.eventStream.unsubscribe(listener) context stop self @@ -519,7 +520,7 @@ object Router { // @formatter:on // @formatter:off - case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes + case class SendChannelQuery(chainHash: ByteVector32, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes case object GetNetworkStats case class GetNetworkStatsResponse(stats: Option[NetworkStats]) case object GetRoutingState @@ -566,7 +567,13 @@ object Router { case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Long) - case class Syncing(pending: List[RoutingMessage], total: Int) + /** + * @param remainingQueries remaining queries to send, the next one will be popped after we receive a [[ReplyShortChannelIdsEnd]] + * @param totalQueries total number of *queries* (not channels) that will be sent during this syncing session + */ + case class Syncing(remainingQueries: List[RoutingMessage], totalQueries: Int) { + def started: Boolean = totalQueries > 0 + } case class Data(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala index 1e7011159..725ecf74f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala @@ -45,23 +45,29 @@ object Sync { def handleSendChannelQuery(d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - // ask for everything - // we currently send only one query_channel_range message per peer, when we just (re)connected to it, so we don't - // have to worry about sending a new query_channel_range when another query is still in progress - val query = QueryChannelRange(s.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toList)) - log.info("sending query_channel_range={}", query) - s.to ! query + // we currently send query_channel_range when: + // * we just (re)connected to a peer with whom we have channels + // * we validate our first channel with a peer + // we must ensure we don't send a new query_channel_range while another query is still in progress + if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) { + // ask for everything + val query = QueryChannelRange(s.chainHash, firstBlockNum = 0L, numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toList)) + log.info("sending query_channel_range={}", query) + s.to ! query - // we also set a pass-all filter for now (we can update it later) for the future gossip messages, by setting - // the first_timestamp field to the current date/time and timestamp_range to the maximum value - // NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching - // that (i.e. the whole routing table) - val filter = GossipTimestampFilter(s.chainHash, firstTimestamp = System.currentTimeMillis.milliseconds.toSeconds, timestampRange = Int.MaxValue) - s.to ! filter + // we also set a pass-all filter for now (we can update it later) for the future gossip messages, by setting + // the first_timestamp field to the current date/time and timestamp_range to the maximum value + // NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching + // that (i.e. the whole routing table) + val filter = GossipTimestampFilter(s.chainHash, firstTimestamp = System.currentTimeMillis.milliseconds.toSeconds, timestampRange = Int.MaxValue) + s.to ! filter - // clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and - // will start a new complete sync process - d.copy(sync = d.sync - s.remoteNodeId) + // reset our sync state for this peer: we create an entry to ensure we reject duplicate queries and unsolicited reply_channel_range + d.copy(sync = d.sync + (s.remoteNodeId -> Syncing(Nil, 0))) + } else { + log.info("not sending query_channel_range: sync already in progress") + d + } } def handleQueryChannelRange(channels: SortedMap[ShortChannelId, PublicChannel], routerConf: RouterConf, origin: RemoteGossip, q: QueryChannelRange)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = { @@ -86,59 +92,70 @@ object Sync { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors ctx.sender ! TransportHandler.ReadAck(r) - Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks) - Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size) + d.sync.get(origin.nodeId) match { + case None => + log.info("received unsolicited reply_channel_range with {} channels", r.shortChannelIds.array.size) + d // we didn't request a sync from this node, ignore + case Some(currentSync) if currentSync.remainingQueries.isEmpty && r.shortChannelIds.array.isEmpty => + // NB: this case deals with peers who don't return any sync data. We're currently not correctly detecting the end + // of a stream of reply_channel_range, but it's not an issue in practice (we instead rely on the remaining query_short_channel_ids). + // We should fix that once https://github.com/lightningnetwork/lightning-rfc/pull/826 is deployed. + log.info("received empty reply_channel_range, sync is complete") + d.copy(sync = d.sync - origin.nodeId) + case Some(currentSync) => + Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks) + Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size) - @tailrec - def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = { - ids match { - case Nil => acc.reverse - case head :: tail => - val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements) - // 0 means nothing to query, just don't include it - val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc - loop(tail, timestamps.drop(1), checksums.drop(1), acc1) - } + @tailrec + def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = { + ids match { + case Nil => acc.reverse + case head :: tail => + val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements) + // 0 means nothing to query, just don't include it + val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc + loop(tail, timestamps.drop(1), checksums.drop(1), acc1) + } + } + + val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps]) + val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums]) + val shortChannelIdAndFlags = loop(r.shortChannelIds.array, timestamps_opt, checksums_opt) + val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) { + case ((c, u), ShortChannelIdAndFlag(_, flag)) => + val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0) + val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0) + (c1, u1) + } + log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding) + Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount) + Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount) + + def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = { + // always encode empty lists as UNCOMPRESSED + val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding + val flags: TlvStream[QueryShortChannelIdsTlv] = if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined) { + TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag))) + } else { + TlvStream.empty + } + QueryShortChannelIds(r.chainHash, EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)), flags) + } + + // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time) + val replies = shortChannelIdAndFlags + .grouped(routerConf.channelQueryChunkSize) + .map(buildQuery) + .toList + + val (sync1, replynow_opt) = addToSync(d.sync, currentSync, origin.nodeId, replies) + // we only send a reply right away if there were no pending requests + replynow_opt.foreach(origin.peerConnection ! _) + val progress = syncProgress(sync1) + ctx.system.eventStream.publish(progress) + ctx.self ! progress + d.copy(sync = sync1) } - - val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps]) - val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums]) - val shortChannelIdAndFlags = loop(r.shortChannelIds.array, timestamps_opt, checksums_opt) - val (channelCount, updatesCount) = shortChannelIdAndFlags.foldLeft((0, 0)) { - case ((c, u), ShortChannelIdAndFlag(_, flag)) => - val c1 = c + (if (QueryShortChannelIdsTlv.QueryFlagType.includeChannelAnnouncement(flag)) 1 else 0) - val u1 = u + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate1(flag)) 1 else 0) + (if (QueryShortChannelIdsTlv.QueryFlagType.includeUpdate2(flag)) 1 else 0) - (c1, u1) - } - log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding) - Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount) - Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount) - - def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = { - // always encode empty lists as UNCOMPRESSED - val encoding = if (chunk.isEmpty) EncodingType.UNCOMPRESSED else r.shortChannelIds.encoding - QueryShortChannelIds(r.chainHash, - shortChannelIds = EncodedShortChannelIds(encoding, chunk.map(_.shortChannelId)), - if (r.timestamps_opt.isDefined || r.checksums_opt.isDefined) - TlvStream(QueryShortChannelIdsTlv.EncodedQueryFlags(encoding, chunk.map(_.flag))) - else - TlvStream.empty - ) - } - - // we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time) - val replies = shortChannelIdAndFlags - .grouped(routerConf.channelQueryChunkSize) - .map(buildQuery) - .toList - - val (sync1, replynow_opt) = addToSync(d.sync, origin.nodeId, replies) - // we only send a reply right away if there were no pending requests - replynow_opt.foreach(origin.peerConnection ! _) - val progress = syncProgress(sync1) - ctx.system.eventStream.publish(progress) - ctx.self ! progress - d.copy(sync = sync1) } def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = { @@ -176,18 +193,18 @@ object Sync { def handleReplyShortChannelIdsEnd(d: Data, origin: RemoteGossip, r: ReplyShortChannelIdsEnd)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors ctx.sender ! TransportHandler.ReadAck(r) - // have we more channels to ask this peer? + // do we have more channels to request from this peer? val sync1 = d.sync.get(origin.nodeId) match { case Some(sync) => - sync.pending match { + sync.remainingQueries match { case nextRequest +: rest => - log.info(s"asking for the next slice of short_channel_ids (remaining=${sync.pending.size}/${sync.total})") + log.info(s"asking for the next slice of short_channel_ids (remaining=${sync.remainingQueries.size}/${sync.totalQueries})") origin.peerConnection ! nextRequest - d.sync + (origin.nodeId -> sync.copy(pending = rest)) + d.sync + (origin.nodeId -> sync.copy(remainingQueries = rest)) case Nil => // we received reply_short_channel_ids_end for our last query and have not sent another one, we can now remove // the remote peer from our map - log.info(s"sync complete (total=${sync.total})") + log.info(s"sync complete (total=${sync.totalQueries})") d.sync - origin.nodeId } case _ => d.sync @@ -343,7 +360,7 @@ object Sync { def syncProgress(sync: Map[PublicKey, Syncing]): SyncProgress = { // NB: progress is in terms of requests, not individual channels val (pending, total) = sync.foldLeft((0, 0)) { - case ((p, t), (_, sync)) => (p + sync.pending.size, t + sync.total) + case ((p, t), (_, sync)) => (p + sync.remainingQueries.size, t + sync.totalQueries) } if (total == 0) { SyncProgress(1) @@ -481,18 +498,17 @@ object Sync { checksums = checksums) } - def addToSync(syncMap: Map[PublicKey, Syncing], remoteNodeId: PublicKey, pending: List[RoutingMessage]): (Map[PublicKey, Syncing], Option[RoutingMessage]) = { + def addToSync(syncMap: Map[PublicKey, Syncing], current: Syncing, remoteNodeId: PublicKey, pending: List[QueryShortChannelIds]): (Map[PublicKey, Syncing], Option[QueryShortChannelIds]) = { pending match { case head +: rest => // they may send back several reply_channel_range messages for a single query_channel_range query, and we must not // send another query_short_channel_ids query if they're still processing one - syncMap.get(remoteNodeId) match { - case None => - // we don't have a pending query with this peer, let's send it - (syncMap + (remoteNodeId -> Syncing(rest, pending.size)), Some(head)) - case Some(sync) => - // we already have a pending query with this peer, add missing ids to our "sync" state - (syncMap + (remoteNodeId -> Syncing(sync.pending ++ pending, sync.total + pending.size)), None) + if (current.started) { + // we already have a pending query with this peer, add missing ids to our "sync" state + (syncMap + (remoteNodeId -> Syncing(current.remainingQueries ++ pending, current.totalQueries + pending.size)), None) + } else { + // we don't have a pending query with this peer, let's send it + (syncMap + (remoteNodeId -> Syncing(rest, pending.size)), Some(head)) } case Nil => // there is nothing to send diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 6884638d4..38a67f216 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -377,14 +377,14 @@ object Validation { case RemoteGossip(peerConnection, remoteNodeId) => val query = QueryShortChannelIds(u.chainHash, EncodedShortChannelIds(routerConf.encodingType, List(u.shortChannelId)), TlvStream.empty) d.sync.get(remoteNodeId) match { - case Some(sync) => + case Some(sync) if sync.started => // we already have a pending request to that node, let's add this channel to the list and we'll get it later // TODO: we only request channels with old style channel_query - d.copy(sync = d.sync + (remoteNodeId -> sync.copy(pending = sync.pending :+ query, total = sync.total + 1))) - case None => - // we send the query right away + d.copy(sync = d.sync + (remoteNodeId -> sync.copy(remainingQueries = sync.remainingQueries :+ query, totalQueries = sync.totalQueries + 1))) + case _ => + // otherwise we send the query right away peerConnection ! query - d.copy(sync = d.sync + (remoteNodeId -> Syncing(pending = Nil, total = 1))) + d.copy(sync = d.sync + (remoteNodeId -> Syncing(remainingQueries = Nil, totalQueries = 1))) } case _ => // we don't know which node this update came from (maybe it was stashed and the channel got pruned in the meantime or some other corner case). diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala index 51d5560b3..13cd66bdc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/LightningMessageTypes.scala @@ -241,8 +241,7 @@ object EncodingType { } // @formatter:on -case class EncodedShortChannelIds(encoding: EncodingType, - array: List[ShortChannelId]) +case class EncodedShortChannelIds(encoding: EncodingType, array: List[ShortChannelId]) case class QueryShortChannelIds(chainHash: ByteVector32, shortChannelIds: EncodedShortChannelIds, @@ -250,9 +249,7 @@ case class QueryShortChannelIds(chainHash: ByteVector32, val queryFlags_opt: Option[QueryShortChannelIdsTlv.EncodedQueryFlags] = tlvStream.get[QueryShortChannelIdsTlv.EncodedQueryFlags] } -case class ReplyShortChannelIdsEnd(chainHash: ByteVector32, - complete: Byte) extends RoutingMessage with HasChainHash - +case class ReplyShortChannelIdsEnd(chainHash: ByteVector32, complete: Byte) extends RoutingMessage with HasChainHash case class QueryChannelRange(chainHash: ByteVector32, firstBlockNum: Long, @@ -268,7 +265,6 @@ case class ReplyChannelRange(chainHash: ByteVector32, shortChannelIds: EncodedShortChannelIds, tlvStream: TlvStream[ReplyChannelRangeTlv] = TlvStream.empty) extends RoutingMessage { val timestamps_opt: Option[ReplyChannelRangeTlv.EncodedTimestamps] = tlvStream.get[ReplyChannelRangeTlv.EncodedTimestamps] - val checksums_opt: Option[ReplyChannelRangeTlv.EncodedChecksums] = tlvStream.get[ReplyChannelRangeTlv.EncodedChecksums] } @@ -279,17 +275,14 @@ object ReplyChannelRange { complete: Byte, shortChannelIds: EncodedShortChannelIds, timestamps: Option[ReplyChannelRangeTlv.EncodedTimestamps], - checksums: Option[ReplyChannelRangeTlv.EncodedChecksums]) = { + checksums: Option[ReplyChannelRangeTlv.EncodedChecksums]): ReplyChannelRange = { timestamps.foreach(ts => require(ts.timestamps.length == shortChannelIds.array.length)) checksums.foreach(cs => require(cs.checksums.length == shortChannelIds.array.length)) new ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, complete, shortChannelIds, TlvStream(timestamps.toList ::: checksums.toList)) } } - -case class GossipTimestampFilter(chainHash: ByteVector32, - firstTimestamp: Long, - timestampRange: Long) extends RoutingMessage with HasChainHash +case class GossipTimestampFilter(chainHash: ByteVector32, firstTimestamp: Long, timestampRange: Long) extends RoutingMessage with HasChainHash // NB: blank lines to minimize merge conflicts diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala index fa01a5d61..f30be1e52 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala @@ -6,8 +6,9 @@ import fr.acinq.bitcoin.ByteVector64 import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.TestConstants._ import fr.acinq.eclair.blockchain.TestWallet +import fr.acinq.eclair.channel.ChannelIdAssigned import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{Features, NodeParams, TestKitBaseClass, randomKey} +import fr.acinq.eclair.{Features, NodeParams, TestKitBaseClass, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuiteLike import scodec.bits._ @@ -44,27 +45,55 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike { peer.expectMsg(Peer.Connect(remoteNodeId, None)) } - def sendFeatures(remoteNodeId: PublicKey, features: Features, syncWhitelist: Set[PublicKey], expectedFeatures: Features, expectedSync: Boolean) = { + def sendFeatures(nodeParams: NodeParams, remoteNodeId: PublicKey, expectedFeatures: Features, expectedSync: Boolean) = { val peer = TestProbe() val peerConnection = TestProbe() - val nodeParams = Alice.nodeParams.copy(features = features, syncWhitelist = syncWhitelist) val switchboard = TestActorRef(new TestSwitchboard(nodeParams, remoteNodeId, peer)) switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId) peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, expectedFeatures, doSync = expectedSync)) } - test("sync if no whitelist is defined") { - sendFeatures(randomKey.publicKey, Alice.nodeParams.features, Set.empty, Alice.nodeParams.features, expectedSync = true) + test("sync if no whitelist is defined and peer has channels") { + val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId + nodeParams.db.channels.addOrUpdateChannel(ChannelCodecsSpec.normal) + sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = true) + } + + test("sync if no whitelist is defined and peer creates a channel") { + val peer = TestProbe() + val peerConnection = TestProbe() + val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId + val switchboard = TestActorRef(new TestSwitchboard(nodeParams, remoteNodeId, peer)) + + // We have a channel with our peer, so we trigger a sync when connecting. + switchboard ! ChannelIdAssigned(TestProbe().ref, remoteNodeId, randomBytes32, randomBytes32) + switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId) + peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true)) + + // We don't have channels with our peer, so we won't trigger a sync when connecting. + switchboard ! LastChannelClosed(peer.ref, remoteNodeId) + switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId) + peerConnection.expectMsg(PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = false)) + } + + test("don't sync if no whitelist is defined and peer does not have channels") { + val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set.empty) + sendFeatures(nodeParams, randomKey.publicKey, nodeParams.features, expectedSync = false) } test("sync if whitelist contains peer") { val remoteNodeId = randomKey.publicKey - sendFeatures(remoteNodeId, Alice.nodeParams.features, Set(remoteNodeId, randomKey.publicKey, randomKey.publicKey), Alice.nodeParams.features, expectedSync = true) + val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(remoteNodeId, randomKey.publicKey, randomKey.publicKey)) + sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = true) } test("don't sync if whitelist doesn't contain peer") { - val remoteNodeId = randomKey.publicKey - sendFeatures(remoteNodeId, Alice.nodeParams.features, Set(randomKey.publicKey, randomKey.publicKey, randomKey.publicKey), Alice.nodeParams.features, expectedSync = false) + val nodeParams = Alice.nodeParams.copy(syncWhitelist = Set(randomKey.publicKey, randomKey.publicKey, randomKey.publicKey)) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteParams.nodeId + nodeParams.db.channels.addOrUpdateChannel(ChannelCodecsSpec.normal) + sendFeatures(nodeParams, remoteNodeId, nodeParams.features, expectedSync = false) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index d29cd69ba..196b2e4ef 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -16,8 +16,8 @@ package fr.acinq.eclair.router -import akka.actor.{Actor, ActorSystem, Props} -import akka.testkit.{TestFSMRef, TestKit, TestProbe} +import akka.actor.{Actor, Props} +import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.{Block, ByteVector32, Satoshi, Script, Transaction, TxIn, TxOut} import fr.acinq.eclair.TestConstants.{Alice, Bob} @@ -27,20 +27,17 @@ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.router.Announcements.{makeChannelUpdate, makeNodeAnnouncement} import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement -import fr.acinq.eclair.router.Router.{Data, GossipDecision, PublicChannel, SendChannelQuery, State} +import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.Sync._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire._ import org.scalatest.ParallelTestExecution import org.scalatest.funsuite.AnyFunSuiteLike -import scodec.bits.HexStringSyntax import scala.collection.immutable.TreeMap import scala.collection.{SortedSet, mutable} -import scala.compat.Platform import scala.concurrent.duration._ - class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with ParallelTestExecution { import RoutingSyncSpec._ @@ -71,7 +68,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle case class BasicSyncResult(ranges: Int, queries: Int, channels: Int, updates: Int, nodes: Int) case class SyncResult(ranges: Seq[ReplyChannelRange], queries: Seq[QueryShortChannelIds], channels: Seq[ChannelAnnouncement], updates: Seq[ChannelUpdate], nodes: Seq[NodeAnnouncement]) { - def counts = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size, nodes.size) + def counts: BasicSyncResult = BasicSyncResult(ranges.size, queries.size, channels.size, updates.size, nodes.size) } def sync(src: TestFSMRef[State, Data, Router], tgt: TestFSMRef[State, Data, Router], extendedQueryFlags_opt: Option[QueryChannelRangeTlv]): SyncResult = { @@ -85,7 +82,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle } val srcId = src.underlyingActor.nodeParams.nodeId val tgtId = tgt.underlyingActor.nodeParams.nodeId - sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, extendedQueryFlags_opt)) + sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, replacePrevious = true, extendedQueryFlags_opt)) // src sends a query_channel_range to bob val qcr = pipe.expectMsgType[QueryChannelRange] pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr)) @@ -127,7 +124,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle SyncResult(rcrs, queries, channels, updates, nodes) } - def countUpdates(channels: Map[ShortChannelId, PublicChannel]) = channels.values.foldLeft(0) { + def countUpdates(channels: Map[ShortChannelId, PublicChannel]): Int = channels.values.foldLeft(0) { case (count, pc) => count + pc.update_1_opt.map(_ => 1).getOrElse(0) + pc.update_2_opt.map(_ => 1).getOrElse(0) } @@ -257,11 +254,18 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle val sender = TestProbe() sender.ignoreMsg { case _: TransportHandler.ReadAck => true } val remoteNodeId = TestConstants.Bob.nodeParams.nodeId + assert(!router.stateData.sync.contains(remoteNodeId)) // ask router to send a channel range query - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None)) val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] + assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0))) + + // ask router to send another channel range query + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = false, None)) + sender.expectNoMsg(100 millis) // it's a duplicate and should be ignored + assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0))) val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None) @@ -272,12 +276,32 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle assert(peerConnection.expectMsgType[QueryShortChannelIds] === QueryShortChannelIds(chainHash, block1.shortChannelIds, TlvStream.empty)) // router should think that it is missing 100 channels, in one request val Some(sync) = router.stateData.sync.get(remoteNodeId) - assert(sync.total == 1) + assert(sync.remainingQueries.isEmpty) // the request was sent already + assert(sync.totalQueries == 1) // simulate a re-connection - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None)) sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] + assert(router.stateData.sync.get(remoteNodeId) === Some(Syncing(Nil, 0))) + } + + test("reject unsolicited sync") { + val params = TestConstants.Alice.nodeParams + val router = TestFSMRef(new Router(params, TestProbe().ref)) + val peerConnection = TestProbe() + peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } + val sender = TestProbe() + sender.ignoreMsg { case _: TransportHandler.ReadAck => true } + val remoteNodeId = TestConstants.Bob.nodeParams.nodeId + assert(!router.stateData.sync.contains(remoteNodeId)) + + // we didn't send a corresponding query_channel_range, but peer sends us a reply_channel_range + val unsolicitedBlocks = ReplyChannelRange(params.chainHash, 10, 5, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(5).keys.toList), None, None) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, unsolicitedBlocks)) + + // it will be simply ignored + peerConnection.expectNoMsg(100 millis) assert(!router.stateData.sync.contains(remoteNodeId)) } @@ -285,19 +309,19 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle def req = QueryShortChannelIds(Block.RegtestGenesisBlock.hash, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, List(ShortChannelId(42))), TlvStream.empty) - val nodeidA = randomKey.publicKey - val nodeidB = randomKey.publicKey + val nodeIdA = randomKey.publicKey + val nodeIdB = randomKey.publicKey - val (sync1, _) = addToSync(Map.empty, nodeidA, List(req, req, req, req)) + val sync1 = Map(nodeIdA -> Syncing(List(req, req, req), 4)) assert(syncProgress(sync1) == SyncProgress(0.25D)) - val (sync2, _) = addToSync(sync1, nodeidB, List(req, req, req, req, req, req, req, req, req, req, req, req)) + val sync2 = sync1.updated(nodeIdB, Syncing(List(req, req, req, req, req, req, req, req, req, req, req), 12)) assert(syncProgress(sync2) == SyncProgress(0.125D)) // let's assume we made some progress val sync3 = sync2 - .updated(nodeidA, sync2(nodeidA).copy(pending = List(req))) - .updated(nodeidB, sync2(nodeidB).copy(pending = List(req))) + .updated(nodeIdA, sync2(nodeIdA).copy(remainingQueries = List(req))) + .updated(nodeIdB, sync2(nodeIdB).copy(remainingQueries = List(req))) assert(syncProgress(sync3) == SyncProgress(0.875D)) } }