1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-19 01:43:22 +01:00

Add simple integration test between Channel and Router (#2270)

In this PR we add a basic integration test between `Channel` and `Router` that checks the proper handling of local announcements.

* disable router rebroadcast in tests

* use separate ActorSystem for alice and bob in tests

* add a simple channel-router integration test

* fix bug found in new channel-router test

The new test was failing, due to a bug. When a local
channel graduates from private to public, we do not copy existing
known `channel_update`s. Current implementation guarantees that we
will process our local `channel_update` immediately, but what about our
peer?

* fix rebroadcast for local announcements

We fix a second bug where gossip origin wasn't properly set for local
announcements

* increase ram for tests to 2G

* improve debuggability of integration tests
This commit is contained in:
Pierre-Marie Padiou 2022-05-19 09:39:58 +02:00 committed by GitHub
parent 81571b95e6
commit bb7703aa5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 257 additions and 85 deletions

View File

@ -604,7 +604,7 @@ object Router {
channels: SortedMap[ShortChannelId, PublicChannel],
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,

View File

@ -19,6 +19,7 @@ package fr.acinq.eclair.router
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorContext, ActorRef, typed}
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
@ -81,17 +82,20 @@ object Validation {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
import nodeParams.db.{network => db}
import r.c
d0.awaiting.get(c) match {
case Some(origin +: _) => origin.peerConnection ! TransportHandler.ReadAck(c) // now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
// now we can acknowledge the message, we only need to do it for the first peer that sent us the announcement
// (the other ones have already been acknowledged as duplicates)
d0.awaiting.getOrElse(c, Seq.empty).headOption match {
case Some(origin: RemoteGossip) => origin.peerConnection ! TransportHandler.ReadAck(c)
case Some(LocalGossip) => () // there is nothing to ack if it was a local gossip
case _ => ()
}
val remoteOrigins_opt = d0.awaiting.get(c)
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins_opt.flatMap(_.headOption).map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
val remoteOrigins = d0.awaiting.getOrElse(c, Set.empty).collect { case rg: RemoteGossip => rg }
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = remoteOrigins.headOption.map(_.nodeId))) { // in the MDC we use the node id that sent us the announcement first
log.debug("got validation result for shortChannelId={} (awaiting={} stash.nodes={} stash.updates={})", c.shortChannelId, d0.awaiting.size, d0.stash.nodes.size, d0.stash.updates.size)
val publicChannel_opt = r match {
case ValidateResult(c, Left(t)) =>
log.warning("validation failure for shortChannelId={} reason={}", c.shortChannelId, t.getMessage)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ValidationFailure(c)))
None
case ValidateResult(c, Right((tx, UtxoStatus.Unspent))) =>
val TxCoordinates(_, _, outputIndex) = ShortChannelId.coordinates(c.shortChannelId)
@ -103,12 +107,12 @@ object Validation {
}
if (fundingOutputIsInvalid) {
log.error(s"invalid script for shortChannelId={}: txid={} does not have script=$fundingOutputScript at outputIndex=$outputIndex ann={}", c.shortChannelId, tx.txid, c)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.InvalidAnnouncement(c)))
None
} else {
watcher ! WatchExternalChannelSpent(ctx.self, tx.txid, outputIndex, c.shortChannelId)
log.debug("added channel channelId={}", c.shortChannelId)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(c, capacity, None, None) :: Nil))
db.addChannel(c, tx.txid, capacity)
@ -118,24 +122,29 @@ object Validation {
val nodeAnn = Announcements.makeNodeAnnouncement(nodeParams.privateKey, nodeParams.alias, nodeParams.color, nodeParams.publicAddresses, nodeParams.features.nodeAnnouncementFeatures())
ctx.self ! nodeAnn
}
// public channels that haven't yet been announced are considered as private channels
val channelMeta_opt = d0.privateChannels.get(c.shortChannelId).map(_.meta)
Some(PublicChannel(c, tx.txid, capacity, None, None, channelMeta_opt))
// maybe this previously was a local unannounced channel
val privateChannel_opt = d0.privateChannels.get(c.shortChannelId)
Some(PublicChannel(c,
tx.txid,
capacity,
update_1_opt = privateChannel_opt.flatMap(_.update_1_opt),
update_2_opt = privateChannel_opt.flatMap(_.update_2_opt),
meta_opt = privateChannel_opt.map(_.meta)))
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent and spending tx is confirmed)", c.shortChannelId, tx.txid)
// the funding tx has been spent by a transaction that is now confirmed: peer shouldn't send us those
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosed(c)))
} else {
log.debug("ignoring shortChannelId={} tx={} (funding tx already spent but spending tx isn't confirmed)", c.shortChannelId, tx.txid)
remoteOrigins_opt.foreach(_.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c))))
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.ChannelClosing(c)))
}
// there may be a record if we have just restarted
db.removeChannel(c.shortChannelId)
None
}
// we also reprocess node and channel_update announcements related to channels that were just analyzed
// we also reprocess node and channel_update announcements related to the channel that was just analyzed
val reprocessUpdates = d0.stash.updates.view.filterKeys(u => u.shortChannelId == c.shortChannelId)
val reprocessNodes = d0.stash.nodes.view.filterKeys(n => isRelatedTo(c, n.nodeId))
// and we remove the reprocessed messages from the stash
@ -145,12 +154,17 @@ object Validation {
publicChannel_opt match {
case Some(pc) =>
// note: if the channel is graduating from private to public, the implementation (in the LocalChannelUpdate handler) guarantees that we will process a new channel_update
// right after the channel_announcement, channel_updates will be moved from private to public at that time
// those updates are only defined if this was a previously an unannounced local channel, we broadcast them if they use the real scid
val updates1 = (pc.update_1_opt.toSet ++ pc.update_2_opt.toSet)
.map(u => u -> (if (pc.getNodeIdSameSideAs(u) == nodeParams.nodeId) Set[GossipOrigin](LocalGossip) else Set.empty[GossipOrigin]))
.toMap
val d1 = d0.copy(
channels = d0.channels + (c.shortChannelId -> pc),
privateChannels = d0.privateChannels - c.shortChannelId, // we remove fake announcements that we may have made before
rebroadcast = d0.rebroadcast.copy(channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet)), // we also add the newly validated channels to the rebroadcast queue
privateChannels = d0.privateChannels - c.shortChannelId, // we remove the corresponding unannounced channel that we may have until now
rebroadcast = d0.rebroadcast.copy(
channels = d0.rebroadcast.channels + (c -> d0.awaiting.getOrElse(c, Nil).toSet), // we rebroadcast the channel to our peers
updates = d0.rebroadcast.updates ++ updates1
), // we also add the newly validated channels to the rebroadcast queue
stash = stash1,
awaiting = awaiting1)
// we only reprocess updates and nodes if validation succeeded
@ -419,7 +433,7 @@ object Validation {
case Some(c) =>
// channel wasn't announced but here is the announcement, we will process it *before* the channel_update
watcher ! ValidateRequest(ctx.self, c)
val d1 = d.copy(awaiting = d.awaiting + (c -> Nil)) // no origin
val d1 = d.copy(awaiting = d.awaiting + (c -> Seq(LocalGossip))) // no origin
// maybe the local channel was pruned (can happen if we were disconnected for more than 2 weeks)
db.removeFromPruned(c.shortChannelId)
handleChannelUpdate(d1, db, routerConf, Left(lcu))

View File

@ -161,7 +161,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
@ -299,7 +299,7 @@ object TestConstants {
routerConf = RouterConf(
watchSpentWindow = 1 second,
channelExcludeDuration = 60 seconds,
routerBroadcastInterval = 5 seconds,
routerBroadcastInterval = 1 day, // "disables" rebroadcast
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,

View File

@ -17,8 +17,8 @@
package fr.acinq.eclair.channel.states
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.actor.{ActorContext, ActorRef}
import akka.testkit.{TestFSMRef, TestKitBase, TestProbe}
import akka.actor.{ActorContext, ActorRef, ActorSystem}
import akka.testkit.{TestFSMRef, TestKit, TestKitBase, TestProbe}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@ -106,6 +106,12 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
def currentBlockHeight: BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight
}
val systemA: ActorSystem = ActorSystem("system-alice")
val systemB: ActorSystem = ActorSystem("system-bob")
system.registerOnTermination(TestKit.shutdownActorSystem(systemA))
system.registerOnTermination(TestKit.shutdownActorSystem(systemB))
def init(nodeParamsA: NodeParams = TestConstants.Alice.nodeParams, nodeParamsB: NodeParams = TestConstants.Bob.nodeParams, wallet: OnChainWallet = new DummyOnChainWallet(), tags: Set[String] = Set.empty): SetupFixture = {
val aliceOrigin = TestProbe()
val alice2bob = TestProbe()
@ -119,8 +125,10 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
val alice2relayer = TestProbe()
val bob2relayer = TestProbe()
val channelUpdateListener = TestProbe()
system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
system.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
systemA.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelUpdate])
systemB.eventStream.subscribe(channelUpdateListener.ref, classOf[LocalChannelDown])
val router = TestProbe()
val finalNodeParamsA = nodeParamsA
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(5000 sat)
@ -132,8 +140,14 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
.modify(_.channelConf.dustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(5000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceAliceBob))(10000 sat)
.modify(_.channelConf.maxRemoteDustLimit).setToIf(tags.contains(ChannelStateTestsTags.HighDustLimitDifferenceBobAlice))(10000 sat)
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = {
implicit val system: ActorSystem = systemA
TestFSMRef(new Channel(finalNodeParamsA, wallet, finalNodeParamsB.nodeId, alice2blockchain.ref, alice2relayer.ref, FakeTxPublisherFactory(alice2blockchain), origin_opt = Some(aliceOrigin.ref)), alicePeer.ref)
}
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = {
implicit val system: ActorSystem = systemB
TestFSMRef(new Channel(finalNodeParamsB, wallet, finalNodeParamsA.nodeId, bob2blockchain.ref, bob2relayer.ref, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)
}
SetupFixture(alice, bob, aliceOrigin, alice2bob, bob2alice, alice2blockchain, bob2blockchain, router, alice2relayer, bob2relayer, channelUpdateListener, wallet, alicePeer, bobPeer)
}
@ -179,7 +193,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
(aliceParams, bobParams, channelType)
}
def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Unit = {
def reachNormal(setup: SetupFixture, tags: Set[String] = Set.empty): Transaction = {
import setup._
@ -231,6 +245,7 @@ trait ChannelStateTestsHelperMethods extends TestKitBase {
// x2 because alice and bob share the same relayer
channelUpdateListener.expectMsgType[LocalChannelUpdate]
channelUpdateListener.expectMsgType[LocalChannelUpdate]
fundingTx
}
def localOrigin(replyTo: ActorRef): Origin.LocalHot = Origin.LocalHot(replyTo, UUID.randomUUID())

View File

@ -82,7 +82,7 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS
test("recv FundingSigned with valid signature") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
bob2alice.expectMsgType[FundingSigned]
bob2alice.forward(alice)
awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED)

View File

@ -54,7 +54,7 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
within(30 seconds) {
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice ! INPUT_INIT_FUNDER(ByteVector32.Zeroes, TestConstants.fundingSatoshis, pushMsat, TestConstants.feeratePerKw, TestConstants.feeratePerKw, aliceParams, alice2bob.ref, bobInit, ChannelFlags.Private, channelConfig, channelType)
alice2blockchain.expectMsgType[TxPublisher.SetChannelId]
bob ! INPUT_INIT_FUNDEE(ByteVector32.Zeroes, bobParams, bob2alice.ref, aliceInit, channelConfig, channelType)
@ -83,8 +83,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
// make bob send a FundingLocked msg
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
bob ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
@ -102,8 +102,8 @@ class WaitForFundingConfirmedStateSpec extends TestKitBaseClass with FixtureAnyF
import f._
// we create a new listener that registers after alice has published the funding tx
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[TransactionConfirmed])
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_CONFIRMED].fundingTx.get
alice ! WatchFundingConfirmedTriggered(BlockHeight(42000), 42, fundingTx)
assert(listener.expectMsgType[TransactionConfirmed].tx === fundingTx)

View File

@ -74,7 +74,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
val initialState = alice.stateData.asInstanceOf[DATA_NORMAL]
val sender = TestProbe()
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
val h = randomBytes32()
val add = CMD_ADD_HTLC(sender.ref, 50000000 msat, h, CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, localOrigin(sender.ref))
alice ! add
@ -881,7 +881,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
bob2alice.expectMsgType[UpdateFulfillHtlc]
// we listen to channel_update events
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalChannelUpdate])
// actual test starts here
// when signing the fulfill, bob will have its main output go above reserve in alice's commitment tx
@ -896,7 +896,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv CMD_SIGN (after CMD_UPDATE_FEE)") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[AvailableBalanceChanged])
alice ! CMD_UPDATE_FEE(FeeratePerKw(654564 sat))
alice2bob.expectMsgType[UpdateFee]
alice ! CMD_SIGN()
@ -2734,7 +2734,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill
@ -2767,7 +2767,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill but doesn't sign
@ -2800,7 +2800,7 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
// actual test begins:
// * Bob receives the HTLC pre-image and wants to fulfill

