
Clean pending htlcs db (#1027)

We store `CMD_FULFILL_HTLC`/`CMD_FAIL_HTLC`/`CMD_FAIL_MALFORMED_HTLC` in
a database (see `CommandBuffer`) because we don't want to lose
preimages, or to forget to fail incoming htlcs, which would lead to
unwanted channel closings.

But we currently only clean up this database on success, and because of
the way our watcher works, in a scenario where a downstream channel has
gone to the blockchain, it may send the same command several times. Only
the first one will be acked and cleaned up by the upstream channel,
causing the remaining commands to stay forever in the "pending relay"
db.

With this change we clean up the commands when they fail too.

We also clean up the pending relay db on startup.
Author: Pierre-Marie Padiou (committed via GitHub)
Date: 2019-06-13 18:08:34 +02:00
Repository: https://github.com/ACINQ/eclair
Parent: 849b6bd22b
Commit: db334380b3
8 changed files with 161 additions and 20 deletions
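The gist of the new startup sweep is easiest to read in isolation. A minimal sketch, condensed from Switchboard.cleanupRelayDb in the diff below (here channels stands for the restored channels and relayDb for the pending relay db; types and wiring are simplified for illustration):

    // an htlc whose command we must keep is one that is still cross-signed,
    // i.e. present in the remote commitment with direction=OUT
    val stillSigned: Set[(ByteVector32, Long)] = channels
      .flatMap(_.commitments.remoteCommit.spec.htlcs)
      .filter(_.direction == OUT)
      .map(htlc => (htlc.add.channelId, htlc.add.id))
      .toSet
    // every other entry refers to an htlc that was already fulfilled or
    // failed, so its command can safely be dropped
    val stale = relayDb.listPendingRelay() -- stillSigned
    stale.foreach { case (channelId, htlcId) => relayDb.removePendingRelay(channelId, htlcId) }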

File: Channel.scala

@@ -625,7 +625,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fulfill)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fulfill
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(fulfill: UpdateFulfillHtlc, d: DATA_NORMAL) =>

@@ -643,7 +646,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fail)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(c: CMD_FAIL_MALFORMED_HTLC, d: DATA_NORMAL) =>

@@ -651,7 +657,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fail)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) =>

@@ -975,7 +984,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fulfill)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fulfill
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(fulfill: UpdateFulfillHtlc, d: DATA_SHUTDOWN) =>

@@ -993,7 +1005,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fail)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(c: CMD_FAIL_MALFORMED_HTLC, d: DATA_SHUTDOWN) =>

@@ -1001,7 +1016,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       case Success((commitments1, fail)) =>
         if (c.commit) self ! CMD_SIGN
         handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending fail
-      case Failure(cause) => handleCommandError(cause, c)
+      case Failure(cause) =>
+        // we can clean up the command right away in case of failure
+        relayer ! CommandBuffer.CommandAck(d.channelId, c.id)
+        handleCommandError(cause, c)
     }

   case Event(fail: UpdateFailHtlc, d: DATA_SHUTDOWN) =>

File: PendingRelayDb.scala

@@ -39,6 +39,8 @@ trait PendingRelayDb {
   def listPendingRelay(channelId: ByteVector32): Seq[Command]

+  def listPendingRelay(): Set[(ByteVector32, Long)]
+
   def close(): Unit
 }
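Reassembled for readability, the trait after this change looks roughly as follows. The method list is taken from calls made elsewhere in this commit; the exact ordering and the comments are assumptions:

    trait PendingRelayDb {
      // persist a command so it survives a restart
      def addPendingRelay(channelId: ByteVector32, htlcId: Long, cmd: Command): Unit
      // forget a command once the channel has irrevocably processed it
      def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit
      // commands pending for one channel, replayed when it comes back online
      def listPendingRelay(channelId: ByteVector32): Seq[Command]
      // all (channelId, htlcId) pairs, used by the new startup cleanup
      def listPendingRelay(): Set[(ByteVector32, Long)]
      def close(): Unit
    }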

File: SqlitePendingRelayDb.scala

@@ -21,11 +21,15 @@ import java.sql.Connection
 import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.channel.Command
 import fr.acinq.eclair.db.PendingRelayDb
-import fr.acinq.eclair.db.sqlite.SqliteUtils.{codecSequence, getVersion, using}
 import fr.acinq.eclair.wire.CommandCodecs.cmdCodec
+
+import scala.collection.immutable.Queue

 class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb {
+
+  import SqliteUtils.ExtendedResultSet._
+  import SqliteUtils._

   val DB_NAME = "pending_relay"
   val CURRENT_VERSION = 1

@@ -60,5 +64,16 @@ class SqlitePendingRelayDb(sqlite: Connection) extends PendingRelayDb {
     }
   }

+  override def listPendingRelay(): Set[(ByteVector32, Long)] = {
+    using(sqlite.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement =>
+      val rs = statement.executeQuery()
+      var q: Queue[(ByteVector32, Long)] = Queue()
+      while (rs.next()) {
+        q = q :+ (rs.getByteVector32("channel_id"), rs.getLong("htlc_id"))
+      }
+      q.toSet
+    }
+  }
+
   override def close(): Unit = sqlite.close()
 }
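For context, the new query reads from the pending_relay table created in this class's version-check block, which is not part of this diff. Its assumed shape is sketched below; treat the exact DDL as an assumption rather than a quote from the commit:

    // not in this diff: the table listPendingRelay() reads from
    using(sqlite.createStatement()) { statement =>
      statement.executeUpdate(
        "CREATE TABLE IF NOT EXISTS pending_relay (channel_id BLOB NOT NULL, htlc_id INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
    }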

File: Switchboard.scala

@@ -19,11 +19,13 @@ package fr.acinq.eclair.io
 import java.net.InetSocketAddress

 import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy}
+import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
 import fr.acinq.eclair.NodeParams
 import fr.acinq.eclair.blockchain.EclairWallet
 import fr.acinq.eclair.channel.Helpers.Closing
 import fr.acinq.eclair.channel.{HasCommitments, _}
+import fr.acinq.eclair.db.PendingRelayDb
 import fr.acinq.eclair.payment.Relayer.RelayPayload
 import fr.acinq.eclair.payment.{Relayed, Relayer}
 import fr.acinq.eclair.router.Rebroadcast

@@ -62,6 +64,8 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
       brokenHtlcKiller ! brokenHtlcs
     }

+    cleanupRelayDb(channels, nodeParams.db.pendingRelay)
+
     channels
       .groupBy(_.commitments.remoteParams.nodeId)
       .map {

@@ -163,15 +167,12 @@
    * get closed, which is a major inconvenience.
    *
    * This check will detect this and will allow us to fast-fail HTLCs and thus preserve channels.
-   *
-   * @param channels
-   * @return
    */
   def checkBrokenHtlcsLink(channels: Seq[HasCommitments], privateKey: PrivateKey): Seq[UpdateAddHtlc] = {
-    // We are interested in incoming HTLCs, that have been *cross-signed*. They signed it first, so the HTLC will first
-    // appear in our commitment tx, and later on in their commitment when we subsequently sign it.
-    // That's why we need to look in *their* commitment with direction=OUT.
+    // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
+    // They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when
+    // we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT.
     val htlcs_in = channels
       .flatMap(_.commitments.remoteCommit.spec.htlcs)
       .filter(_.direction == OUT)

@@ -192,6 +193,42 @@
     htlcs_broken
   }

+  /**
+   * We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
+   * in a database (see [[fr.acinq.eclair.payment.CommandBuffer]]) because we
+   * don't want to lose preimages, or to forget to fail incoming htlcs, which
+   * would lead to unwanted channel closings.
+   *
+   * Because of the way our watcher works, in a scenario where a downstream
+   * channel has gone to the blockchain, it may send several times the same
+   * command, and the upstream channel may have disappeared in the meantime.
+   *
+   * That's why we need to periodically clean up the pending relay db.
+   */
+  def cleanupRelayDb(channels: Seq[HasCommitments], relayDb: PendingRelayDb): Int = {
+    // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
+    // If the HTLC is not in their commitment, it means that we have already fulfilled/failed it and that we can remove
+    // the command from the pending relay db.
+    val channel2Htlc: Set[(ByteVector32, Long)] =
+      channels
+        .flatMap(_.commitments.remoteCommit.spec.htlcs)
+        .filter(_.direction == OUT)
+        .map(htlc => (htlc.add.channelId, htlc.add.id))
+        .toSet
+    val pendingRelay: Set[(ByteVector32, Long)] = relayDb.listPendingRelay()
+    val toClean = pendingRelay -- channel2Htlc
+    toClean.foreach {
+      case (channelId, htlcId) =>
+        logger.info(s"cleaning up channelId=$channelId htlcId=$htlcId from relay db")
+        relayDb.removePendingRelay(channelId, htlcId)
+    }
+    toClean.size
+  }
 }

 class HtlcReaper extends Actor with ActorLogging {

File: CommandBuffer.scala

@@ -21,6 +21,11 @@ import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.NodeParams
 import fr.acinq.eclair.channel._

+/**
+ * We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
+ * in a database because we don't want to lose preimages, or to forget to fail
+ * incoming htlcs, which would lead to unwanted channel closings.
+ */
 class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor with ActorLogging {

   import CommandBuffer._

@@ -43,12 +48,12 @@ class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor wi
     case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, d: HasCommitments) =>
       import d.channelId
-      // if channel is in a state where it can have pending htlcs, we send them the fulfills we know of
+      // if channel is in a state where it can have pending htlcs, we send them the fulfills/fails we know of
       pendingRelay.listPendingRelay(channelId) match {
         case Nil => ()
-        case msgs =>
-          log.info(s"re-sending ${msgs.size} unacked fulfills/fails to channel $channelId")
-          msgs.foreach(channel ! _) // they all have commit = false
+        case cmds =>
+          log.info(s"re-sending ${cmds.size} unacked fulfills/fails to channel $channelId")
+          cmds.foreach(channel ! _) // they all have commit = false
           // better to sign once instead of after each fulfill
           channel ! CMD_SIGN
       }
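To situate the acks in context: CommandBuffer persists every command before forwarding it, and deletes it once the channel acknowledges it. A condensed sketch of that loop, inferred from the names used in this commit (the CommandSend/CommandAck message shapes and the Register.Forward wiring are assumptions, not verbatim eclair code):

    override def receive: Receive = {
      case CommandSend(channelId, htlcId, cmd) =>
        // persist first, so the command survives a restart
        pendingRelay.addPendingRelay(channelId, htlcId, cmd)
        register ! Register.Forward(channelId, cmd)
      case CommandAck(channelId, htlcId) =>
        // the channel has definitively processed the command (on success, and
        // with this commit also on failure), so the entry can be removed
        pendingRelay.removePendingRelay(channelId, htlcId)
    }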

File: NormalStateSpec.scala

@@ -1046,6 +1046,16 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FULFILL_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
+    sender.send(bob, CMD_FULFILL_HTLC(42, randomBytes32)) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv UpdateFulfillHtlc") { f =>
     import f._
     val sender = TestProbe()

@@ -1148,6 +1158,17 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FAIL_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val r = randomBytes32
+    val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
+    sender.send(bob, CMD_FAIL_HTLC(42, Right(PermanentChannelFailure))) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv CMD_FAIL_MALFORMED_HTLC") { f =>
     import f._
     val sender = TestProbe()

@@ -1168,12 +1189,13 @@
     import f._
     val sender = TestProbe()
     val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
     sender.send(bob, CMD_FAIL_MALFORMED_HTLC(42, ByteVector32.Zeroes, FailureMessageCodecs.BADONION))
     sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
     assert(initialState == bob.stateData)
   }

-  test("recv CMD_FAIL_HTLC (invalid failure_code)") { f =>
+  test("recv CMD_FAIL_MALFORMED_HTLC (invalid failure_code)") { f =>
     import f._
     val sender = TestProbe()
     val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]

@@ -1182,6 +1204,16 @@
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FAIL_MALFORMED_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val initialState = bob.stateData.asInstanceOf[DATA_NORMAL]
+    sender.send(bob, CMD_FAIL_MALFORMED_HTLC(42, ByteVector32.Zeroes, FailureMessageCodecs.BADONION)) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv UpdateFailHtlc") { f =>
     import f._
     val sender = TestProbe()

File: ShutdownStateSpec.scala

@@ -140,6 +140,16 @@ class ShutdownStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FULFILL_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
+    sender.send(bob, CMD_FULFILL_HTLC(42, randomBytes32)) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv UpdateFulfillHtlc") { f =>
     import f._
     val sender = TestProbe()

@@ -203,6 +213,16 @@
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FAIL_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val r = randomBytes32
+    val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
+    sender.send(bob, CMD_FAIL_HTLC(42, Right(PermanentChannelFailure))) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv CMD_FAIL_MALFORMED_HTLC") { f =>
     import f._
     val sender = TestProbe()

@@ -224,7 +244,7 @@
     assert(initialState == bob.stateData)
   }

-  test("recv CMD_FAIL_HTLC (invalid failure_code)") { f =>
+  test("recv CMD_FAIL_MALFORMED_HTLC (invalid failure_code)") { f =>
     import f._
     val sender = TestProbe()
     val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]

@@ -233,6 +253,17 @@
     assert(initialState == bob.stateData)
   }

+  test("recv CMD_FAIL_MALFORMED_HTLC (acknowledge in case of failure)") { f =>
+    import f._
+    val sender = TestProbe()
+    val r = randomBytes32
+    val initialState = bob.stateData.asInstanceOf[DATA_SHUTDOWN]
+    sender.send(bob, CMD_FAIL_MALFORMED_HTLC(42, ByteVector32.Zeroes, FailureMessageCodecs.BADONION)) // this will fail
+    sender.expectMsg(Failure(UnknownHtlcId(channelId(bob), 42)))
+    relayerB.expectMsg(CommandBuffer.CommandAck(initialState.channelId, 42))
+  }
+
   test("recv UpdateFailHtlc") { f =>
     import f._
     val sender = TestProbe()

File: SqlitePendingRelayDbSpec.scala

@@ -54,8 +54,9 @@ class SqlitePendingRelayDbSpec extends FunSuite {
     db.addPendingRelay(channelId2, msg1.id, msg1)
     assert(db.listPendingRelay(channelId1).toSet === Set(msg0, msg1, msg2, msg3, msg4))
     assert(db.listPendingRelay(channelId2).toSet === Set(msg0, msg1))
+    assert(db.listPendingRelay === Set((channelId1, msg0.id), (channelId1, msg1.id), (channelId1, msg2.id), (channelId1, msg3.id), (channelId1, msg4.id), (channelId2, msg0.id), (channelId2, msg1.id)))
     db.removePendingRelay(channelId1, msg1.id)
     assert(db.listPendingRelay(channelId1).toSet === Set(msg0, msg2, msg3, msg4))
+    assert(db.listPendingRelay === Set((channelId1, msg0.id), (channelId1, msg2.id), (channelId1, msg3.id), (channelId1, msg4.id), (channelId2, msg0.id), (channelId2, msg1.id)))
   }
 }
}