1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-03-13 11:35:47 +01:00

Fix ChannelUpdate rebroadcast (#1294)

Comparing with the router ActorRef simply didn't work.
The reason is probably because Peers receive the router's supervisor ref
which doesn't match what `self` is inside `Router`.

Checking that the origin was the router felt brittle anyway.
We're now correctly typing the gossip origin.
This commit is contained in:
Bastien Teinturier 2020-01-30 15:17:01 +01:00 committed by GitHub
parent 49f72f09ad
commit d6d60f063f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 46 deletions

View file

@ -378,8 +378,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
/**
* Send and count in a single iteration
*/
def sendAndCount(msgs: Map[_ <: RoutingMessage, Set[ActorRef]]): Int = msgs.foldLeft(0) {
case (count, (_, origins)) if origins.contains(self) =>
def sendAndCount(msgs: Map[_ <: RoutingMessage, Set[GossipOrigin]]): Int = msgs.foldLeft(0) {
case (count, (_, origins)) if origins.contains(RemoteGossip(self)) =>
// the announcement came from this peer, we don't send it back
count
case (count, (msg, origins)) if !timestampInRange(msg, origins, d.gossipTimestampFilter) =>
@ -602,11 +602,11 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
* - true if there is a filter and msg has no timestamp, or has one that matches the filter
* - false otherwise
*/
def timestampInRange(msg: RoutingMessage, origins: Set[ActorRef], gossipTimestampFilter_opt: Option[GossipTimestampFilter]): Boolean = {
def timestampInRange(msg: RoutingMessage, origins: Set[GossipOrigin], gossipTimestampFilter_opt: Option[GossipTimestampFilter]): Boolean = {
// For our own gossip, we should ignore the peer's timestamp filter.
val isOurGossip = msg match {
case n: NodeAnnouncement if n.nodeId == nodeParams.nodeId => true
case _ if origins.isEmpty || origins == Set(router) => true // if gossip doesn't come from another peer, it's ours
case _ if origins.contains(LocalGossip) => true
case _ => false
}
// Otherwise we check if this message has a timestamp that matches the timestamp filter.

View file

@ -41,7 +41,7 @@ import scodec.bits.ByteVector
import shapeless.HNil
import scala.annotation.tailrec
import scala.collection.immutable.{Queue, SortedMap}
import scala.collection.immutable.SortedMap
import scala.collection.{SortedSet, mutable}
import scala.compat.Platform
import scala.concurrent.duration._
@ -170,9 +170,16 @@ case object GetRoutingState
case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement])
case class Stash(updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
// @formatter:off
sealed trait GossipOrigin
/** Gossip that we received from a remote peer. */
case class RemoteGossip(peer: ActorRef) extends GossipOrigin
/** Gossip that was generated by our node. */
case object LocalGossip extends GossipOrigin
case class Rebroadcast(channels: Map[ChannelAnnouncement, Set[ActorRef]], updates: Map[ChannelUpdate, Set[ActorRef]], nodes: Map[NodeAnnouncement, Set[ActorRef]])
case class Stash(updates: Map[ChannelUpdate, Set[GossipOrigin]], nodes: Map[NodeAnnouncement, Set[GossipOrigin]])
case class Rebroadcast(channels: Map[ChannelAnnouncement, Set[GossipOrigin]], updates: Map[ChannelUpdate, Set[GossipOrigin]], nodes: Map[NodeAnnouncement, Set[GossipOrigin]])
// @formatter:on
case class ShortChannelIdAndFlag(shortChannelId: ShortChannelId, flag: Long)
@ -183,7 +190,7 @@ case class Data(nodes: Map[PublicKey, NodeAnnouncement],
stats: Option[NetworkStats],
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[ActorRef]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel], // short_channel_id -> node_id
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
@ -261,27 +268,27 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
d.channels.get(shortChannelId) match {
case Some(_) =>
// channel has already been announced and router knows about it, we can process the channel_update
stay using handle(u, self, d)
stay using handle(u, LocalGossip, d)
case None =>
channelAnnouncement_opt match {
case Some(c) if d.awaiting.contains(c) =>
// channel is currently being verified, we can process the channel_update right away (it will be stashed)
stay using handle(u, self, d)
stay using handle(u, LocalGossip, d)
case Some(c) =>
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(c)
val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
stay using handle(u, self, d1)
stay using handle(u, LocalGossip, d1)
case None if d.privateChannels.contains(shortChannelId) =>
// channel isn't announced but we already know about it, we can process the channel_update
stay using handle(u, self, d)
stay using handle(u, LocalGossip, d)
case None =>
// channel isn't announced and we never heard of it (maybe it is a private channel or maybe it is a public channel that doesn't yet have 6 confirmations)
// let's create a corresponding private channel and process the channel_update
log.info("adding unannounced local channel to remote={} shortChannelId={}", remoteNodeId, shortChannelId)
stay using handle(u, self, d.copy(privateChannels = d.privateChannels + (shortChannelId -> PrivateChannel(nodeParams.nodeId, remoteNodeId, None, None))))
stay using handle(u, LocalGossip, d.copy(privateChannels = d.privateChannels + (shortChannelId -> PrivateChannel(nodeParams.nodeId, remoteNodeId, None, None))))
}
}
@ -327,7 +334,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = true) {
Kamon.runWithSpan(Kamon.spanBuilder("process-validate-result").start(), finishSpan = true) {
d0.awaiting.get(c) match {
case Some(origin +: _) => origin ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
case Some(origin +: _) => origin.peer ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
case _ => ()
}
log.info("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
@ -346,7 +353,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
if (ok) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
d0.awaiting.get(c) match {
case Some(origins) => origins.foreach(_ ! InvalidAnnouncement(c))
case Some(origins) => origins.foreach(_.peer ! InvalidAnnouncement(c))
case _ => ()
}
None
@ -372,7 +379,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.warning("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
d0.awaiting.get(c) match {
case Some(origins) => origins.foreach(_ ! ChannelClosed(c))
case Some(origins) => origins.foreach(_.peer ! ChannelClosed(c))
case _ => ()
}
} else {
@ -580,14 +587,14 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
stay
case Event(u: ChannelUpdate, d: Data) =>
// it was sent by us (e.g. the payment lifecycle); routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
// it was sent by us (e.g. the payment lifecycle); routing messages that are sent by our peers are wrapped in a PeerRoutingMessage
log.debug("received channel update from {}", sender)
stay using handle(u, sender, d)
stay using handle(u, LocalGossip, d)
case Event(PeerRoutingMessage(transport, remoteNodeId, u: ChannelUpdate), d) =>
sender ! TransportHandler.ReadAck(u)
log.debug("received channel update for shortChannelId={}", u.shortChannelId)
stay using handle(u, sender, d, remoteNodeId_opt = Some(remoteNodeId), transport_opt = Some(transport))
stay using handle(u, RemoteGossip(sender), d, remoteNodeId_opt = Some(remoteNodeId), transport_opt = Some(transport))
case Event(PeerRoutingMessage(_, _, c: ChannelAnnouncement), d) =>
log.debug("received channel announcement for shortChannelId={} nodeId1={} nodeId2={}", c.shortChannelId, c.nodeId1, c.nodeId2)
@ -599,7 +606,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
sender ! TransportHandler.ReadAck(c)
log.debug("ignoring {} (being verified)", c)
// adding the sender to the list of origins so that we don't send back the same announcement to this peer later
val origins = d.awaiting(c) :+ sender
val origins = d.awaiting(c) :+ RemoteGossip(sender)
stay using d.copy(awaiting = d.awaiting + (c -> origins))
} else if (db.isPruned(c.shortChannelId)) {
sender ! TransportHandler.ReadAck(c)
@ -619,18 +626,18 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
}
}
// we don't acknowledge the message just yet
stay using d.copy(awaiting = d.awaiting + (c -> Seq(sender)))
stay using d.copy(awaiting = d.awaiting + (c -> Seq(RemoteGossip(sender))))
}
case Event(n: NodeAnnouncement, d: Data) =>
// it was sent by us, routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
// it was sent by us, routing messages that are sent by our peers are wrapped in a PeerRoutingMessage
log.debug("received node announcement from {}", sender)
stay using handle(n, sender, d)
stay using handle(n, LocalGossip, d)
case Event(PeerRoutingMessage(_, _, n: NodeAnnouncement), d: Data) =>
sender ! TransportHandler.ReadAck(n)
log.debug("received node announcement for nodeId={}", n.nodeId)
stay using handle(n, sender, d)
stay using handle(n, RemoteGossip(sender), d)
case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, extendedQueryFlags_opt)), d) =>
sender ! TransportHandler.ReadAck(routingMessage)
@ -776,7 +783,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
initialize()
def handle(n: NodeAnnouncement, origin: ActorRef, d: Data): Data =
def handle(n: NodeAnnouncement, origin: GossipOrigin, d: Data): Data =
if (d.stash.nodes.contains(n)) {
log.debug("ignoring {} (already stashed)", n)
val origins = d.stash.nodes(n) + origin
@ -790,7 +797,10 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
d
} else if (!Announcements.checkSig(n)) {
log.warning("bad signature for {}", n)
origin ! InvalidSignature(n)
origin match {
case RemoteGossip(peer) => peer ! InvalidSignature(n)
case LocalGossip =>
}
d
} else if (d.nodes.contains(n.nodeId)) {
log.debug("updated node nodeId={}", n.nodeId)
@ -812,7 +822,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
d
}
def handle(u: ChannelUpdate, origin: ActorRef, d: Data, remoteNodeId_opt: Option[PublicKey] = None, transport_opt: Option[ActorRef] = None): Data =
def handle(u: ChannelUpdate, origin: GossipOrigin, d: Data, remoteNodeId_opt: Option[PublicKey] = None, transport_opt: Option[ActorRef] = None): Data =
if (d.channels.contains(u.shortChannelId)) {
// related channel is already known (note: this means no related channel_update is in the stash)
val publicChannel = true
@ -830,7 +840,10 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
d
} else if (!Announcements.checkSig(u, pc.getNodeIdSameSideAs(u))) {
log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u)
origin ! InvalidSignature(u)
origin match {
case RemoteGossip(peer) => peer ! InvalidSignature(u)
case LocalGossip =>
}
d
} else if (pc.getChannelUpdateSameSideAs(u).isDefined) {
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u)
@ -872,7 +885,10 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
d
} else if (!Announcements.checkSig(u, desc.a)) {
log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u)
origin ! InvalidSignature(u)
origin match {
case RemoteGossip(peer) => peer ! InvalidSignature(u)
case LocalGossip =>
}
d
} else if (pc.getChannelUpdateSameSideAs(u).isDefined) {
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u)
@ -1223,9 +1239,9 @@ object Router {
* there could be several reply_channel_range messages for a single query, but we make sure that the returned
* chunks fully covers the [firstBlockNum, numberOfBlocks] range that was requested
*
* @param shortChannelIds list of short channel ids to split
* @param firstBlockNum first block height requested by our peers
* @param numberOfBlocks number of blocks requested by our peer
* @param shortChannelIds list of short channel ids to split
* @param firstBlockNum first block height requested by our peers
* @param numberOfBlocks number of blocks requested by our peer
* @param channelRangeChunkSize target chunk size. All ids that have the same block height will be grouped together, so
* returned chunks may still contain more than `channelRangeChunkSize` elements
* @return a list of short channel id chunks
@ -1275,10 +1291,11 @@ object Router {
/**
* Enforce max-size constraints for each chunk
*
* @param chunks list of short channel id chunks
* @return a processed list of chunks
*/
def enforceMaximumSize(chunks: List[ShortChannelIdsChunk]) : List[ShortChannelIdsChunk] = chunks.map(_.enforceMaximumSize(MAXIMUM_CHUNK_SIZE))
def enforceMaximumSize(chunks: List[ShortChannelIdsChunk]): List[ShortChannelIdsChunk] = chunks.map(_.enforceMaximumSize(MAXIMUM_CHUNK_SIZE))
/**
* Build a `reply_channel_range` message

View file

@ -31,7 +31,7 @@ import fr.acinq.eclair.channel.{ChannelCreated, HasCommitments}
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.io.Peer._
import fr.acinq.eclair.router.RoutingSyncSpec.makeFakeRoutingInfo
import fr.acinq.eclair.router.{Rebroadcast, RoutingSyncSpec, SendChannelQuery}
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.{ChannelCodecsSpec, Color, EncodedShortChannelIds, EncodingType, Error, IPv4, InitTlv, LightningMessageCodecs, NodeAddress, NodeAnnouncement, Ping, Pong, QueryShortChannelIds, TlvStream}
import org.scalatest.{Outcome, Tag}
import scodec.bits.{ByteVector, _}
@ -258,7 +258,7 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
assert(init.features === sentFeatures.bytes)
}
}
test("disconnect if incompatible networks") { f =>
import f._
val probe = TestProbe()
@ -369,7 +369,7 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
test("filter gossip message (no filtering)") { f =>
import f._
val probe = TestProbe()
val gossipOrigin = Set(TestProbe().ref)
val gossipOrigin = Set[GossipOrigin](RemoteGossip(TestProbe().ref))
connect(remoteNodeId, authenticator, watcher, router, relayer, connection, transport, peer)
val rebroadcast = Rebroadcast(channels.map(_ -> gossipOrigin).toMap, updates.map(_ -> gossipOrigin).toMap, nodes.map(_ -> gossipOrigin).toMap)
probe.send(peer, rebroadcast)
@ -380,12 +380,12 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
import f._
val probe = TestProbe()
connect(remoteNodeId, authenticator, watcher, router, relayer, connection, transport, peer)
val gossipOrigin = Set(TestProbe().ref)
val gossipOrigin = Set[GossipOrigin](RemoteGossip(TestProbe().ref))
val peerActor: ActorRef = peer
val rebroadcast = Rebroadcast(
channels.map(_ -> gossipOrigin).toMap + (channels(5) -> Set(peerActor)),
updates.map(_ -> gossipOrigin).toMap + (updates(6) -> Set(peerActor)) + (updates(10) -> Set(peerActor)),
nodes.map(_ -> gossipOrigin).toMap + (nodes(4) -> Set(peerActor)))
channels.map(_ -> gossipOrigin).toMap + (channels(5) -> Set(RemoteGossip(peerActor))),
updates.map(_ -> gossipOrigin).toMap + (updates(6) -> (gossipOrigin + RemoteGossip(peerActor))) + (updates(10) -> Set(RemoteGossip(peerActor))),
nodes.map(_ -> gossipOrigin).toMap + (nodes(4) -> Set(RemoteGossip(peerActor))))
val filter = wire.GossipTimestampFilter(Alice.nodeParams.chainHash, 0, Long.MaxValue) // no filtering on timestamps
probe.send(peer, filter)
probe.send(peer, rebroadcast)
@ -399,7 +399,7 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
import f._
val probe = TestProbe()
connect(remoteNodeId, authenticator, watcher, router, relayer, connection, transport, peer)
val gossipOrigin = Set(TestProbe().ref)
val gossipOrigin = Set[GossipOrigin](RemoteGossip(TestProbe().ref))
val rebroadcast = Rebroadcast(channels.map(_ -> gossipOrigin).toMap, updates.map(_ -> gossipOrigin).toMap, nodes.map(_ -> gossipOrigin).toMap)
val timestamps = updates.map(_.timestamp).sorted.slice(10, 30)
val filter = wire.GossipTimestampFilter(Alice.nodeParams.chainHash, timestamps.head, timestamps.last - timestamps.head)
@ -416,11 +416,11 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
import f._
val probe = TestProbe()
connect(remoteNodeId, authenticator, watcher, router, relayer, connection, transport, peer)
val gossipOrigin = Set(TestProbe().ref)
val gossipOrigin = Set[GossipOrigin](RemoteGossip(TestProbe().ref))
val rebroadcast = Rebroadcast(
channels.map(_ -> gossipOrigin).toMap + (channels(5) -> Set(router.ref)),
updates.map(_ -> gossipOrigin).toMap + (updates(6) -> Set(router.ref)) + (updates(10) -> Set(router.ref)),
nodes.map(_ -> gossipOrigin).toMap + (nodes(4) -> Set(router.ref)))
channels.map(_ -> gossipOrigin).toMap + (channels(5) -> Set(LocalGossip)),
updates.map(_ -> gossipOrigin).toMap + (updates(6) -> (gossipOrigin + LocalGossip)) + (updates(10) -> Set(LocalGossip)),
nodes.map(_ -> gossipOrigin).toMap + (nodes(4) -> Set(LocalGossip)))
// No timestamp filter set -> the only gossip we should broadcast is our own.
probe.send(peer, rebroadcast)
transport.expectMsg(channels(5))
@ -478,7 +478,6 @@ class PeerSpec extends TestkitBaseClass with StateTestsHelperMethods {
assert(error1.channelId === CHANNELID_ZERO)
assert(new String(error1.data.toArray).startsWith("couldn't verify channel! shortChannelId="))
// let's assume that one of the sigs were invalid
router.send(peer, Peer.InvalidSignature(channels(0)))
// peer will return a connection-wide error, including the hex-encoded representation of the bad message