mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-22 22:25:26 +01:00
Ignore bad announcements sent by peers (#705)
Nodes currently receive tons of bogus channel_announcements, mainly with unexisting or long-spent funding transactions. Of course those don't pass the validation and are rejected, but that takes a significant amount of resources: bandwidth, multiple calls to bitcoind, etc. On top of that, we forget those announcements as soon as we have rejected them, and will happily revalidate them next time we receive them. As a result, a typical node on mainnet will validate 10s of thousands of useless announcements every day. As far as we know, this is apparently due to bug in another implementation, but that may very well be used as a DOS attack vector in the future. This PR adds a simple mechanism to react to misbehaving peer and handle three types of misbehaviors: (a) bad announcement sigs: that is a serious offense, for now we just close the connection, but in the future we will ban the peer for that kind of things (the same way bitcoin core does) (b) funding tx already spent: peer send us channel_announcement, but the channel has been closed (funding tx already spent); if we receive too many of those, we will ignore future announcements from this peer for a given time (c) same as (b), but the channel doesn't even exist (funding tx not found). That may be due to reorgs on testnet. Needless to say that this leads to a huge reduction in CPU/bandwidth usage on well-connected nodes.
This commit is contained in:
parent
1ce486ad20
commit
52b161f5e9
8 changed files with 290 additions and 128 deletions
|
@ -27,7 +27,7 @@ import fr.acinq.bitcoin.{BinaryData, DeterministicWallet, MilliSatoshi, Protocol
|
|||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.crypto.TransportHandler.Listener
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.router._
|
||||
import fr.acinq.eclair.wire._
|
||||
import fr.acinq.eclair.{wire, _}
|
||||
|
@ -73,7 +73,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
|
||||
case Event(Authenticator.Authenticated(_, transport, remoteNodeId, address, outgoing, origin_opt), DisconnectedData(_, channels, _)) =>
|
||||
log.debug(s"got authenticated connection to $remoteNodeId@${address.getHostString}:${address.getPort}")
|
||||
transport ! Listener(self)
|
||||
transport ! TransportHandler.Listener(self)
|
||||
context watch transport
|
||||
transport ! wire.Init(globalFeatures = nodeParams.globalFeatures, localFeatures = nodeParams.localFeatures)
|
||||
|
||||
|
@ -87,6 +87,8 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
val h = channels.filter(_._2 == actor).map(_._1)
|
||||
log.info(s"channel closed: channelId=${h.mkString("/")}")
|
||||
stay using d.copy(channels = channels -- h)
|
||||
|
||||
case Event(_: wire.LightningMessage, _) => stay // we probably just got disconnected and that's the last messages we received
|
||||
}
|
||||
|
||||
when(INITIALIZING) {
|
||||
|
@ -148,14 +150,14 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
|
||||
when(CONNECTED, stateTimeout = nodeParams.pingInterval) {
|
||||
case Event(StateTimeout, ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(StateTimeout, ConnectedData(_, transport, _, _, _, _)) =>
|
||||
// no need to use secure random here
|
||||
val pingSize = Random.nextInt(1000)
|
||||
val pongSize = Random.nextInt(1000)
|
||||
transport ! wire.Ping(pongSize, BinaryData("00" * pingSize))
|
||||
stay
|
||||
|
||||
case Event(ping@wire.Ping(pongLength, _), ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(ping@wire.Ping(pongLength, _), ConnectedData(_, transport, _, _, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(ping)
|
||||
// TODO: (optional) check against the expected data size tat we requested when we sent ping messages
|
||||
if (pongLength > 0) {
|
||||
|
@ -163,20 +165,20 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
stay
|
||||
|
||||
case Event(pong@wire.Pong(data), ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(pong@wire.Pong(data), ConnectedData(_, transport, _, _, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(pong)
|
||||
// TODO: compute latency for remote peer ?
|
||||
log.debug(s"received pong with ${data.length} bytes")
|
||||
stay
|
||||
|
||||
case Event(err@wire.Error(channelId, reason), ConnectedData(_, transport, _, channels, _)) if channelId == CHANNELID_ZERO =>
|
||||
case Event(err@wire.Error(channelId, reason), ConnectedData(_, transport, _, channels, _, _)) if channelId == CHANNELID_ZERO =>
|
||||
transport ! TransportHandler.ReadAck(err)
|
||||
log.error(s"connection-level error, failing all channels! reason=${new String(reason)}")
|
||||
channels.values.toSet[ActorRef].foreach(_ forward err) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
|
||||
transport ! PoisonPill
|
||||
stay
|
||||
|
||||
case Event(err: wire.Error, ConnectedData(_, transport, _, channels, _)) =>
|
||||
case Event(err: wire.Error, ConnectedData(_, transport, _, channels, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(err)
|
||||
// error messages are a bit special because they can contain either temporaryChannelId or channelId (see BOLT 1)
|
||||
channels.get(FinalChannelId(err.channelId)).orElse(channels.get(TemporaryChannelId(err.channelId))) match {
|
||||
|
@ -185,7 +187,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
stay
|
||||
|
||||
case Event(c: Peer.OpenChannel, d@ConnectedData(_, transport, remoteInit, channels, _)) =>
|
||||
case Event(c: Peer.OpenChannel, d@ConnectedData(_, transport, remoteInit, channels, _, _)) =>
|
||||
log.info(s"requesting a new channel to $remoteNodeId with fundingSatoshis=${c.fundingSatoshis}, pushMsat=${c.pushMsat} and fundingFeeratePerByte=${c.fundingTxFeeratePerKw_opt}")
|
||||
val (channel, localParams) = createNewChannel(nodeParams, funder = true, c.fundingSatoshis.toLong, origin_opt = Some(sender))
|
||||
val temporaryChannelId = randomBytes(32)
|
||||
|
@ -194,7 +196,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
channel ! INPUT_INIT_FUNDER(temporaryChannelId, c.fundingSatoshis.amount, c.pushMsat.amount, channelFeeratePerKw, fundingTxFeeratePerKw, localParams, transport, remoteInit, c.channelFlags.getOrElse(nodeParams.channelFlags))
|
||||
stay using d.copy(channels = channels + (TemporaryChannelId(temporaryChannelId) -> channel))
|
||||
|
||||
case Event(msg: wire.OpenChannel, d@ConnectedData(_, transport, remoteInit, channels, _)) =>
|
||||
case Event(msg: wire.OpenChannel, d@ConnectedData(_, transport, remoteInit, channels, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(msg)
|
||||
channels.get(TemporaryChannelId(msg.temporaryChannelId)) match {
|
||||
case None =>
|
||||
|
@ -209,7 +211,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
stay
|
||||
}
|
||||
|
||||
case Event(msg: wire.HasChannelId, ConnectedData(_, transport, _, channels, _)) =>
|
||||
case Event(msg: wire.HasChannelId, ConnectedData(_, transport, _, channels, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(msg)
|
||||
channels.get(FinalChannelId(msg.channelId)) match {
|
||||
case Some(channel) => channel forward msg
|
||||
|
@ -217,7 +219,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
stay
|
||||
|
||||
case Event(msg: wire.HasTemporaryChannelId, ConnectedData(_, transport, _, channels, _)) =>
|
||||
case Event(msg: wire.HasTemporaryChannelId, ConnectedData(_, transport, _, channels, _, _)) =>
|
||||
transport ! TransportHandler.ReadAck(msg)
|
||||
channels.get(TemporaryChannelId(msg.temporaryChannelId)) match {
|
||||
case Some(channel) => channel forward msg
|
||||
|
@ -225,13 +227,13 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
stay
|
||||
|
||||
case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d@ConnectedData(_, _, _, channels, _)) if channels.contains(TemporaryChannelId(temporaryChannelId)) =>
|
||||
case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d@ConnectedData(_, _, _, channels, _, _)) if channels.contains(TemporaryChannelId(temporaryChannelId)) =>
|
||||
log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
|
||||
// NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151)
|
||||
// we won't clean it up, but we won't remember the temporary id on channel termination
|
||||
stay using d.copy(channels = channels + (FinalChannelId(channelId) -> channel))
|
||||
|
||||
case Event(RoutingState(channels, updates, nodes), ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(RoutingState(channels, updates, nodes), ConnectedData(_, transport, _, _, _, _)) =>
|
||||
// let's send the messages
|
||||
def send(announcements: Iterable[_ <: LightningMessage]) = announcements.foldLeft(0) {
|
||||
case (c, ann) =>
|
||||
|
@ -246,7 +248,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
log.info(s"sent all announcements to {}: channels={} updates={} nodes={}", remoteNodeId, channelsSent, updatesSent, nodesSent)
|
||||
stay
|
||||
|
||||
case Event(rebroadcast: Rebroadcast, ConnectedData(_, transport, _, _, maybeGossipTimestampFilter)) =>
|
||||
case Event(rebroadcast: Rebroadcast, ConnectedData(_, transport, _, _, maybeGossipTimestampFilter, _)) =>
|
||||
val (channels1, updates1, nodes1) = Peer.filterGossipMessages(rebroadcast, self, maybeGossipTimestampFilter)
|
||||
|
||||
/**
|
||||
|
@ -279,27 +281,72 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
stay using data.copy(gossipTimestampFilter = Some(msg))
|
||||
}
|
||||
|
||||
case Event(msg: wire.RoutingMessage, _) =>
|
||||
// Note: we don't ack messages here because we don't want them to be stacked in the router's mailbox
|
||||
// we forward messages so the router can reply directly
|
||||
router forward PeerRoutingMessage(remoteNodeId, msg)
|
||||
case Event(msg: wire.RoutingMessage, ConnectedData(_, transport, _, _, _, behavior)) =>
|
||||
msg match {
|
||||
case _: ChannelAnnouncement | _: ChannelUpdate | _: NodeAnnouncement if behavior.ignoreNetworkAnnouncement =>
|
||||
// this peer is currently under embargo!
|
||||
sender ! TransportHandler.ReadAck(msg)
|
||||
case _ =>
|
||||
// Note: we don't ack messages here because we don't want them to be stacked in the router's mailbox
|
||||
router ! PeerRoutingMessage(transport, remoteNodeId, msg)
|
||||
}
|
||||
stay
|
||||
|
||||
case Event(readAck: TransportHandler.ReadAck, ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(readAck: TransportHandler.ReadAck, ConnectedData(_, transport, _, _, _, _)) =>
|
||||
// we just forward acks from router to transport
|
||||
transport forward readAck
|
||||
stay
|
||||
|
||||
case Event(Disconnect, ConnectedData(_, transport, _, _, _)) =>
|
||||
case Event(badMessage: BadMessage, data@ConnectedData(_, transport, _, _, _, behavior)) =>
|
||||
val behavior1 = badMessage match {
|
||||
case InvalidSignature(r) =>
|
||||
val bin = LightningMessageCodecs.lightningMessageCodec.encode(r)
|
||||
log.error(s"peer sent us a routing message with invalid sig: r=$r bin=$bin")
|
||||
// for now we just return an error, maybe ban the peer in the future?
|
||||
transport ! Error(CHANNELID_ZERO, s"bad announcement sig! bin=$bin".getBytes())
|
||||
behavior
|
||||
case ChannelClosed(_) =>
|
||||
if (behavior.ignoreNetworkAnnouncement) {
|
||||
// we already are ignoring announcements, we may have additional notifications for announcements that were received right before our ban
|
||||
behavior.copy(fundingTxAlreadySpentCount = behavior.fundingTxAlreadySpentCount + 1)
|
||||
} else if (behavior.fundingTxAlreadySpentCount < MAX_FUNDING_TX_ALREADY_SPENT) {
|
||||
behavior.copy(fundingTxAlreadySpentCount = behavior.fundingTxAlreadySpentCount + 1)
|
||||
} else {
|
||||
log.warning(s"peer sent us too many channel announcements with funding tx already spent (count=${behavior.fundingTxAlreadySpentCount + 1}), ignoring network announcements for $IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD")
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
context.system.scheduler.scheduleOnce(IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD, self, ResumeAnnouncements)
|
||||
behavior.copy(fundingTxAlreadySpentCount = behavior.fundingTxAlreadySpentCount + 1, ignoreNetworkAnnouncement = true)
|
||||
}
|
||||
case NonexistingChannel(_) =>
|
||||
// this should never happen, unless we are not in sync or there is a 6+ blocks reorg
|
||||
if (behavior.ignoreNetworkAnnouncement) {
|
||||
// we already are ignoring announcements, we may have additional notifications for announcements that were received right before our ban
|
||||
behavior.copy(fundingTxNotFoundCount = behavior.fundingTxNotFoundCount + 1)
|
||||
} else if (behavior.fundingTxNotFoundCount < MAX_FUNDING_TX_NOT_FOUND) {
|
||||
behavior.copy(fundingTxNotFoundCount = behavior.fundingTxNotFoundCount + 1)
|
||||
} else {
|
||||
log.warning(s"peer sent us too many channel announcements with non-existing funding tx (count=${behavior.fundingTxNotFoundCount + 1}), ignoring network announcements for $IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD")
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
context.system.scheduler.scheduleOnce(IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD, self, ResumeAnnouncements)
|
||||
behavior.copy(fundingTxNotFoundCount = behavior.fundingTxNotFoundCount + 1, ignoreNetworkAnnouncement = true)
|
||||
}
|
||||
}
|
||||
stay using data.copy(behavior = behavior1)
|
||||
|
||||
case Event(ResumeAnnouncements, data: ConnectedData) =>
|
||||
log.info(s"resuming processing of network announcements for peer")
|
||||
stay using data.copy(behavior = data.behavior.copy(fundingTxAlreadySpentCount = 0, ignoreNetworkAnnouncement = false))
|
||||
|
||||
case Event(Disconnect, ConnectedData(_, transport, _, _, _, _)) =>
|
||||
transport ! PoisonPill
|
||||
stay
|
||||
|
||||
case Event(Terminated(actor), ConnectedData(address_opt, transport, _, channels, _)) if actor == transport =>
|
||||
case Event(Terminated(actor), ConnectedData(address_opt, transport, _, channels, _, _)) if actor == transport =>
|
||||
log.info(s"lost connection to $remoteNodeId")
|
||||
channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
|
||||
goto(DISCONNECTED) using DisconnectedData(address_opt, channels.collect { case (k: FinalChannelId, v) => (k, v) })
|
||||
|
||||
case Event(Terminated(actor), d@ConnectedData(_, transport, _, channels, _)) if channels.values.toSet.contains(actor) =>
|
||||
case Event(Terminated(actor), d@ConnectedData(_, transport, _, channels, _, _)) if channels.values.toSet.contains(actor) =>
|
||||
// we will have at most 2 ids: a TemporaryChannelId and a FinalChannelId
|
||||
val channelIds = channels.filter(_._2 == actor).keys
|
||||
log.info(s"channel closed: channelId=${channelIds.mkString("/")}")
|
||||
|
@ -309,7 +356,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
|||
}
|
||||
stay using d.copy(channels = channels -- channelIds)
|
||||
|
||||
case Event(h: Authenticator.Authenticated, ConnectedData(address_opt, oldTransport, _, channels, _)) =>
|
||||
case Event(h: Authenticator.Authenticated, ConnectedData(address_opt, oldTransport, _, channels, _, _)) =>
|
||||
log.info(s"got new transport while already connected, switching to new transport")
|
||||
context unwatch oldTransport
|
||||
oldTransport ! PoisonPill
|
||||
|
@ -373,6 +420,12 @@ object Peer {
|
|||
|
||||
val RECONNECT_TIMER = "reconnect"
|
||||
|
||||
val MAX_FUNDING_TX_ALREADY_SPENT = 10
|
||||
|
||||
val MAX_FUNDING_TX_NOT_FOUND = 10
|
||||
|
||||
val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD = 5 minutes
|
||||
|
||||
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) = Props(new Peer(nodeParams, remoteNodeId, authenticator, watcher, router, relayer, wallet: EclairWallet))
|
||||
|
||||
// @formatter:off
|
||||
|
@ -388,7 +441,7 @@ object Peer {
|
|||
case class Nothing() extends Data { override def address_opt = None; override def channels = Map.empty }
|
||||
case class DisconnectedData(address_opt: Option[InetSocketAddress], channels: Map[FinalChannelId, ActorRef], attempts: Int = 0) extends Data
|
||||
case class InitializingData(address_opt: Option[InetSocketAddress], transport: ActorRef, channels: Map[FinalChannelId, ActorRef], origin_opt: Option[ActorRef]) extends Data
|
||||
case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, remoteInit: wire.Init, channels: Map[ChannelId, ActorRef], gossipTimestampFilter: Option[GossipTimestampFilter] = None) extends Data
|
||||
case class ConnectedData(address_opt: Option[InetSocketAddress], transport: ActorRef, remoteInit: wire.Init, channels: Map[ChannelId, ActorRef], gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior()) extends Data
|
||||
|
||||
sealed trait State
|
||||
case object INSTANTIATING extends State
|
||||
|
@ -400,6 +453,7 @@ object Peer {
|
|||
case class Connect(uri: NodeURI)
|
||||
case object Reconnect
|
||||
case object Disconnect
|
||||
case object ResumeAnnouncements
|
||||
case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, fundingTxFeeratePerKw_opt: Option[Long], channelFlags: Option[Byte]) {
|
||||
require(fundingSatoshis.amount < Channel.MAX_FUNDING_SATOSHIS, s"fundingSatoshis must be less than ${Channel.MAX_FUNDING_SATOSHIS}")
|
||||
require(pushMsat.amount <= 1000 * fundingSatoshis.amount, s"pushMsat must be less or equal to fundingSatoshis")
|
||||
|
@ -410,7 +464,14 @@ object Peer {
|
|||
case object GetPeerInfo
|
||||
case class PeerInfo(nodeId: PublicKey, state: String, address: Option[InetSocketAddress], channels: Int)
|
||||
|
||||
case class PeerRoutingMessage(remoteNodeId: PublicKey, message: RoutingMessage)
|
||||
case class PeerRoutingMessage(transport: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage)
|
||||
|
||||
sealed trait BadMessage
|
||||
case class InvalidSignature(r: RoutingMessage) extends BadMessage
|
||||
case class ChannelClosed(c: ChannelAnnouncement) extends BadMessage
|
||||
case class NonexistingChannel(c: ChannelAnnouncement) extends BadMessage
|
||||
|
||||
case class Behavior(fundingTxAlreadySpentCount: Int = 0, fundingTxNotFoundCount: Int = 0, ignoreNetworkAnnouncement: Boolean = false)
|
||||
|
||||
// @formatter:on
|
||||
|
||||
|
@ -458,8 +519,8 @@ object Peer {
|
|||
}
|
||||
|
||||
// we filter out updates against their timestamp filter, and build a list of all channel ids for which we have an update
|
||||
val (updates1, shortChannelIds) = rebroadcast.updates.foldLeft((Seq.empty[ChannelUpdate], Set.empty[ShortChannelId])){
|
||||
case ((channelUpdates, shortChannelIds), (a, origins)) if !origins.contains(self) && checkTimestamp(a)=> (a +: channelUpdates, shortChannelIds + a.shortChannelId)
|
||||
val (updates1, shortChannelIds) = rebroadcast.updates.foldLeft((Seq.empty[ChannelUpdate], Set.empty[ShortChannelId])) {
|
||||
case ((channelUpdates, shortChannelIds), (a, origins)) if !origins.contains(self) && checkTimestamp(a) => (a +: channelUpdates, shortChannelIds + a.shortChannelId)
|
||||
case ((channelUpdates, shortChannelIds), (a, origins)) => (channelUpdates, shortChannelIds)
|
||||
}
|
||||
|
||||
|
|
|
@ -28,8 +28,7 @@ import fr.acinq.eclair._
|
|||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.channel._
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.io.Peer.{ChannelClosed, NonexistingChannel, InvalidSignature, PeerRoutingMessage}
|
||||
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.wire._
|
||||
|
@ -212,7 +211,6 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
|
||||
false
|
||||
case ValidateResult(c, Some(tx), true, None) =>
|
||||
// TODO: blacklisting
|
||||
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
|
||||
// let's check that the output is indeed a P2WSH multisig 2-of-2 of nodeid1 and nodeid2)
|
||||
val fundingOutputScript = write(pay2wsh(Scripts.multiSig2of2(PublicKey(c.bitcoinKey1), PublicKey(c.bitcoinKey2))))
|
||||
|
@ -239,14 +237,21 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
true
|
||||
}
|
||||
case ValidateResult(c, Some(tx), false, None) =>
|
||||
// TODO: vulnerability if they flood us with spent funding tx?
|
||||
log.warning("ignoring shortChannelId={} tx={} (funding tx not found in utxo)", c.shortChannelId, tx.txid)
|
||||
log.warning("ignoring shortChannelId={} tx={} (funding tx already spent)", c.shortChannelId, tx.txid)
|
||||
d0.awaiting.get(c) match {
|
||||
case Some(origins) => origins.foreach(_ ! ChannelClosed(c))
|
||||
case _ => ()
|
||||
}
|
||||
// there may be a record if we have just restarted
|
||||
db.removeChannel(c.shortChannelId)
|
||||
false
|
||||
case ValidateResult(c, None, _, None) =>
|
||||
// TODO: blacklist?
|
||||
// we couldn't find the funding tx in the blockchain, this is highly suspicious because it should have at least 6 confirmations to be announced
|
||||
log.warning("could not retrieve tx for shortChannelId={}", c.shortChannelId)
|
||||
d0.awaiting.get(c) match {
|
||||
case Some(origins) => origins.foreach(_ ! NonexistingChannel(c))
|
||||
case _ => ()
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
|
@ -417,7 +422,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
stay
|
||||
|
||||
// Warning: order matters here, this must be the first match for HasChainHash messages !
|
||||
case Event(PeerRoutingMessage(remoteNodeId, routingMessage: HasChainHash), d) if routingMessage.chainHash != nodeParams.chainHash =>
|
||||
case Event(PeerRoutingMessage(_, _, routingMessage: HasChainHash), d) if routingMessage.chainHash != nodeParams.chainHash =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
log.warning("message {} for wrong chain {}, we're on {}", routingMessage, routingMessage.chainHash, nodeParams.chainHash)
|
||||
stay
|
||||
|
@ -427,12 +432,12 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
log.debug("received channel update from {}", sender)
|
||||
stay using handle(u, sender, d)
|
||||
|
||||
case Event(PeerRoutingMessage(remoteNodeId, u: ChannelUpdate), d) =>
|
||||
case Event(PeerRoutingMessage(_, _, u: ChannelUpdate), d) =>
|
||||
sender ! TransportHandler.ReadAck(u)
|
||||
log.debug("received channel update for shortChannelId={}", u.shortChannelId)
|
||||
stay using handle(u, sender, d)
|
||||
|
||||
case Event(PeerRoutingMessage(remoteNodeId, c: ChannelAnnouncement), d) =>
|
||||
case Event(PeerRoutingMessage(_, _, c: ChannelAnnouncement), d) =>
|
||||
log.debug("received channel announcement for shortChannelId={} nodeId1={} nodeId2={}", c.shortChannelId, c.nodeId1, c.nodeId2)
|
||||
if (d.channels.contains(c.shortChannelId)) {
|
||||
sender ! TransportHandler.ReadAck(c)
|
||||
|
@ -447,7 +452,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
} else if (!Announcements.checkSigs(c)) {
|
||||
sender ! TransportHandler.ReadAck(c)
|
||||
log.warning("bad signature for announcement {}", c)
|
||||
sender ! Error(Peer.CHANNELID_ZERO, "bad announcement sig!!!".getBytes())
|
||||
sender ! InvalidSignature(c)
|
||||
stay
|
||||
} else {
|
||||
log.info("validating shortChannelId={}", c.shortChannelId)
|
||||
|
@ -461,12 +466,12 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
log.debug("received node announcement from {}", sender)
|
||||
stay using handle(n, sender, d)
|
||||
|
||||
case Event(PeerRoutingMessage(_, n: NodeAnnouncement), d: Data) =>
|
||||
case Event(PeerRoutingMessage(_, _, n: NodeAnnouncement), d: Data) =>
|
||||
sender ! TransportHandler.ReadAck(n)
|
||||
log.debug("received node announcement for nodeId={}", n.nodeId)
|
||||
stay using handle(n, sender, d)
|
||||
|
||||
case Event(PeerRoutingMessage(_, routingMessage@QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks)), d) =>
|
||||
case Event(PeerRoutingMessage(transport, _, routingMessage@QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
log.info("received query_channel_range={}", routingMessage)
|
||||
// sort channel ids and keep the ones which are in [firstBlockNum, firstBlockNum + numberOfBlocks]
|
||||
|
@ -478,10 +483,10 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
log.info("sending back reply_channel_range with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks)
|
||||
// there could be several reply_channel_range messages for a single query
|
||||
val replies = blocks.map(block => ReplyChannelRange(chainHash, block.firstBlock, block.numBlocks, 1, block.shortChannelIds))
|
||||
replies.foreach(reply => sender ! reply)
|
||||
replies.foreach(reply => transport ! reply)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
|
||||
case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, _, data)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
val (format, theirShortChannelIds, useGzip) = ChannelRangeQueries.decodeShortChannelIds(data)
|
||||
val ourShortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _, d.channels, d.updates))
|
||||
|
@ -494,7 +499,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
case None =>
|
||||
// we don't have a pending query with this peer
|
||||
val (slice, rest) = missing.splitAt(SHORTID_WINDOW)
|
||||
sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, format, useGzip))
|
||||
transport ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, format, useGzip))
|
||||
d.copy(sync = d.sync + (remoteNodeId -> Sync(rest, missing.size)))
|
||||
case Some(sync) =>
|
||||
// we already have a pending query with this peer, add missing ids to our "sync" state
|
||||
|
@ -504,7 +509,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
context.system.eventStream.publish(syncProgress(d1))
|
||||
stay using d1
|
||||
|
||||
case Event(PeerRoutingMessage(_, routingMessage@QueryShortChannelIds(chainHash, data)), d) =>
|
||||
case Event(PeerRoutingMessage(transport, _, routingMessage@QueryShortChannelIds(chainHash, data)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
val (_, shortChannelIds, useGzip) = ChannelRangeQueries.decodeShortChannelIds(data)
|
||||
log.info("received query_short_channel_ids for {} channel announcements, useGzip={}", shortChannelIds.size, useGzip)
|
||||
|
@ -512,15 +517,15 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
d.channels.get(shortChannelId) match {
|
||||
case None => log.warning("received query for shortChannelId={} that we don't have", shortChannelId)
|
||||
case Some(ca) =>
|
||||
sender ! ca
|
||||
d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).map(u => sender ! u)
|
||||
d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).map(u => sender ! u)
|
||||
transport ! ca
|
||||
d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId1, ca.nodeId2)).map(u => transport ! u)
|
||||
d.updates.get(ChannelDesc(ca.shortChannelId, ca.nodeId2, ca.nodeId1)).map(u => transport ! u)
|
||||
}
|
||||
})
|
||||
sender ! ReplyShortChannelIdsEnd(chainHash, 1)
|
||||
transport ! ReplyShortChannelIdsEnd(chainHash, 1)
|
||||
stay
|
||||
|
||||
case Event(PeerRoutingMessage(remoteNodeId, routingMessage@ReplyShortChannelIdsEnd(chainHash, complete)), d) =>
|
||||
case Event(PeerRoutingMessage(transport, remoteNodeId, routingMessage@ReplyShortChannelIdsEnd(chainHash, complete)), d) =>
|
||||
sender ! TransportHandler.ReadAck(routingMessage)
|
||||
log.info("received reply_short_channel_ids_end={}", routingMessage)
|
||||
// have we more channels to ask this peer?
|
||||
|
@ -528,7 +533,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
case Some(sync) if sync.missing.nonEmpty =>
|
||||
log.info(s"asking {} for the next slice of short_channel_ids", remoteNodeId)
|
||||
val (slice, rest) = sync.missing.splitAt(SHORTID_WINDOW)
|
||||
sender ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
|
||||
transport ! QueryShortChannelIds(chainHash, ChannelRangeQueries.encodeShortChannelIdsSingle(slice, ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
|
||||
d.copy(sync = d.sync + (remoteNodeId -> sync.copy(missing = rest)))
|
||||
case Some(sync) if sync.missing.isEmpty =>
|
||||
// we received reply_short_channel_ids_end for our last query aand have not sent another one, we can now remove
|
||||
|
@ -557,7 +562,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
d
|
||||
} else if (!Announcements.checkSig(n)) {
|
||||
log.warning("bad signature for {}", n)
|
||||
origin ! Error(Peer.CHANNELID_ZERO, "bad announcement sig!!!".getBytes())
|
||||
origin ! InvalidSignature(n)
|
||||
d
|
||||
} else if (d.nodes.contains(n.nodeId)) {
|
||||
log.debug("updated node nodeId={}", n.nodeId)
|
||||
|
@ -597,7 +602,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
d
|
||||
} else if (!Announcements.checkSig(u, desc.a)) {
|
||||
log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u)
|
||||
origin ! Error(Peer.CHANNELID_ZERO, "bad announcement sig!!!".getBytes())
|
||||
origin ! InvalidSignature(u)
|
||||
d
|
||||
} else if (d.updates.contains(desc)) {
|
||||
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.flags, u)
|
||||
|
@ -638,7 +643,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
d
|
||||
} else if (!Announcements.checkSig(u, desc.a)) {
|
||||
log.warning("bad signature for announcement shortChannelId={} {}", u.shortChannelId, u)
|
||||
origin ! Error(Peer.CHANNELID_ZERO, "bad announcement sig!!!".getBytes())
|
||||
origin ! InvalidSignature(u)
|
||||
d
|
||||
} else if (d.privateUpdates.contains(desc)) {
|
||||
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.flags, u)
|
||||
|
@ -661,7 +666,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef) extends FSMDiagnosticAct
|
|||
|
||||
override def mdc(currentMessage: Any): MDC = currentMessage match {
|
||||
case SendChannelQuery(remoteNodeId, _) => Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))
|
||||
case PeerRoutingMessage(remoteNodeId, _) => Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))
|
||||
case PeerRoutingMessage(_, remoteNodeId, _) => Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))
|
||||
case _ => akka.event.Logging.emptyMDC
|
||||
}
|
||||
}
|
||||
|
|
|
@ -790,10 +790,10 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
|
|||
|
||||
// then we make the announcements
|
||||
val announcements = channels.map(c => AnnouncementsBatchValidationSpec.makeChannelAnnouncement(c))
|
||||
announcements.foreach(ann => nodes("A").router ! PeerRoutingMessage(remoteNodeId, ann))
|
||||
announcements.foreach(ann => nodes("A").router ! PeerRoutingMessage(sender.ref, remoteNodeId, ann))
|
||||
// we need to send channel_update otherwise router won't validate the channels
|
||||
val updates = channels.zip(announcements).map(x => AnnouncementsBatchValidationSpec.makeChannelUpdate(x._1, x._2.shortChannelId))
|
||||
updates.foreach(update => nodes("A").router ! PeerRoutingMessage(remoteNodeId, update))
|
||||
updates.foreach(update => nodes("A").router ! PeerRoutingMessage(sender.ref, remoteNodeId, update))
|
||||
awaitCond({
|
||||
sender.send(nodes("D").router, 'channels)
|
||||
sender.expectMsgType[Iterable[ChannelAnnouncement]](5 seconds).size == channels.size + 5 // 5 remaining channels because D->F{1-F4} have disappeared
|
||||
|
|
|
@ -1,16 +1,24 @@
|
|||
package fr.acinq.eclair.io
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.testkit.TestProbe
|
||||
import fr.acinq.bitcoin.Block
|
||||
import fr.acinq.eclair.TestkitBaseClass
|
||||
import fr.acinq.eclair.TestConstants._
|
||||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.ResumeAnnouncements
|
||||
import fr.acinq.eclair.router.RoutingSyncSpec.makeFakeRoutingInfo
|
||||
import fr.acinq.eclair.router.{ChannelRangeQueriesSpec, Rebroadcast}
|
||||
import fr.acinq.eclair.wire.GossipTimestampFilter
|
||||
import fr.acinq.eclair.router.{ChannelRangeQueries, ChannelRangeQueriesSpec, Rebroadcast, RouteCalculationSpec}
|
||||
import fr.acinq.eclair.{ShortChannelId, TestkitBaseClass, randomKey, wire}
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.Outcome
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class PeerSpec extends TestkitBaseClass {
|
||||
val shortChannelIds = ChannelRangeQueriesSpec.shortChannelIds.take(100)
|
||||
|
@ -48,9 +56,91 @@ class PeerSpec extends TestkitBaseClass {
|
|||
test("filter gossip message (filtered by timestamp)") { probe =>
|
||||
val rebroadcast = Rebroadcast(channels.map(_ -> Set.empty[ActorRef]).toMap, updates.map(_ -> Set.empty[ActorRef]).toMap, nodes.map(_ -> Set.empty[ActorRef]).toMap)
|
||||
val timestamps = updates.map(_.timestamp).sorted.drop(10).take(20)
|
||||
val (channels1, updates1, nodes1) = Peer.filterGossipMessages(rebroadcast, probe.ref, Some(GossipTimestampFilter(Block.RegtestGenesisBlock.blockId, timestamps.head, timestamps.last - timestamps.head)))
|
||||
val (channels1, updates1, nodes1) = Peer.filterGossipMessages(rebroadcast, probe.ref, Some(wire.GossipTimestampFilter(Block.RegtestGenesisBlock.blockId, timestamps.head, timestamps.last - timestamps.head)))
|
||||
assert(updates1.toSet == updates.filter(u => timestamps.contains(u.timestamp)).toSet)
|
||||
assert(nodes1.toSet == nodes.filter(u => timestamps.contains(u.timestamp)).toSet)
|
||||
assert(channels1.toSet == channels.filter(ca => updates1.map(_.shortChannelId).toSet.contains(ca.shortChannelId)).toSet)
|
||||
}
|
||||
|
||||
test("react to peer's bad behavior") { probe =>
|
||||
val authenticator = TestProbe()
|
||||
val watcher = TestProbe()
|
||||
val router = TestProbe()
|
||||
val relayer = TestProbe()
|
||||
val connection = TestProbe()
|
||||
val transport = TestProbe()
|
||||
val wallet: EclairWallet = null // unused
|
||||
val remoteNodeId = Bob.nodeParams.nodeId
|
||||
val peer = system.actorOf(Peer.props(Alice.nodeParams, remoteNodeId, authenticator.ref, watcher.ref, router.ref, relayer.ref, wallet))
|
||||
|
||||
// let's simulate a connection
|
||||
probe.send(peer, Peer.Init(None, Set.empty))
|
||||
authenticator.send(peer, Authenticator.Authenticated(connection.ref, transport.ref, remoteNodeId, InetSocketAddress.createUnresolved("foo.bar", 42000), false, None))
|
||||
transport.expectMsgType[TransportHandler.Listener]
|
||||
transport.expectMsgType[wire.Init]
|
||||
transport.send(peer, wire.Init(Bob.nodeParams.globalFeatures, Bob.nodeParams.localFeatures))
|
||||
transport.expectMsgType[TransportHandler.ReadAck]
|
||||
router.expectNoMsg(1 second) // bob's features require no sync
|
||||
probe.send(peer, Peer.GetPeerInfo)
|
||||
assert(probe.expectMsgType[Peer.PeerInfo].state == "CONNECTED")
|
||||
|
||||
val channels = for (_ <- 0 until 12) yield RouteCalculationSpec.makeChannel(Random.nextInt(10000000), randomKey.publicKey, randomKey.publicKey)
|
||||
val updates = for (_ <- 0 until 20) yield RouteCalculationSpec.makeUpdate(Random.nextInt(10000000), randomKey.publicKey, randomKey.publicKey, Random.nextInt(1000), Random.nextInt(1000))._2
|
||||
val query = wire.QueryShortChannelIds(Block.RegtestGenesisBlock.hash, ChannelRangeQueries.encodeShortChannelIdsSingle(Seq(ShortChannelId(42000)), ChannelRangeQueries.UNCOMPRESSED_FORMAT, useGzip = false))
|
||||
|
||||
// make sure that routing messages go through
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peer, ann)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(transport.ref, remoteNodeId, ann))
|
||||
}
|
||||
transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages
|
||||
|
||||
// let's assume that the router isn't happy with those channels because the funding tx is already spent
|
||||
for (c <- channels) {
|
||||
router.send(peer, Peer.ChannelClosed(c))
|
||||
}
|
||||
// peer will temporary ignore announcements coming from bob
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peer, ann)
|
||||
transport.expectMsg(TransportHandler.ReadAck(ann))
|
||||
}
|
||||
router.expectNoMsg(1 second)
|
||||
// other routing messages go through
|
||||
transport.send(peer, query)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(transport.ref, remoteNodeId, query))
|
||||
|
||||
// after a while the ban is lifted
|
||||
probe.send(peer, ResumeAnnouncements)
|
||||
|
||||
// and announcements are processed again
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peer, ann)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(transport.ref, remoteNodeId, ann))
|
||||
}
|
||||
transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages
|
||||
|
||||
// now let's assume that the router isn't happy with those channels because the funding tx is not found
|
||||
for (c <- channels) {
|
||||
router.send(peer, Peer.NonexistingChannel(c))
|
||||
}
|
||||
// peer will temporary ignore announcements coming from bob
|
||||
for (ann <- channels ++ updates) {
|
||||
transport.send(peer, ann)
|
||||
transport.expectMsg(TransportHandler.ReadAck(ann))
|
||||
}
|
||||
router.expectNoMsg(1 second)
|
||||
// other routing messages go through
|
||||
transport.send(peer, query)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(transport.ref, remoteNodeId, query))
|
||||
|
||||
// after a while the ban is lifted
|
||||
probe.send(peer, ResumeAnnouncements)
|
||||
|
||||
// and announcements are processed again
|
||||
for (c <- channels) {
|
||||
transport.send(peer, c)
|
||||
router.expectMsg(Peer.PeerRoutingMessage(transport.ref, remoteNodeId, c))
|
||||
}
|
||||
transport.expectNoMsg(1 second) // peer hasn't acknowledged the messages
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,26 +103,26 @@ abstract class BaseRouterSpec extends TestkitBaseClass {
|
|||
val watcher = TestProbe()
|
||||
val router = system.actorOf(Router.props(Alice.nodeParams, watcher.ref))
|
||||
// we announce channels
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_ab)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_bc)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_cd)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_ef)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_ab)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_bc)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_cd)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_ef)
|
||||
// then nodes
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_a)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_b)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_c)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_d)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_e)
|
||||
router ! PeerRoutingMessage(remoteNodeId, ann_f)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_a)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_b)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_c)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_d)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_e)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, ann_f)
|
||||
// then channel updates
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_ab)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_ba)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_bc)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_cb)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_cd)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_dc)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_ef)
|
||||
router ! PeerRoutingMessage(remoteNodeId, channelUpdate_fe)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_ab)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_ba)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_bc)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_cb)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_cd)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_dc)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_ef)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, channelUpdate_fe)
|
||||
// watcher receives the get tx requests
|
||||
watcher.expectMsg(ValidateRequest(chan_ab))
|
||||
watcher.expectMsg(ValidateRequest(chan_bc))
|
||||
|
|
|
@ -35,24 +35,7 @@ import scala.util.{Failure, Success}
|
|||
@RunWith(classOf[JUnitRunner])
|
||||
class RouteCalculationSpec extends FunSuite {
|
||||
|
||||
val DUMMY_SIG = BinaryData("3045022100e0a180fdd0fe38037cc878c03832861b40a29d32bd7b40b10c9e1efc8c1468a002205ae06d1624896d0d29f4b31e32772ea3cb1b4d7ed4e077e5da28dcc33c0e781201")
|
||||
|
||||
def makeChannel(shortChannelId: Long, nodeIdA: PublicKey, nodeIdB: PublicKey) = {
|
||||
val (nodeId1, nodeId2) = if (Announcements.isNode1(nodeIdA, nodeIdB)) (nodeIdA, nodeIdB) else (nodeIdB, nodeIdA)
|
||||
ChannelAnnouncement(DUMMY_SIG, DUMMY_SIG, DUMMY_SIG, DUMMY_SIG, "", Block.RegtestGenesisBlock.hash, ShortChannelId(shortChannelId), nodeId1, nodeId2, randomKey.publicKey, randomKey.publicKey)
|
||||
}
|
||||
|
||||
def makeUpdate(shortChannelId: Long, nodeId1: PublicKey, nodeId2: PublicKey, feeBaseMsat: Int, feeProportionalMillionth: Int): (ChannelDesc, ChannelUpdate) =
|
||||
(ChannelDesc(ShortChannelId(shortChannelId), nodeId1, nodeId2) -> ChannelUpdate(DUMMY_SIG, Block.RegtestGenesisBlock.hash, ShortChannelId(shortChannelId), 0L, "0000", 1, 42, feeBaseMsat, feeProportionalMillionth))
|
||||
|
||||
|
||||
def makeGraph(updates: Map[ChannelDesc, ChannelUpdate]) = {
|
||||
val g = new DirectedWeightedPseudograph[PublicKey, DescEdge](classOf[DescEdge])
|
||||
updates.foreach { case (d, u) => Router.addEdge(g, d, u) }
|
||||
g
|
||||
}
|
||||
|
||||
def hops2Ids(route: Seq[Hop]) = route.map(hop => hop.lastUpdate.shortChannelId.toLong)
|
||||
import RouteCalculationSpec._
|
||||
|
||||
val (a, b, c, d, e) = (randomKey.publicKey, randomKey.publicKey, randomKey.publicKey, randomKey.publicKey, randomKey.publicKey)
|
||||
|
||||
|
@ -411,3 +394,26 @@ class RouteCalculationSpec extends FunSuite {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
object RouteCalculationSpec {
|
||||
|
||||
val DUMMY_SIG = BinaryData("3045022100e0a180fdd0fe38037cc878c03832861b40a29d32bd7b40b10c9e1efc8c1468a002205ae06d1624896d0d29f4b31e32772ea3cb1b4d7ed4e077e5da28dcc33c0e781201")
|
||||
|
||||
def makeChannel(shortChannelId: Long, nodeIdA: PublicKey, nodeIdB: PublicKey) = {
|
||||
val (nodeId1, nodeId2) = if (Announcements.isNode1(nodeIdA, nodeIdB)) (nodeIdA, nodeIdB) else (nodeIdB, nodeIdA)
|
||||
ChannelAnnouncement(DUMMY_SIG, DUMMY_SIG, DUMMY_SIG, DUMMY_SIG, "", Block.RegtestGenesisBlock.hash, ShortChannelId(shortChannelId), nodeId1, nodeId2, randomKey.publicKey, randomKey.publicKey)
|
||||
}
|
||||
|
||||
def makeUpdate(shortChannelId: Long, nodeId1: PublicKey, nodeId2: PublicKey, feeBaseMsat: Int, feeProportionalMillionth: Int): (ChannelDesc, ChannelUpdate) =
|
||||
(ChannelDesc(ShortChannelId(shortChannelId), nodeId1, nodeId2) -> ChannelUpdate(DUMMY_SIG, Block.RegtestGenesisBlock.hash, ShortChannelId(shortChannelId), 0L, "0000", 1, 42, feeBaseMsat, feeProportionalMillionth))
|
||||
|
||||
|
||||
def makeGraph(updates: Map[ChannelDesc, ChannelUpdate]) = {
|
||||
val g = new DirectedWeightedPseudograph[PublicKey, DescEdge](classOf[DescEdge])
|
||||
updates.foreach { case (d, u) => Router.addEdge(g, d, u) }
|
||||
g
|
||||
}
|
||||
|
||||
def hops2Ids(route: Seq[Hop]) = route.map(hop => hop.lastUpdate.shortChannelId.toLong)
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import fr.acinq.bitcoin.{Block, Satoshi, Transaction, TxOut}
|
|||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.channel.BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT
|
||||
import fr.acinq.eclair.crypto.TransportHandler
|
||||
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
|
||||
import fr.acinq.eclair.io.Peer.{InvalidSignature, PeerRoutingMessage}
|
||||
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
|
||||
import fr.acinq.eclair.router.Announcements.makeChannelUpdate
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
|
@ -62,15 +62,15 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val chan_az = channelAnnouncement(ShortChannelId(42003), priv_a, priv_z, priv_funding_a, priv_funding_z)
|
||||
val update_az = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, priv_z.publicKey, chan_az.shortChannelId, cltvExpiryDelta = 7, 0, feeBaseMsat = 766000, feeProportionalMillionths = 10)
|
||||
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_ac)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_ax)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_ay)
|
||||
router ! PeerRoutingMessage(remoteNodeId, chan_az)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_ac)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_ax)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_ay)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, chan_az)
|
||||
// router won't validate channels before it has a recent enough channel update
|
||||
router ! PeerRoutingMessage(remoteNodeId, update_ac)
|
||||
router ! PeerRoutingMessage(remoteNodeId, update_ax)
|
||||
router ! PeerRoutingMessage(remoteNodeId, update_ay)
|
||||
router ! PeerRoutingMessage(remoteNodeId, update_az)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, update_ac)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, update_ax)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, update_ay)
|
||||
router ! PeerRoutingMessage(null, remoteNodeId, update_az)
|
||||
watcher.expectMsg(ValidateRequest(chan_ac))
|
||||
watcher.expectMsg(ValidateRequest(chan_ax))
|
||||
watcher.expectMsg(ValidateRequest(chan_ay))
|
||||
|
@ -114,25 +114,25 @@ class RouterSpec extends BaseRouterSpec {
|
|||
val channelId_ac = ShortChannelId(420000, 5, 0)
|
||||
val chan_ac = channelAnnouncement(channelId_ac, priv_a, priv_c, priv_funding_a, priv_funding_c)
|
||||
val buggy_chan_ac = chan_ac.copy(nodeSignature1 = chan_ac.nodeSignature2)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, buggy_chan_ac))
|
||||
sender.send(router, PeerRoutingMessage(null, remoteNodeId, buggy_chan_ac))
|
||||
sender.expectMsg(TransportHandler.ReadAck(buggy_chan_ac))
|
||||
sender.expectMsgType[Error]
|
||||
sender.expectMsg(InvalidSignature(buggy_chan_ac))
|
||||
}
|
||||
|
||||
test("handle bad signature for NodeAnnouncement") { case (router, _) =>
|
||||
val sender = TestProbe()
|
||||
val buggy_ann_a = ann_a.copy(signature = ann_b.signature, timestamp = ann_a.timestamp + 1)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, buggy_ann_a))
|
||||
sender.send(router, PeerRoutingMessage(null, remoteNodeId, buggy_ann_a))
|
||||
sender.expectMsg(TransportHandler.ReadAck(buggy_ann_a))
|
||||
sender.expectMsgType[Error]
|
||||
sender.expectMsg(InvalidSignature(buggy_ann_a))
|
||||
}
|
||||
|
||||
test("handle bad signature for ChannelUpdate") { case (router, _) =>
|
||||
val sender = TestProbe()
|
||||
val buggy_channelUpdate_ab = channelUpdate_ab.copy(signature = ann_b.signature, timestamp = channelUpdate_ab.timestamp + 1)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, buggy_channelUpdate_ab))
|
||||
sender.send(router, PeerRoutingMessage(null, remoteNodeId, buggy_channelUpdate_ab))
|
||||
sender.expectMsg(TransportHandler.ReadAck(buggy_channelUpdate_ab))
|
||||
sender.expectMsgType[Error]
|
||||
sender.expectMsg(InvalidSignature(buggy_channelUpdate_ab))
|
||||
}
|
||||
|
||||
test("route not found (unreachable target)") { case (router, _) =>
|
||||
|
@ -186,7 +186,7 @@ class RouterSpec extends BaseRouterSpec {
|
|||
assert(res.hops.last.nextNodeId === d)
|
||||
|
||||
val channelUpdate_cd1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_c, d, channelId_cd, cltvExpiryDelta = 3, 0, feeBaseMsat = 153000, feeProportionalMillionths = 4, enable = false)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, channelUpdate_cd1))
|
||||
sender.send(router, PeerRoutingMessage(null, remoteNodeId, channelUpdate_cd1))
|
||||
sender.expectMsg(TransportHandler.ReadAck(channelUpdate_cd1))
|
||||
sender.send(router, RouteRequest(a, d))
|
||||
sender.expectMsg(Failure(RouteNotFound))
|
||||
|
|
|
@ -18,11 +18,13 @@ import scala.concurrent.duration._
|
|||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
|
||||
|
||||
import RoutingSyncSpec.makeFakeRoutingInfo
|
||||
|
||||
test("handle chanel range queries") {
|
||||
test("handle channel range queries") {
|
||||
val params = TestConstants.Alice.nodeParams
|
||||
val router = TestFSMRef(new Router(params, TestProbe().ref))
|
||||
val transport = TestProbe()
|
||||
val sender = TestProbe()
|
||||
sender.ignoreMsg { case _: TransportHandler.ReadAck => true }
|
||||
val remoteNodeId = TestConstants.Bob.nodeParams.nodeId
|
||||
|
@ -42,41 +44,39 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
|
|||
val List(block3) = ChannelRangeQueries.encodeShortChannelIds(firstBlockNum, numberOfBlocks, shortChannelIds.drop(200).take(150), ChannelRangeQueries.UNCOMPRESSED_FORMAT)
|
||||
|
||||
// send first block
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, ReplyChannelRange(chainHash, block1.firstBlock, block1.numBlocks, 1, block1.shortChannelIds)))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, ReplyChannelRange(chainHash, block1.firstBlock, block1.numBlocks, 1, block1.shortChannelIds)))
|
||||
// router should ask for our first block of ids
|
||||
val QueryShortChannelIds(_, data1) = sender.expectMsgType[QueryShortChannelIds]
|
||||
val QueryShortChannelIds(_, data1) = transport.expectMsgType[QueryShortChannelIds]
|
||||
val (_, shortChannelIds1, false) = ChannelRangeQueries.decodeShortChannelIds(data1)
|
||||
assert(shortChannelIds1 == shortChannelIds.take(100))
|
||||
|
||||
// send second block
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, ReplyChannelRange(chainHash, block2.firstBlock, block2.numBlocks, 1, block2.shortChannelIds)))
|
||||
|
||||
// router should not ask for more ids, it already has a pending query !
|
||||
sender.expectNoMsg(1 second)
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, ReplyChannelRange(chainHash, block2.firstBlock, block2.numBlocks, 1, block2.shortChannelIds)))
|
||||
|
||||
// send the first 50 items
|
||||
shortChannelIds1.take(50).foreach(id => {
|
||||
val (ca, cu1, cu2, _, _) = fakeRoutingInfo(id)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, ca))
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, cu1))
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, cu2))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, ca))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, cu1))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, cu2))
|
||||
})
|
||||
sender.expectNoMsg(1 second)
|
||||
|
||||
// send the last 50 items
|
||||
shortChannelIds1.drop(50).foreach(id => {
|
||||
val (ca, cu1, cu2, _, _) = fakeRoutingInfo(id)
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, ca))
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, cu1))
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, cu2))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, ca))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, cu1))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, cu2))
|
||||
})
|
||||
sender.expectNoMsg(1 second)
|
||||
|
||||
// during that time, router should not have asked for more ids, it already has a pending query !
|
||||
transport.expectNoMsg(200 millis)
|
||||
|
||||
// now send our ReplyShortChannelIdsEnd message
|
||||
sender.send(router, PeerRoutingMessage(remoteNodeId, ReplyShortChannelIdsEnd(chainHash, 1.toByte)))
|
||||
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, ReplyShortChannelIdsEnd(chainHash, 1.toByte)))
|
||||
|
||||
// router should ask for our second block of ids
|
||||
val QueryShortChannelIds(_, data2) = sender.expectMsgType[QueryShortChannelIds]
|
||||
val QueryShortChannelIds(_, data2) = transport.expectMsgType[QueryShortChannelIds]
|
||||
val (_, shortChannelIds2, false) = ChannelRangeQueries.decodeShortChannelIds(data2)
|
||||
assert(shortChannelIds2 == shortChannelIds.drop(100).take(100))
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue