mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-23 06:35:11 +01:00
Better handling of closed channels (#944)
* Remove closed channels when application starts If the app is stopped just after a channel has transition from CLOSING to CLOSED, when the application starts again if will be restored as CLOSING. This commit checks channel data and remove closed channels instead of restoring them. * Channels Database: tag closed channels but don't delete them Instead we add a new `closed` column that we check when we restore channels. * Document how we check and remove closed channels on startup
This commit is contained in:
parent
595c23c38c
commit
650c5049d5
5 changed files with 103 additions and 36 deletions
|
@ -1280,34 +1280,17 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
|
|||
.onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
|
||||
.map(add => (add, d.commitments.originChannels.get(add.id).collect { case Local(id, _) => id })) // we resolve the payment id if this was a local payment
|
||||
.collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = MilliSatoshi(add.amountMsat), add.paymentHash)) }
|
||||
// then let's see if any of the possible close scenarii can be considered done
|
||||
val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed
|
||||
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
|
||||
val remoteCommitDone = remoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
|
||||
val nextRemoteCommitDone = nextRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
|
||||
val futureRemoteCommitDone = futureRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
|
||||
val revokedCommitDone = revokedCommitPublished1.map(Closing.isRevokedCommitDone(_)).exists(_ == true) // we only need one revoked commit done
|
||||
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
|
||||
// we update the channel data
|
||||
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
|
||||
// we also send events related to fee
|
||||
// and we also send events related to fee
|
||||
Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) }
|
||||
val closeType_opt = if (mutualCloseDone) {
|
||||
Some("mutual")
|
||||
} else if (localCommitDone) {
|
||||
Some("local")
|
||||
} else if (remoteCommitDone || nextRemoteCommitDone) {
|
||||
Some("remote")
|
||||
} else if (futureRemoteCommitDone) {
|
||||
Some("recovery")
|
||||
} else if (revokedCommitDone) {
|
||||
Some("revoked")
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// then let's see if any of the possible close scenarii can be considered done
|
||||
val closeType_opt = Closing.isClosed(d1, Some(tx))
|
||||
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
|
||||
closeType_opt match {
|
||||
case Some(closeType) =>
|
||||
log.info(s"channel closed (type=$closeType)")
|
||||
context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType, d.commitments))
|
||||
context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType.toString, d.commitments))
|
||||
goto(CLOSED) using store(d1)
|
||||
case None =>
|
||||
stay using store(d1)
|
||||
|
|
|
@ -337,6 +337,15 @@ object Helpers {
|
|||
|
||||
object Closing {
|
||||
|
||||
// @formatter:off
|
||||
sealed trait ClosingType
|
||||
case object MutualClose extends ClosingType
|
||||
case object LocalClose extends ClosingType
|
||||
case object RemoteClose extends ClosingType
|
||||
case object RecoveryClose extends ClosingType
|
||||
case object RevokedClose extends ClosingType
|
||||
// @formatter:on
|
||||
|
||||
/**
|
||||
* Indicates whether local has anything at stake in this channel
|
||||
*
|
||||
|
@ -350,6 +359,30 @@ object Helpers {
|
|||
data.commitments.remoteCommit.spec.toRemoteMsat == 0 &&
|
||||
data.commitments.remoteNextCommitInfo.isRight
|
||||
|
||||
/**
|
||||
* Checks if a channel is closed (i.e. its closing tx has been confirmed)
|
||||
*
|
||||
* @param data channel state data
|
||||
* @param additionalConfirmedTx_opt additional confirmed transaction; we need this for the mutual close scenario
|
||||
* because we don't store the closing tx in the channel state
|
||||
* @return the channel closing type, if applicable
|
||||
*/
|
||||
def isClosed(data: HasCommitments, additionalConfirmedTx_opt: Option[Transaction]): Option[ClosingType] = data match {
|
||||
case closing: DATA_CLOSING if additionalConfirmedTx_opt.exists(closing.mutualClosePublished.contains) =>
|
||||
Some(MutualClose)
|
||||
case closing: DATA_CLOSING if closing.localCommitPublished.exists(Closing.isLocalCommitDone) =>
|
||||
Some(LocalClose)
|
||||
case closing: DATA_CLOSING if closing.remoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
|
||||
Some(RemoteClose)
|
||||
case closing: DATA_CLOSING if closing.nextRemoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
|
||||
Some(RemoteClose)
|
||||
case closing: DATA_CLOSING if closing.futureRemoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
|
||||
Some(RecoveryClose)
|
||||
case closing: DATA_CLOSING if closing.revokedCommitPublished.exists(Closing.isRevokedCommitDone) =>
|
||||
Some(RevokedClose)
|
||||
case _ => None
|
||||
}
|
||||
|
||||
// used only to compute tx weights and estimate fees
|
||||
lazy val dummyPublicKey = PrivateKey(ByteVector32(ByteVector.fill(32)(1)), true).publicKey
|
||||
|
||||
|
|
|
@ -16,29 +16,42 @@
|
|||
|
||||
package fr.acinq.eclair.db.sqlite
|
||||
|
||||
import java.sql.Connection
|
||||
import java.sql.{Connection, Statement}
|
||||
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.eclair.channel.HasCommitments
|
||||
import fr.acinq.eclair.db.ChannelsDb
|
||||
import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
|
||||
import grizzled.slf4j.Logging
|
||||
|
||||
import scala.collection.immutable.Queue
|
||||
|
||||
class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb {
|
||||
class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
|
||||
|
||||
import SqliteUtils.ExtendedResultSet._
|
||||
import SqliteUtils._
|
||||
|
||||
val DB_NAME = "channels"
|
||||
val CURRENT_VERSION = 1
|
||||
val CURRENT_VERSION = 2
|
||||
|
||||
private def migration12(statement: Statement) = {
|
||||
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN is_closed BOOLEAN NOT NULL DEFAULT 0")
|
||||
}
|
||||
|
||||
using(sqlite.createStatement()) { statement =>
|
||||
require(getVersion(statement, DB_NAME, CURRENT_VERSION) == CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // there is only one version currently deployed
|
||||
statement.execute("PRAGMA foreign_keys = ON")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
|
||||
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
|
||||
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
|
||||
case 1 =>
|
||||
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
|
||||
migration12(statement)
|
||||
setVersion(statement, DB_NAME, CURRENT_VERSION)
|
||||
case CURRENT_VERSION =>
|
||||
statement.execute("PRAGMA foreign_keys = ON")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
|
||||
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
|
||||
|
||||
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
|
||||
}
|
||||
}
|
||||
|
||||
override def addOrUpdateChannel(state: HasCommitments): Unit = {
|
||||
|
@ -47,7 +60,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb {
|
|||
update.setBytes(1, data)
|
||||
update.setBytes(2, state.channelId.toArray)
|
||||
if (update.executeUpdate() == 0) {
|
||||
using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement =>
|
||||
using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, 0)")) { statement =>
|
||||
statement.setBytes(1, state.channelId.toArray)
|
||||
statement.setBytes(2, data)
|
||||
statement.executeUpdate()
|
||||
|
@ -67,7 +80,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb {
|
|||
statement.executeUpdate()
|
||||
}
|
||||
|
||||
using(sqlite.prepareStatement("DELETE FROM local_channels WHERE channel_id=?")) { statement =>
|
||||
using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1 WHERE channel_id=?")) { statement =>
|
||||
statement.setBytes(1, channelId.toArray)
|
||||
statement.executeUpdate()
|
||||
}
|
||||
|
@ -75,7 +88,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb {
|
|||
|
||||
override def listLocalChannels(): Seq[HasCommitments] = {
|
||||
using(sqlite.createStatement) { statement =>
|
||||
val rs = statement.executeQuery("SELECT data FROM local_channels")
|
||||
val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0")
|
||||
codecSequence(rs, stateDataCodec)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Stat
|
|||
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
|
||||
import fr.acinq.eclair.NodeParams
|
||||
import fr.acinq.eclair.blockchain.EclairWallet
|
||||
import fr.acinq.eclair.channel.Helpers.Closing
|
||||
import fr.acinq.eclair.channel.{HasCommitments, _}
|
||||
import fr.acinq.eclair.payment.Relayer.RelayPayload
|
||||
import fr.acinq.eclair.payment.{Relayed, Relayer}
|
||||
|
@ -44,7 +45,14 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
|
|||
|
||||
// we load peers and channels from database
|
||||
{
|
||||
val channels = nodeParams.db.channels.listLocalChannels()
|
||||
// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
|
||||
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
|
||||
// Closed channels will be removed, other channels will be restored.
|
||||
val (channels, closedChannels) = nodeParams.db.channels.listLocalChannels().partition(c => Closing.isClosed(c, None).isEmpty)
|
||||
closedChannels.foreach(c => {
|
||||
log.info(s"closing channel ${c.channelId}")
|
||||
nodeParams.db.channels.removeChannel(c.channelId)
|
||||
})
|
||||
val peers = nodeParams.db.peers.listPeers()
|
||||
|
||||
checkBrokenHtlcsLink(channels, nodeParams.privateKey) match {
|
||||
|
|
|
@ -18,7 +18,9 @@ package fr.acinq.eclair.db
|
|||
|
||||
import fr.acinq.bitcoin.ByteVector32
|
||||
import fr.acinq.eclair.TestConstants
|
||||
import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, using}
|
||||
import fr.acinq.eclair.db.sqlite.{SqliteChannelsDb, SqlitePendingRelayDb}
|
||||
import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
|
||||
import org.scalatest.FunSuite
|
||||
import org.sqlite.SQLiteException
|
||||
import scodec.bits.ByteVector
|
||||
|
@ -63,4 +65,32 @@ class SqliteChannelsDbSpec extends FunSuite {
|
|||
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
|
||||
}
|
||||
|
||||
}
|
||||
test("migrate channel database v1 -> v2") {
|
||||
val sqlite = TestConstants.sqliteInMemory()
|
||||
|
||||
// create a v1 channels database
|
||||
using(sqlite.createStatement()) { statement =>
|
||||
getVersion(statement, "channels", 1)
|
||||
statement.execute("PRAGMA foreign_keys = ON")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
|
||||
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
|
||||
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
|
||||
}
|
||||
|
||||
// insert 1 row
|
||||
val channel = ChannelStateSpec.normal
|
||||
val data = stateDataCodec.encode(channel).require.toByteArray
|
||||
using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?)")) { statement =>
|
||||
statement.setBytes(1, channel.channelId.toArray)
|
||||
statement.setBytes(2, data)
|
||||
statement.executeUpdate()
|
||||
}
|
||||
|
||||
// check that db migration works
|
||||
val db = new SqliteChannelsDb(sqlite)
|
||||
using(sqlite.createStatement()) { statement =>
|
||||
assert(getVersion(statement, "channels", 1) == 2) // version changed from 1 -> 2
|
||||
}
|
||||
assert(db.listLocalChannels() === List(channel))
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue