1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 06:47:46 +01:00

Add HasHtlcIdCommand trait (#1245)

This commit is contained in:
Anton Kumaigorodski 2019-12-11 15:40:22 +02:00 committed by Bastien Teinturier
parent e1c48ebda1
commit 7460f3b0d8
20 changed files with 87 additions and 100 deletions

View file

@ -177,4 +177,4 @@ eclair {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
}
}

View file

@ -113,18 +113,21 @@ object Upstream {
}
sealed trait Command
sealed trait HasHtlcIdCommand extends Command {
def id: Long
}
final case class CMD_FULFILL_HTLC(id: Long, r: ByteVector32, commit: Boolean = false) extends HasHtlcIdCommand
final case class CMD_FAIL_HTLC(id: Long, reason: Either[ByteVector, FailureMessage], commit: Boolean = false) extends HasHtlcIdCommand
final case class CMD_FAIL_MALFORMED_HTLC(id: Long, onionHash: ByteVector32, failureCode: Int, commit: Boolean = false) extends HasHtlcIdCommand
final case class CMD_ADD_HTLC(amount: MilliSatoshi, paymentHash: ByteVector32, cltvExpiry: CltvExpiry, onion: OnionRoutingPacket, upstream: Upstream, commit: Boolean = false, previousFailures: Seq[AddHtlcFailed] = Seq.empty) extends Command
final case class CMD_FULFILL_HTLC(id: Long, r: ByteVector32, commit: Boolean = false) extends Command
final case class CMD_FAIL_HTLC(id: Long, reason: Either[ByteVector, FailureMessage], commit: Boolean = false) extends Command
final case class CMD_FAIL_MALFORMED_HTLC(id: Long, onionHash: ByteVector32, failureCode: Int, commit: Boolean = false) extends Command
final case class CMD_UPDATE_FEE(feeratePerKw: Long, commit: Boolean = false) extends Command
final case object CMD_SIGN extends Command
case object CMD_SIGN extends Command
final case class CMD_CLOSE(scriptPubKey: Option[ByteVector]) extends Command
final case class CMD_UPDATE_RELAY_FEE(feeBase: MilliSatoshi, feeProportionalMillionths: Long) extends Command
final case object CMD_FORCECLOSE extends Command
final case object CMD_GETSTATE extends Command
final case object CMD_GETSTATEDATA extends Command
final case object CMD_GETINFO extends Command
case object CMD_FORCECLOSE extends Command
case object CMD_GETSTATE extends Command
case object CMD_GETSTATEDATA extends Command
case object CMD_GETINFO extends Command
final case class RES_GETINFO(nodeId: PublicKey, channelId: ByteVector32, state: State, data: Data)
/*

View file

@ -32,6 +32,4 @@ trait ChannelsDb {
def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)]
def close(): Unit
}

View file

@ -27,6 +27,4 @@ trait PeersDb {
def listPeers(): Map[PublicKey, NodeAddress]
def close(): Unit
}

View file

@ -17,30 +17,28 @@
package fr.acinq.eclair.db
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.Command
import fr.acinq.eclair.channel.HasHtlcIdCommand
/**
* This database stores the preimages that we have received from downstream
* (either directly via UpdateFulfillHtlc or by extracting the value from the
* blockchain).
*
* This means that this database is only used in the context of *relaying* payments.
*
* We need to be sure that if downstream is able to pulls funds from us, we can always
* do the same from upstream, otherwise we lose money. Hence the need for persistence
* to handle all corner cases.
*
*/
* This database stores CMD_FULFILL_HTLC and CMD_FAIL_HTLC that we have received from downstream
* (either directly via UpdateFulfillHtlc or by extracting the value from the
* blockchain).
*
* This means that this database is only used in the context of *relaying* payments.
*
* We need to be sure that if downstream is able to pull funds from us, we can always
* do the same from upstream, otherwise we lose money. Hence the need for persistence
* to handle all corner cases.
*
*/
trait PendingRelayDb {
def addPendingRelay(channelId: ByteVector32, htlcId: Long, cmd: Command)
def addPendingRelay(channelId: ByteVector32, cmd: HasHtlcIdCommand)
def removePendingRelay(channelId: ByteVector32, htlcId: Long)
def listPendingRelay(channelId: ByteVector32): Seq[Command]
def listPendingRelay(channelId: ByteVector32): Seq[HasHtlcIdCommand]
def listPendingRelay(): Set[(ByteVector32, Long)]
def close(): Unit
}
}

View file

@ -123,6 +123,4 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
q
}
}
override def close(): Unit = sqlite.close
}

View file

@ -71,6 +71,4 @@ import SqliteUtils.ExtendedResultSet._
m
}
}
override def close(): Unit = sqlite.close()
}

View file

@ -19,9 +19,10 @@ package fr.acinq.eclair.db.sqlite
import java.sql.Connection
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.Command
import fr.acinq.eclair.channel.HasHtlcIdCommand
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.wire.CommandCodecs.cmdCodec
import scodec.bits.BitVector
import scala.collection.immutable.Queue
@ -39,10 +40,10 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS pending_relay (channel_id BLOB NOT NULL, htlc_id INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
}
override def addPendingRelay(channelId: ByteVector32, htlcId: Long, cmd: Command): Unit = {
override def addPendingRelay(channelId: ByteVector32, cmd: HasHtlcIdCommand): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO pending_relay VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.setLong(2, htlcId)
statement.setLong(2, cmd.id)
statement.setBytes(3, cmdCodec.encode(cmd).require.toByteArray)
statement.executeUpdate()
}
@ -56,8 +57,8 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb {
}
}
override def listPendingRelay(channelId: ByteVector32): Seq[Command] = {
using(sqlite.prepareStatement("SELECT htlc_id, data FROM pending_relay WHERE channel_id=?")) { statement =>
override def listPendingRelay(channelId: ByteVector32): Seq[HasHtlcIdCommand] = {
using(sqlite.prepareStatement("SELECT data FROM pending_relay WHERE channel_id=?")) { statement =>
statement.setBytes(1, channelId.toArray)
val rs = statement.executeQuery()
codecSequence(rs, cmdCodec)
@ -74,6 +75,4 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb {
q.toSet
}
}
override def close(): Unit = sqlite.close()
}
}

View file

@ -30,7 +30,7 @@ import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, IncomingPayme
import fr.acinq.eclair.payment.IncomingPacket
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.router.Rebroadcast
import fr.acinq.eclair.transactions.{IN, OUT}
import fr.acinq.eclair.transactions.{DirectedHtlc, IN, OUT}
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import scodec.bits.ByteVector
@ -208,7 +208,6 @@ object Switchboard {
* That's why we need to periodically clean up the pending relay db.
*/
def cleanupRelayDb(channels: Seq[HasCommitments], relayDb: PendingRelayDb)(implicit log: LoggingAdapter): Int = {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// If the HTLC is not in their commitment, it means that we have already fulfilled/failed it and that we can remove
// the command from the pending relay db.
@ -230,7 +229,6 @@ object Switchboard {
}
toClean.size
}
}
class HtlcReaper extends Actor with ActorLogging {

View file

@ -81,7 +81,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
db.getIncomingPayment(p.add.paymentHash) match {
case Some(record) => validatePayment(p, record, nodeParams.currentBlockHeight) match {
case Some(cmdFail) =>
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, p.add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, cmdFail)
case None =>
log.info(s"received payment for amount=${p.add.amountMsat} totalAmount=${p.payload.totalAmount}")
pendingPayments.get(p.add.paymentHash) match {
@ -95,7 +95,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
}
case None =>
val cmdFail = CMD_FAIL_HTLC(p.add.id, Right(IncorrectOrUnknownPaymentDetails(p.payload.totalAmount, nodeParams.currentBlockHeight)), commit = true)
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, p.add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(p.add.channelId, cmdFail)
}
}
@ -103,7 +103,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
Logs.withMdc(log)(Logs.mdc(paymentHash_opt = Some(paymentHash))) {
log.warning(s"payment with paidAmount=${parts.map(_.payment.amount).sum} failed ($failure)")
pendingPayments.get(paymentHash).foreach { case (_, handler: ActorRef) => handler ! PoisonPill }
parts.foreach(p => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, p.htlcId, CMD_FAIL_HTLC(p.htlcId, Right(failure), commit = true)))
parts.foreach(p => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, CMD_FAIL_HTLC(p.htlcId, Right(failure), commit = true)))
pendingPayments = pendingPayments - paymentHash
}
@ -116,7 +116,7 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
pendingPayments.get(paymentHash).foreach {
case (preimage: ByteVector32, handler: ActorRef) =>
handler ! PoisonPill
parts.foreach(p => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, p.htlcId, CMD_FULFILL_HTLC(p.htlcId, preimage, commit = true)))
parts.foreach(p => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, CMD_FULFILL_HTLC(p.htlcId, preimage, commit = true)))
}
ctx.system.eventStream.publish(received)
pendingPayments = pendingPayments - paymentHash
@ -126,11 +126,11 @@ class MultiPartHandler(nodeParams: NodeParams, db: IncomingPaymentsDb, commandBu
case MultiPartPaymentFSM.ExtraHtlcReceived(paymentHash, p, failure) if doHandle(paymentHash) =>
Logs.withMdc(log)(Logs.mdc(paymentHash_opt = Some(paymentHash))) {
failure match {
case Some(failure) => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, p.htlcId, CMD_FAIL_HTLC(p.htlcId, Right(failure), commit = true))
case Some(failure) => commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, CMD_FAIL_HTLC(p.htlcId, Right(failure), commit = true))
// NB: this case shouldn't happen unless the sender violated the spec, so it's ok that we take a slightly more
// expensive code path by fetching the preimage from DB.
case None => db.getIncomingPayment(paymentHash).foreach(record => {
commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, p.htlcId, CMD_FULFILL_HTLC(p.htlcId, record.paymentPreimage, commit = true))
commandBuffer ! CommandBuffer.CommandSend(p.payment.fromChannelId, CMD_FULFILL_HTLC(p.htlcId, record.paymentPreimage, commit = true))
db.receiveIncomingPayment(paymentHash, p.payment.amount, p.payment.timestamp)
ctx.system.eventStream.publish(PaymentReceived(paymentHash, p.payment :: Nil))
})

View file

@ -47,7 +47,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
handleRelay(r, channelUpdates, node2channels, previousFailures, nodeParams.chainHash) match {
case RelayFailure(cmdFail) =>
log.info(s"rejecting htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=${r.payload.outgoingChannelId} reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(r.add.channelId, r.add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(r.add.channelId, cmdFail)
case RelaySuccess(selectedShortChannelId, cmdAdd) =>
log.info(s"forwarding htlc #${r.add.id} from channelId=${r.add.channelId} to shortChannelId=$selectedShortChannelId")
register ! Register.ForwardShortId(selectedShortChannelId, cmdAdd)
@ -56,7 +56,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
case Status.Failure(Register.ForwardShortIdFailure(Register.ForwardShortId(shortChannelId, CMD_ADD_HTLC(_, _, _, _, Upstream.Relayed(add), _, _)))) =>
log.warning(s"couldn't resolve downstream channel $shortChannelId, failing htlc #${add.id}")
val cmdFail = CMD_FAIL_HTLC(add.id, Right(UnknownNextPeer), commit = true)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
case Status.Failure(addFailed: AddHtlcFailed) => addFailed.origin match {
case Origin.Relayed(originChannelId, originHtlcId, _, _) => addFailed.originalCommand match {
@ -67,7 +67,7 @@ class ChannelRelayer(nodeParams: NodeParams, relayer: ActorRef, register: ActorR
val failure = translateError(addFailed)
val cmdFail = CMD_FAIL_HTLC(originHtlcId, Right(failure), commit = true)
log.info(s"rejecting htlc #$originHtlcId from channelId=$originChannelId reason=${cmdFail.reason}")
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmdFail)
}
case _ => throw new IllegalArgumentException(s"channel relayer received unexpected failure: $addFailed")
}

View file

@ -36,10 +36,10 @@ class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor wi
override def receive: Receive = {
case CommandSend(channelId, htlcId, cmd) =>
case CommandSend(channelId, cmd) =>
register forward Register.Forward(channelId, cmd)
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
db.addPendingRelay(channelId, htlcId, cmd)
db.addPendingRelay(channelId, cmd)
case CommandAck(channelId, htlcId) =>
log.debug(s"fulfill/fail acked for channelId=$channelId htlcId=$htlcId")
@ -62,7 +62,7 @@ class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor wi
object CommandBuffer {
case class CommandSend(channelId: ByteVector32, htlcId: Long, cmd: Command)
case class CommandSend(channelId: ByteVector32, cmd: HasHtlcIdCommand)
case class CommandAck(channelId: ByteVector32, htlcId: Long)

View file

@ -122,21 +122,21 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, commandBuffer: ActorRe
case Right(r: IncomingPacket.NodeRelayPacket) =>
if (!nodeParams.enableTrampolinePayment) {
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline disabled")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
commandBuffer ! CommandBuffer.CommandSend(add.channelId, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
} else {
// TODO: @t-bast: relay trampoline payload instead of rejecting.
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} to nodeId=${r.innerPayload.outgoingNodeId} reason=trampoline not implemented yet")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
commandBuffer ! CommandBuffer.CommandSend(add.channelId, CMD_FAIL_HTLC(add.id, Right(RequiredNodeFeatureMissing), commit = true))
}
case Left(badOnion: BadOnion) =>
log.warning(s"couldn't parse onion: reason=${badOnion.message}")
val cmdFail = CMD_FAIL_MALFORMED_HTLC(add.id, badOnion.onionHash, badOnion.code, commit = true)
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} reason=malformed onionHash=${cmdFail.onionHash} failureCode=${cmdFail.failureCode}")
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
case Left(failure) =>
log.warning(s"rejecting htlc #${add.id} from channelId=${add.channelId} reason=$failure")
val cmdFail = CMD_FAIL_HTLC(add.id, Right(failure), commit = true)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, add.id, cmdFail)
commandBuffer ! CommandBuffer.CommandSend(add.channelId, cmdFail)
}
case Status.Failure(addFailed: AddHtlcFailed) =>
@ -159,7 +159,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, commandBuffer: ActorRe
sender ! fulfill
case Origin.Relayed(originChannelId, originHtlcId, amountIn, amountOut) =>
val cmd = CMD_FULFILL_HTLC(originHtlcId, fulfill.paymentPreimage, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmd)
context.system.eventStream.publish(PaymentRelayed(amountIn, amountOut, add.paymentHash, fromChannelId = originChannelId, toChannelId = fulfill.channelId))
}
@ -171,7 +171,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, commandBuffer: ActorRe
sender ! fail
case Origin.Relayed(originChannelId, originHtlcId, _, _) =>
val cmd = CMD_FAIL_HTLC(originHtlcId, Left(fail.reason), commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmd)
}
case ForwardFailMalformed(fail, to, add) =>
@ -182,7 +182,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, commandBuffer: ActorRe
sender ! fail
case Origin.Relayed(originChannelId, originHtlcId, _, _) =>
val cmd = CMD_FAIL_MALFORMED_HTLC(originHtlcId, fail.onionHash, fail.failureCode, commit = true)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, originHtlcId, cmd)
commandBuffer ! CommandBuffer.CommandSend(originChannelId, cmd)
}
case ack: CommandBuffer.CommandAck => commandBuffer forward ack

View file

@ -16,7 +16,7 @@
package fr.acinq.eclair.wire
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FAIL_MALFORMED_HTLC, CMD_FULFILL_HTLC, Command}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FAIL_MALFORMED_HTLC, CMD_FULFILL_HTLC, HasHtlcIdCommand}
import fr.acinq.eclair.wire.CommonCodecs._
import fr.acinq.eclair.wire.FailureMessageCodecs.failureMessageCodec
import scodec.Codec
@ -40,9 +40,8 @@ object CommandCodecs {
("failureCode" | uint16) ::
("commit" | provide(false))).as[CMD_FAIL_MALFORMED_HTLC]
val cmdCodec: Codec[Command] = discriminated[Command].by(uint16)
val cmdCodec: Codec[HasHtlcIdCommand] = discriminated[HasHtlcIdCommand].by(uint16)
.typecase(0, cmdFulfillCodec)
.typecase(1, cmdFailCodec)
.typecase(2, cmdFailMalformedCodec)
}
}

View file

@ -220,5 +220,4 @@ object TestConstants {
channelReserve = 20000 sat // Alice will need to keep that much satoshis as direct payment
)
}
}
}

View file

@ -430,7 +430,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
// We simulate a pending fulfill on that HTLC but not relayed.
// When it is close to expiring upstream, we should close the channel.
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, htlc.id, CMD_FULFILL_HTLC(htlc.id, r, commit = true)))
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, CMD_FULFILL_HTLC(htlc.id, r, commit = true)))
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
val ChannelErrorOccurred(_, _, _, _, LocalError(err), isFatal) = listener.expectMsgType[ChannelErrorOccurred]
@ -464,7 +464,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
// We simulate a pending failure on that HTLC.
// Even if we get close to expiring upstream we shouldn't close the channel, because we have nothing to lose.
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, htlc.id, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0)))))
sender.send(commandBuffer, CommandBuffer.CommandSend(htlc.channelId, CMD_FAIL_HTLC(htlc.id, Right(IncorrectOrUnknownPaymentDetails(0 msat, 0)))))
sender.send(bob, CurrentBlockCount((htlc.cltvExpiry - bob.underlyingActor.nodeParams.fulfillSafetyBeforeTimeoutBlocks).toLong))
bob2blockchain.expectNoMsg(250 millis)

View file

@ -44,14 +44,14 @@ class SqlitePendingRelayDbSpec extends FunSuite {
val msg4 = CMD_FAIL_MALFORMED_HTLC(4, randomBytes32, FailureMessageCodecs.BADONION)
assert(db.listPendingRelay(channelId1).toSet === Set.empty)
db.addPendingRelay(channelId1, msg0.id, msg0)
db.addPendingRelay(channelId1, msg0.id, msg0) // duplicate
db.addPendingRelay(channelId1, msg1.id, msg1)
db.addPendingRelay(channelId1, msg2.id, msg2)
db.addPendingRelay(channelId1, msg3.id, msg3)
db.addPendingRelay(channelId1, msg4.id, msg4)
db.addPendingRelay(channelId2, msg0.id, msg0) // same messages but for different channel
db.addPendingRelay(channelId2, msg1.id, msg1)
db.addPendingRelay(channelId1, msg0)
db.addPendingRelay(channelId1, msg0) // duplicate
db.addPendingRelay(channelId1, msg1)
db.addPendingRelay(channelId1, msg2)
db.addPendingRelay(channelId1, msg3)
db.addPendingRelay(channelId1, msg4)
db.addPendingRelay(channelId2, msg0) // same messages but for different channel
db.addPendingRelay(channelId2, msg1)
assert(db.listPendingRelay(channelId1).toSet === Set(msg0, msg1, msg2, msg3, msg4))
assert(db.listPendingRelay(channelId2).toSet === Set(msg0, msg1))
assert(db.listPendingRelay === Set((channelId1, msg0.id), (channelId1, msg1.id), (channelId1, msg2.id), (channelId1, msg3.id), (channelId1, msg4.id), (channelId2, msg0.id), (channelId2, msg1.id)))

View file

@ -336,8 +336,8 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
val commands = f.commandBuffer.expectMsgType[CommandBuffer.CommandSend] :: f.commandBuffer.expectMsgType[CommandBuffer.CommandSend] :: Nil
assert(commands.toSet === Set(
CommandBuffer.CommandSend(ByteVector32.One, 0, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
CommandBuffer.CommandSend(ByteVector32.One, 1, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)),
CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(1, Right(PaymentTimeout), commit = true))
))
awaitCond({
f.sender.send(handler, GetPendingPayments)
@ -346,7 +346,7 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
// Extraneous HTLCs should be failed.
f.sender.send(handler, MultiPartPaymentFSM.ExtraHtlcReceived(pr1.paymentHash, PendingPayment(42, PartialPayment(200 msat, ByteVector32.One)), Some(PaymentTimeout)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, 42, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(42, Right(PaymentTimeout), commit = true)))
// The payment should still be pending in DB.
val Some(incomingPayment) = nodeParams.db.payments.getIncomingPayment(pr1.paymentHash)
@ -368,13 +368,13 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
val add3 = add2.copy(id = 43)
f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add2.channelId, add2.id, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add2.channelId, CMD_FAIL_HTLC(add2.id, Right(IncorrectOrUnknownPaymentDetails(1000 msat, nodeParams.currentBlockHeight)), commit = true)))
val cmd1 = f.commandBuffer.expectMsgType[CommandBuffer.CommandSend]
assert(cmd1.htlcId === add1.id)
assert(cmd1.cmd.id === add1.id)
assert(cmd1.channelId === add1.channelId)
val fulfill1 = cmd1.cmd.asInstanceOf[CMD_FULFILL_HTLC]
assert(Crypto.sha256(fulfill1.r) === pr.paymentHash)
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, add3.id, CMD_FULFILL_HTLC(add3.id, fulfill1.r, commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, CMD_FULFILL_HTLC(add3.id, fulfill1.r, commit = true)))
f.sender.send(handler, CommandBuffer.CommandAck(add1.channelId, add1.id))
f.commandBuffer.expectMsg(CommandBuffer.CommandAck(add1.channelId, add1.id))
@ -391,7 +391,7 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
// Extraneous HTLCs should be fulfilled.
f.sender.send(handler, MultiPartPaymentFSM.ExtraHtlcReceived(pr.paymentHash, PendingPayment(44, PartialPayment(200 msat, ByteVector32.One, 0)), None))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, 44, CMD_FULFILL_HTLC(44, fulfill1.r, commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FULFILL_HTLC(44, fulfill1.r, commit = true)))
assert(f.eventListener.expectMsgType[PaymentReceived].amount === 200.msat)
val received2 = nodeParams.db.payments.getIncomingPayment(pr.paymentHash)
assert(received2.get.status.asInstanceOf[IncomingPaymentStatus.Received].amount === 1200.msat)
@ -410,7 +410,7 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
val add1 = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket)
f.sender.send(handler, IncomingPacket.FinalPacket(add1, Onion.createMultiPartPayload(add1.amountMsat, 1000 msat, add1.cltvExpiry, pr.paymentSecret.get)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, 0, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(ByteVector32.One, CMD_FAIL_HTLC(0, Right(PaymentTimeout), commit = true)))
awaitCond({
f.sender.send(handler, GetPendingPayments)
f.sender.expectMsgType[PendingPayments].paymentHashes.isEmpty
@ -426,7 +426,7 @@ class MultiPartHandlerSpec extends TestKit(ActorSystem("test")) with fixture.Fun
val fulfill1 = cmd1.cmd.asInstanceOf[CMD_FULFILL_HTLC]
assert(fulfill1.id === 2)
assert(Crypto.sha256(fulfill1.r) === pr.paymentHash)
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, 5, CMD_FULFILL_HTLC(5, fulfill1.r, commit = true)))
f.commandBuffer.expectMsg(CommandBuffer.CommandSend(add3.channelId, CMD_FULFILL_HTLC(5, fulfill1.r, commit = true)))
val paymentReceived = f.eventListener.expectMsgType[PaymentReceived]
assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(300 msat, ByteVector32.One, 0) :: PartialPayment(700 msat, ByteVector32.Zeroes, 0) :: Nil))