View File

@ -527,7 +527,7 @@ class OfflineStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
crossSign(alice, bob, alice2bob, bob2alice)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
bob.underlying.system.eventStream.subscribe(listener.ref, classOf[ChannelErrorOccurred])
val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
val initialCommitTx = initialState.commitments.localCommit.commitTxAndRemoteSig.commitTx.tx

View File

@ -89,15 +89,19 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
bob2blockchain.expectMsgType[WatchFundingConfirmed]
awaitCond(alice.stateName == WAIT_FOR_FUNDING_CONFIRMED)
awaitCond(bob.stateName == WAIT_FOR_FUNDING_CONFIRMED)
system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
withFixture(test.toNoArgTest(FixtureParam(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, alice2relayer, bob2relayer, channelUpdateListener, eventListener, Nil)))
}
} else {
within(30 seconds) {
reachNormal(setup, test.tags)
system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
alice.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionPublished])
bob.underlying.system.eventStream.subscribe(eventListener.ref, classOf[TransactionConfirmed])
val bobCommitTxs: List[CommitTxAndRemoteSig] = (for (amt <- List(100000000 msat, 200000000 msat, 300000000 msat)) yield {
val (r, htlc) = addHtlc(amt, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
@ -368,8 +372,8 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.channelFeatures === channelFeatures)
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed])
system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[LocalCommitConfirmed])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
// alice sends an htlc to bob
val (_, htlca1) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
@ -473,7 +477,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (local commit with htlcs only signed by local)") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val aliceCommitTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
// alice sends an htlc
val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice)
@ -522,7 +526,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (local commit with fail not acked by remote)") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val (_, htlc) = addHtlc(25000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)
failHtlc(htlc.id, bob, alice, bob2alice, alice2bob)
@ -603,7 +607,7 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
test("recv WatchTxConfirmedTriggered (remote commit with htlcs only signed by local in next remote commit)") { f =>
import f._
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
alice.underlying.system.eventStream.subscribe(listener.ref, classOf[PaymentSettlingOnChain])
val bobCommitTx = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
// alice sends an htlc
val (_, htlc) = addHtlc(4200000 msat, alice, bob, alice2bob, bob2alice)

View File

@ -117,19 +117,21 @@ class PaymentIntegrationSpec extends IntegrationSpec {
def awaitAnnouncements(subset: Map[String, Kit], nodes: Int, channels: Int, updates: Int): Unit = {
val sender = TestProbe()
subset.foreach {
case (_, setup) =>
awaitCond({
sender.send(setup.router, Router.GetNodes)
sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes
}, max = 60 seconds, interval = 1 second)
awaitCond({
sender.send(setup.router, Router.GetChannels)
sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
}, max = 60 seconds, interval = 1 second)
awaitCond({
sender.send(setup.router, Router.GetChannelUpdates)
sender.expectMsgType[Iterable[ChannelUpdate]].size == updates
}, max = 60 seconds, interval = 1 second)
case (node, setup) =>
withClue(node) {
awaitAssert({
sender.send(setup.router, Router.GetNodes)
assert(sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes)
}, max = 10 seconds, interval = 1 second)
awaitAssert({
sender.send(setup.router, Router.GetChannels)
sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
}, max = 10 seconds, interval = 1 second)
awaitAssert({
sender.send(setup.router, Router.GetChannelUpdates)
sender.expectMsgType[Iterable[ChannelUpdate]].size == updates
}, max = 10 seconds, interval = 1 second)
}
}
}

View File

@ -122,7 +122,6 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
import com.softwaremill.quicklens._
val nodeParams = Alice.nodeParams
.modify(_.nodeKeyManager).setTo(testNodeKeyManager)
.modify(_.routerConf.routerBroadcastInterval).setTo(1 day) // "disable" auto rebroadcast
val router = system.actorOf(Router.props(nodeParams, watcher.ref))
// we announce channels
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab))

