diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 341137679..01bf73a84 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -1280,34 +1280,17 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId .onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx) .map(add => (add, d.commitments.originChannels.get(add.id).collect { case Local(id, _) => id })) // we resolve the payment id if this was a local payment .collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = MilliSatoshi(add.amountMsat), add.paymentHash)) } - // then let's see if any of the possible close scenarii can be considered done - val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed - val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false) - val remoteCommitDone = remoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val nextRemoteCommitDone = nextRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val futureRemoteCommitDone = futureRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false) - val revokedCommitDone = revokedCommitPublished1.map(Closing.isRevokedCommitDone(_)).exists(_ == true) // we only need one revoked commit done - // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) + // we update the channel data val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1) - // we also send events related to fee + // and we also send events related to fee Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) } - val closeType_opt = if (mutualCloseDone) { - Some("mutual") - } else if (localCommitDone) { - Some("local") - } else if (remoteCommitDone || nextRemoteCommitDone) { - Some("remote") - } else if (futureRemoteCommitDone) { - Some("recovery") - } else if (revokedCommitDone) { - Some("revoked") - } else { - None - } + // then let's see if any of the possible close scenarii can be considered done + val closeType_opt = Closing.isClosed(d1, Some(tx)) + // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state) closeType_opt match { case Some(closeType) => log.info(s"channel closed (type=$closeType)") - context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType, d.commitments)) + context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType.toString, d.commitments)) goto(CLOSED) using store(d1) case None => stay using store(d1) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index 37aea633f..af4642b89 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -337,6 +337,15 @@ object Helpers { object Closing { + // @formatter:off + sealed trait ClosingType + case object MutualClose extends ClosingType + case object LocalClose extends ClosingType + case object RemoteClose extends ClosingType + case object RecoveryClose extends ClosingType + case object RevokedClose extends ClosingType + // @formatter:on + /** * Indicates whether local has anything at stake in this channel * @@ -350,6 +359,30 @@ object Helpers { data.commitments.remoteCommit.spec.toRemoteMsat == 0 && data.commitments.remoteNextCommitInfo.isRight + /** + * Checks if a channel is closed (i.e. its closing tx has been confirmed) + * + * @param data channel state data + * @param additionalConfirmedTx_opt additional confirmed transaction; we need this for the mutual close scenario + * because we don't store the closing tx in the channel state + * @return the channel closing type, if applicable + */ + def isClosed(data: HasCommitments, additionalConfirmedTx_opt: Option[Transaction]): Option[ClosingType] = data match { + case closing: DATA_CLOSING if additionalConfirmedTx_opt.exists(closing.mutualClosePublished.contains) => + Some(MutualClose) + case closing: DATA_CLOSING if closing.localCommitPublished.exists(Closing.isLocalCommitDone) => + Some(LocalClose) + case closing: DATA_CLOSING if closing.remoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RemoteClose) + case closing: DATA_CLOSING if closing.nextRemoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RemoteClose) + case closing: DATA_CLOSING if closing.futureRemoteCommitPublished.exists(Closing.isRemoteCommitDone) => + Some(RecoveryClose) + case closing: DATA_CLOSING if closing.revokedCommitPublished.exists(Closing.isRevokedCommitDone) => + Some(RevokedClose) + case _ => None + } + // used only to compute tx weights and estimate fees lazy val dummyPublicKey = PrivateKey(ByteVector32(ByteVector.fill(32)(1)), true).publicKey diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index f261f1bbf..5ffd26c51 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -16,29 +16,42 @@ package fr.acinq.eclair.db.sqlite -import java.sql.Connection +import java.sql.{Connection, Statement} import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.channel.HasCommitments import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec +import grizzled.slf4j.Logging import scala.collection.immutable.Queue -class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { +class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging { import SqliteUtils.ExtendedResultSet._ import SqliteUtils._ val DB_NAME = "channels" - val CURRENT_VERSION = 1 + val CURRENT_VERSION = 2 + + private def migration12(statement: Statement) = { + statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN is_closed BOOLEAN NOT NULL DEFAULT 0") + } using(sqlite.createStatement()) { statement => - require(getVersion(statement, DB_NAME, CURRENT_VERSION) == CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // there is only one version currently deployed - statement.execute("PRAGMA foreign_keys = ON") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") - statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") - statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + getVersion(statement, DB_NAME, CURRENT_VERSION) match { + case 1 => + logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION") + migration12(statement) + setVersion(statement, DB_NAME, CURRENT_VERSION) + case CURRENT_VERSION => + statement.execute("PRAGMA foreign_keys = ON") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + + case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") + } } override def addOrUpdateChannel(state: HasCommitments): Unit = { @@ -47,7 +60,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { update.setBytes(1, data) update.setBytes(2, state.channelId.toArray) if (update.executeUpdate() == 0) { - using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement => + using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, 0)")) { statement => statement.setBytes(1, state.channelId.toArray) statement.setBytes(2, data) statement.executeUpdate() @@ -67,7 +80,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM local_channels WHERE channel_id=?")) { statement => + using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1 WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() } @@ -75,7 +88,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb { override def listLocalChannels(): Seq[HasCommitments] = { using(sqlite.createStatement) { statement => - val rs = statement.executeQuery("SELECT data FROM local_channels") + val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") codecSequence(rs, stateDataCodec) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index c5f91c19c..c1ec8c037 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -22,6 +22,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Stat import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.NodeParams import fr.acinq.eclair.blockchain.EclairWallet +import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel.{HasCommitments, _} import fr.acinq.eclair.payment.Relayer.RelayPayload import fr.acinq.eclair.payment.{Relayed, Relayer} @@ -44,7 +45,14 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto // we load peers and channels from database { - val channels = nodeParams.db.channels.listLocalChannels() + // Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped + // just after a channel state has transitioned to CLOSED and before it has effectively been removed. + // Closed channels will be removed, other channels will be restored. + val (channels, closedChannels) = nodeParams.db.channels.listLocalChannels().partition(c => Closing.isClosed(c, None).isEmpty) + closedChannels.foreach(c => { + log.info(s"closing channel ${c.channelId}") + nodeParams.db.channels.removeChannel(c.channelId) + }) val peers = nodeParams.db.peers.listPeers() checkBrokenHtlcsLink(channels, nodeParams.privateKey) match { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala index 76731dfcd..64b62472e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala @@ -18,7 +18,9 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.ByteVector32 import fr.acinq.eclair.TestConstants +import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, using} import fr.acinq.eclair.db.sqlite.{SqliteChannelsDb, SqlitePendingRelayDb} +import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec import org.scalatest.FunSuite import org.sqlite.SQLiteException import scodec.bits.ByteVector @@ -63,4 +65,32 @@ class SqliteChannelsDbSpec extends FunSuite { assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil) } -} + test("migrate channel database v1 -> v2") { + val sqlite = TestConstants.sqliteInMemory() + + // create a v1 channels database + using(sqlite.createStatement()) { statement => + getVersion(statement, "channels", 1) + statement.execute("PRAGMA foreign_keys = ON") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)") + statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") + } + + // insert 1 row + val channel = ChannelStateSpec.normal + val data = stateDataCodec.encode(channel).require.toByteArray + using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement => + statement.setBytes(1, channel.channelId.toArray) + statement.setBytes(2, data) + statement.executeUpdate() + } + + // check that db migration works + val db = new SqliteChannelsDb(sqlite) + using(sqlite.createStatement()) { statement => + assert(getVersion(statement, "channels", 1) == 2) // version changed from 1 -> 2 + } + assert(db.listLocalChannels() === List(channel)) + } +} \ No newline at end of file