View file

@ -621,10 +621,10 @@ class RelayerSpec extends TestkitBaseClass {
val (preimage1, preimage2) = (randomBytes32, randomBytes32)
val onionHash = randomBytes32
nodeParams.db.pendingRelay.addPendingRelay(channelId, 1, CMD_FULFILL_HTLC(1, preimage1, commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, 100, CMD_FULFILL_HTLC(100, preimage2, commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, 101, CMD_FAIL_HTLC(101, Right(TemporaryNodeFailure), commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, 102, CMD_FAIL_MALFORMED_HTLC(102, onionHash, 0x4001, commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, CMD_FULFILL_HTLC(1, preimage1, commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, CMD_FULFILL_HTLC(100, preimage2, commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, CMD_FAIL_HTLC(101, Right(TemporaryNodeFailure), commit = true))
nodeParams.db.pendingRelay.addPendingRelay(channelId, CMD_FAIL_MALFORMED_HTLC(102, onionHash, 0x4001, commit = true))
// Channel comes online: we should replay pending commands.
system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, remoteNodeId, OFFLINE, NORMAL, channelData))

View file

@ -16,7 +16,7 @@
package fr.acinq.eclair.wire
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FAIL_MALFORMED_HTLC, CMD_FULFILL_HTLC, Command}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FAIL_MALFORMED_HTLC, CMD_FULFILL_HTLC, Command, HasHtlcIdCommand}
import fr.acinq.eclair.{randomBytes, randomBytes32}
import org.scalatest.FunSuite
@ -27,18 +27,17 @@ import org.scalatest.FunSuite
class CommandCodecsSpec extends FunSuite {
test("encode/decode all channel messages") {
val msgs: List[Command] =
val msgs: List[HasHtlcIdCommand] =
CMD_FULFILL_HTLC(1573L, randomBytes32) ::
CMD_FAIL_HTLC(42456L, Left(randomBytes(145))) ::
CMD_FAIL_HTLC(253, Right(TemporaryNodeFailure)) ::
CMD_FAIL_MALFORMED_HTLC(7984, randomBytes32, FailureMessageCodecs.BADONION) :: Nil
CMD_FAIL_HTLC(42456L, Left(randomBytes(145))) ::
CMD_FAIL_HTLC(253, Right(TemporaryNodeFailure)) ::
CMD_FAIL_MALFORMED_HTLC(7984, randomBytes32, FailureMessageCodecs.BADONION) :: Nil
msgs.foreach {
case msg => {
msg =>
val encoded = CommandCodecs.cmdCodec.encode(msg).require
val decoded = CommandCodecs.cmdCodec.decode(encoded).require
assert(msg === decoded.value)
}
}
}
}