View File

@ -0,0 +1,125 @@
package fr.acinq.eclair.router
import akka.actor.ActorSystem
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingDeeplyBuriedTriggered
import fr.acinq.eclair.channel.DATA_NORMAL
import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags}
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.router.Router.{GossipOrigin, LocalGossip}
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate}
import fr.acinq.eclair.{BlockHeight, TestKitBaseClass}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, Tag}
/**
* This test checks the integration between Channel and Router (events, etc.)
*/
class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {
case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String])
implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging
override def withFixture(test: OneArgTest): Outcome = {
val channels = init(tags = test.tags)
val rebroadcastListener = TestProbe()
val router: TestFSMRef[Router.State, Router.Data, Router] = {
// we use alice's actor system so we share the same event stream
implicit val system: ActorSystem = channels.alice.underlying.system
system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast])
TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None))
}
withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags)))
}
test("private local channel") { f =>
import f._
reachNormal(channels, testTags)
awaitAssert(router.stateData.privateChannels.size === 1)
{
// only the local channel_update is known (bob won't send his before the channel is deeply buried)
val pc = router.stateData.privateChannels.values.head
assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined)
}
val peerConnection = TestProbe()
// bob hasn't yet sent his channel_update but we can get it by looking at its internal data
val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate)
awaitAssert {
val pc = router.stateData.privateChannels.values.head
// both channel_updates are known
pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
}
// manual rebroadcast
router ! Router.TickBroadcast
rebroadcastListener.expectNoMessage()
}
test("public local channel", Tag(ChannelStateTestsTags.ChannelsPublic)) { f =>
import f._
val fundingTx = reachNormal(channels, testTags)
awaitAssert(router.stateData.privateChannels.size === 1)
{
val pc = router.stateData.privateChannels.values.head
// only the local channel_update is known
assert(pc.update_1_opt.isDefined ^ pc.update_2_opt.isDefined)
}
val peerConnection = TestProbe()
// alice and bob haven't yet sent their channel_updates but we can get them by looking at their internal data
val aliceChannelUpdate = channels.alice.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
val bobChannelUpdate = channels.bob.stateData.asInstanceOf[DATA_NORMAL].channelUpdate
router ! PeerRoutingMessage(peerConnection.ref, channels.bob.underlyingActor.nodeParams.nodeId, bobChannelUpdate)
awaitAssert {
val pc = router.stateData.privateChannels.values.head
// both channel_updates are known
pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
}
// funding tx reaches 6 blocks, announcements are exchanged
channels.alice ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null)
channels.alice2bob.expectMsgType[AnnouncementSignatures]
channels.alice2bob.forward(channels.bob)
channels.bob ! WatchFundingDeeplyBuriedTriggered(BlockHeight(400000), 42, null)
channels.bob2alice.expectMsgType[AnnouncementSignatures]
channels.bob2alice.forward(channels.alice)
// router gets notified and attempts to validate the local channel
val vr = channels.alice2blockchain.expectMsgType[ZmqWatcher.ValidateRequest]
vr.replyTo ! ZmqWatcher.ValidateResult(vr.ann, Right((fundingTx, ZmqWatcher.UtxoStatus.Unspent)))
awaitAssert {
router.stateData.privateChannels.isEmpty && router.stateData.channels.size == 1
}
awaitAssert {
val pc = router.stateData.channels.values.head
// both channel updates are preserved
pc.update_1_opt.isDefined && pc.update_2_opt.isDefined
}
// manual rebroadcast
router ! Router.TickBroadcast
rebroadcastListener.expectMsg(Router.Rebroadcast(
channels = Map(vr.ann -> Set[GossipOrigin](LocalGossip)),
updates = Map(aliceChannelUpdate -> Set[GossipOrigin](LocalGossip), bobChannelUpdate -> Set.empty[GossipOrigin]), // broadcast the channel_updates (they were previously unannounced)
nodes = Map(router.underlyingActor.stateData.nodes.values.head -> Set[GossipOrigin](LocalGossip)), // new node_announcement
))
}
}

