From 650c5049d5a97739ccac3ff4171914d4618b8542 Mon Sep 17 00:00:00 2001 From: Fabrice Drouin Date: Wed, 24 Apr 2019 17:32:53 +0200 Subject: [PATCH] Better handling of closed channels (#944) * Remove closed channels when application starts If the app is stopped just after a channel has transition from CLOSING to CLOSED, when the application starts again if will be restored as CLOSING. This commit checks channel data and remove closed channels instead of restoring them. * Channels Database: tag closed channels but don't delete them Instead we add a new `closed` column that we check when we restore channels. * Document how we check and remove closed channels on startup --- .../fr/acinq/eclair/channel/Channel.scala | 29 ++++----------- .../fr/acinq/eclair/channel/Helpers.scala | 33 +++++++++++++++++ .../eclair/db/sqlite/SqliteChannelsDb.scala | 35 +++++++++++++------ .../fr/acinq/eclair/io/Switchboard.scala | 10 +++++- .../eclair/db/SqliteChannelsDbSpec.scala | 32 ++++++++++++++++- 5 files changed, 103 insertions(+), 36 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 341137679..01bf73a84 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -1280,34 +1280,17 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId .onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx) .map(add => (add, d.commitments.originChannels.get(add.id).collect { case Local(id, _) => id })) // we resolve the payment id if this was a local payment .collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = MilliSatoshi(add.amountMsat), add.paymentHash)) } - // then let's see if any of the possible close scenarii can be considered done - val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed - val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false) - val remoteCommitDone = remoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val nextRemoteCommitDone = nextRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val futureRemoteCommitDone = futureRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val revokedCommitDone = revokedCommitPublished1.map(Closing.isRevokedCommitDone(_)).exists(_ == true) // we only need one revoked commit done - // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) + // we update the channel data val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1) - // we also send events related to fee + // and we also send events related to fee Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } - val closeType_opt = if (mutualCloseDone) { - Some("mutual") - } else if (localCommitDone) { - Some("local") - } else if (remoteCommitDone || nextRemoteCommitDone) { - Some("remote") - } else if (futureRemoteCommitDone) { - Some("recovery") - } else if (revokedCommitDone) { - Some("revoked") - } else { - None - } + // then let's see if any of the possible close scenarii can be considered done + val closeType_opt = Closing.isClosed(d1, Some(tx)) + // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) closeType_opt match { case Some(closeType) => log.info(s"channel closed (type=$closeType)") - context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType, d.commitments)) + context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType.toString, d.commitments)) goto(CLOSED) using store(d1) case None => stay using store(d1) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index 37aea633f..af4642b89 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -337,6 +337,15 @@ object Helpers { object Closing { + // @formatter:off + sealed trait ClosingType + case object MutualClose extends ClosingType + case object LocalClose extends ClosingType + case object RemoteClose extends ClosingType + case object RecoveryClose extends ClosingType + case object RevokedClose extends ClosingType + // @formatter:on + /** * Indicates whether local has anything at stake in this channel * @@ -350,6 +359,30 @@ object Helpers { data.commitments.remoteCommit.spec.toRemoteMsat == 0 && data.commitments.remoteNextCommitInfo.isRight + /** + * Checks if a channel is closed (i.e. its closing tx has been confirmed) + * + * @param data channel state data + * @param additionalConfirmedTx_opt additional confirmed transaction; we need this for the mutual close scenario + * because we don't store the closing tx in the channel state + * @return the channel closing type, if applicable + */ + def isClosed(data: HasCommitments, additionalConfirmedTx_opt: Option[Transaction]): Option[ClosingType] = data match { + case closing: DATA_CLOSING if additionalConfirmedTx_opt.exists(closing.mutualClosePublished.contains) => + Some(MutualClose) + case closing: DATA_CLOSING if closing.localCommitPublished.exists(Closing.isLocalCommitDone) => + Some(LocalClose) + case closing: DATA_CLOSING if closing.remoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RemoteClose) + case closing: DATA_CLOSING if closing.nextRemoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RemoteClose) + case closing: DATA_CLOSING if closing.futureRemoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RecoveryClose) + case closing: DATA_CLOSING if closing.revokedCommitPublished.exists(Closing.isRevokedCommitDone) => + Some(RevokedClose) + case _ => None + } + // used only to compute tx weights and estimate fees lazy val dummyPublicKey = PrivateKey(ByteVector32(ByteVector.fill(32)(1)), true).publicKey diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index f261f1bbf..5ffd26c51 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -16,29 +16,42 @@ package fr.acinq.eclair.db.sqlite -import java.sql.Connection +import java.sql.{Connection, Statement} import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.channel.HasCommitments import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec +import grizzled.slf4j.Logging import scala.collection.immutable.Queue -class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { +class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { import SqliteUtils.ExtendedResultSet._ import SqliteUtils._ val DB_NAME = "channels" - val CURRENT_VERSION = 1 + val CURRENT_VERSION = 2 + + private def migration12(statement: Statement) = { + statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN is_closed BOOLEAN NOT NULL DEFAULT 0") + } using(sqlite.createStatement()) { statement => - require(getVersion(statement, DB_NAME, CURRENT_VERSION) == CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // there is only one version currently deployed - statement.execute("PRAGMA foreign_keys = ON") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") - statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + getVersion(statement, DB_NAME, CURRENT_VERSION) match { + case 1 => + logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION") + migration12(statement) + setVersion(statement, DB_NAME, CURRENT_VERSION) + case CURRENT_VERSION => + statement.execute("PRAGMA foreign_keys = ON") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + + case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } } override def addOrUpdateChannel(state: HasCommitments): Unit = { @@ -47,7 +60,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { update.setBytes(1, data) update.setBytes(2, state.channelId.toArray) if (update.executeUpdate() == 0) { - using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, 0)")) { statement => statement.setBytes(1, state.channelId.toArray) statement.setBytes(2, data) statement.executeUpdate() @@ -67,7 +80,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM local_channels WHERE channel_id=?")) { statement => + using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1 WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() } @@ -75,7 +88,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { override def listLocalChannels(): Seq[HasCommitments] = { using(sqlite.createStatement) { statement => - val rs = statement.executeQuery("SELECT data FROM local_channels") + val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") codecSequence(rs, stateDataCodec) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index c5f91c19c..c1ec8c037 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -22,6 +22,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Stat import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.NodeParams import fr.acinq.eclair.blockchain.EclairWallet +import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel.{HasCommitments, _} import fr.acinq.eclair.payment.Relayer.RelayPayload import fr.acinq.eclair.payment.{Relayed, Relayer} @@ -44,7 +45,14 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto // we load peers and channels from database { - val channels = nodeParams.db.channels.listLocalChannels() + // Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped + // just after a channel state has transitioned to CLOSED and before it has effectively been removed. + // Closed channels will be removed, other channels will be restored. + val (channels, closedChannels) = nodeParams.db.channels.listLocalChannels().partition(c => Closing.isClosed(c, None).isEmpty) + closedChannels.foreach(c => { + log.info(s"closing channel ${c.channelId}") + nodeParams.db.channels.removeChannel(c.channelId) + }) val peers = nodeParams.db.peers.listPeers() checkBrokenHtlcsLink(channels, nodeParams.privateKey) match { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala index 76731dfcd..64b62472e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala @@ -18,7 +18,9 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.TestConstants +import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, using} import fr.acinq.eclair.db.sqlite.{SqliteChannelsDb, SqlitePendingRelayDb} +import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec import org.scalatest.FunSuite import org.sqlite.SQLiteException import scodec.bits.ByteVector @@ -63,4 +65,32 @@ class SqliteChannelsDbSpec extends FunSuite { assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil) } -} + test("migrate channel database v1 -> v2") { + val sqlite = TestConstants.sqliteInMemory() + + // create a v1 channels database + using(sqlite.createStatement()) { statement => + getVersion(statement, "channels", 1) + statement.execute("PRAGMA foreign_keys = ON") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + } + + // insert 1 row + val channel = ChannelStateSpec.normal + val data = stateDataCodec.encode(channel).require.toByteArray + using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement => + statement.setBytes(1, channel.channelId.toArray) + statement.setBytes(2, data) + statement.executeUpdate() + } + + // check that db migration works + val db = new SqliteChannelsDb(sqlite) + using(sqlite.createStatement()) { statement => + assert(getVersion(statement, "channels", 1) == 2) // version changed from 1 -> 2 + } + assert(db.listLocalChannels() === List(channel)) + } +} \ No newline at end of file