View File

@ -18,7 +18,7 @@ package fr.acinq.eclair.router
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.testkit.{TestKit, TestProbe}
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey
import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.bitcoin.scalacompat.{Block, SatoshiLong, Transaction, TxOut}
@ -32,9 +32,6 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol.Color
import org.scalatest.funsuite.AnyFunSuiteLike
import scodec.bits._
import scala.concurrent.duration._
class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike {
@ -131,12 +128,14 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection1b.expectMsg(GossipDecision.Accepted(chan_ab))
peerConnection2a.expectMsg(GossipDecision.Accepted(chan_ab))
// we have to wait 2 times the broadcast interval because there is an additional per-peer delay
val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty))
peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty))
// manual rebroadcast
front1 ! Router.TickBroadcast
peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map.empty, nodes = Map.empty))
front2 ! Router.TickBroadcast
peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map.empty, nodes = Map.empty))
front3 ! Router.TickBroadcast
peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map.empty, nodes = Map.empty))
}
test("aggregate gossip") {
@ -149,9 +148,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
val system2 = ActorSystem("front-system-2")
val system3 = ActorSystem("front-system-3")
val front1 = system1.actorOf(FrontRouter.props(nodeParams.routerConf, router))
val front2 = system2.actorOf(FrontRouter.props(nodeParams.routerConf, router))
val front3 = system3.actorOf(FrontRouter.props(nodeParams.routerConf, router))
val front1 = {
implicit val system: ActorSystem = system1
TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
}
val front2 = {
implicit val system: ActorSystem = system2
TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
}
val front3 = {
implicit val system: ActorSystem = system3
TestFSMRef[FrontRouter.State, FrontRouter.Data, FrontRouter](new FrontRouter(nodeParams.routerConf, router))
}
val peerConnection1a = TestProbe("peerconn-1a")
val peerConnection1b = TestProbe("peerconn-1b")
@ -182,7 +190,6 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection3a.expectMsg(TransportHandler.ReadAck(channelUpdate_bc))
peerConnection3a.expectMsg(GossipDecision.NoRelatedChannel(channelUpdate_bc))
watcher.send(router, ValidateResult(chan_ab, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(1000000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
peerConnection1a.expectMsg(TransportHandler.ReadAck(chan_ab))
@ -207,12 +214,18 @@ class FrontRouterSpec extends TestKit(ActorSystem("test")) with AnyFunSuiteLike
peerConnection3a.expectMsg(TransportHandler.ReadAck(ann_b))
peerConnection3a.expectMsg(GossipDecision.Accepted(ann_b))
// we have to wait 2 times the broadcast interval because there is an additional per-peer delay
val maxBroadcastDelay = 2 * nodeParams.routerConf.routerBroadcastInterval + 1.second
peerConnection1a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
peerConnection1b.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
peerConnection2a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
peerConnection3a.expectMsg(maxBroadcastDelay, Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a))))
awaitCond(front1.stateData.nodes.size == 2)
awaitCond(front2.stateData.nodes.size == 2)
awaitCond(front3.stateData.nodes.size == 2)
// manual rebroadcast
front1 ! Router.TickBroadcast
peerConnection1a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
peerConnection1b.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin1a, origin1b)), updates = Map(channelUpdate_ab -> Set(origin1b), channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
front2 ! Router.TickBroadcast
peerConnection2a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set(origin2a)), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set.empty), nodes = Map(ann_a -> Set.empty, ann_b -> Set.empty)))
front3 ! Router.TickBroadcast
peerConnection3a.expectMsg(Rebroadcast(channels = Map(chan_ab -> Set.empty), updates = Map(channelUpdate_ab -> Set.empty, channelUpdate_ba -> Set(origin3a)), nodes = Map(ann_a -> Set(origin3a), ann_b -> Set(origin3a))))
}
test("do not forward duplicate gossip") {

View File

@ -235,7 +235,7 @@
<systemProperties>
<buildDirectory>${project.build.directory}</buildDirectory>
</systemProperties>
<argLine>-Xmx1024m -Dfile.encoding=UTF-8</argLine>
<argLine>-Xmx2048m -Dfile.encoding=UTF-8</argLine>
</configuration>
<executions>
<execution>