1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-20 13:34:35 +01:00

Add channel errors in audit db (#955)

We now keep track of all local/remote channel errors in the audit db.
This commit is contained in:
Pierre-Marie Padiou 2019-04-19 16:28:02 +02:00 committed by GitHub
parent ed507334e0
commit f563ca0897
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 133 additions and 53 deletions

View file

@ -1675,7 +1675,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def replyToUser(message: Either[Channel.ChannelError, String]) = {
val m = message match {
case Left(LocalError(t)) => Status.Failure(t)
case Left(RemoteError(e)) => Status.Failure(new RuntimeException(s"peer sent error: '${if (isAsciiPrintable(e.data)) new String(e.data.toArray, StandardCharsets.US_ASCII) else e.data.toString()}'"))
case Left(RemoteError(e)) => Status.Failure(new RuntimeException(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}"))
case Right(s) => s
}
origin_opt.map(_ ! m)
@ -1691,6 +1691,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _: ChannelException => ()
case _ => log.error(cause, s"msg=$cmd stateData=$stateData ")
}
context.system.eventStream.publish(ChannelErrorOccured(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = false))
stay replying Status.Failure(cause)
}
@ -1700,7 +1701,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val error = Error(d.channelId, exc.getMessage)
// NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want:
// implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away
context.system.eventStream.publish(ChannelFailed(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc)))
context.system.eventStream.publish(ChannelErrorOccured(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true))
goto(CLOSED) sending error
}
@ -1708,7 +1709,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE")
val exc = FundingTxTimedout(d.channelId)
val error = Error(d.channelId, exc.getMessage)
context.system.eventStream.publish(ChannelFailed(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc)))
context.system.eventStream.publish(ChannelErrorOccured(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true))
goto(ERR_FUNDING_TIMEOUT) sending error
}
@ -1732,7 +1733,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData ")
}
val error = Error(Helpers.getChannelId(d), cause.getMessage)
context.system.eventStream.publish(ChannelFailed(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause)))
context.system.eventStream.publish(ChannelErrorOccured(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = true))
d match {
case dd: HasCommitments if Closing.nothingAtStake(dd) => goto(CLOSED)
@ -1747,8 +1748,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def handleRemoteError(e: Error, d: Data) = {
// see BOLT 1: only print out data verbatim if is composed of printable ASCII characters
log.error(s"peer sent error: ascii='${if (isAsciiPrintable(e.data)) new String(e.data.toArray, StandardCharsets.US_ASCII) else "n/a"}' bin=${e.data}")
context.system.eventStream.publish(ChannelFailed(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, RemoteError(e)))
log.error(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}")
context.system.eventStream.publish(ChannelErrorOccured(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, RemoteError(e), isFatal = true))
d match {
case _: DATA_CLOSING => stay // nothing to do, there is already a spending tx published

View file

@ -47,7 +47,7 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext
case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent
case class ChannelFailed(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, data: Data, error: ChannelError) extends ChannelEvent
case class ChannelErrorOccured(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, data: Data, error: ChannelError, isFatal: Boolean) extends ChannelEvent
case class NetworkFeePaid(channel: ActorRef, remoteNodeId: PublicKey, channelId: ByteVector32, tx: Transaction, fee: Satoshi, txType: String) extends ChannelEvent

View file

@ -35,6 +35,8 @@ trait AuditDb {
def add(networkFeePaid: NetworkFeePaid)
def add(channelErrorOccured: ChannelErrorOccured)
def listSent(from: Long, to: Long): Seq[PaymentSent]
def listReceived(from: Long, to: Long): Seq[PaymentReceived]

View file

@ -16,12 +16,12 @@
package fr.acinq.eclair.db.sqlite
import java.sql.Connection
import java.sql.{Connection, Statement}
import java.util.UUID
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.MilliSatoshi
import fr.acinq.eclair.channel.{AvailableBalanceChanged, NetworkFeePaid}
import fr.acinq.eclair.channel.{AvailableBalanceChanged, Channel, ChannelErrorOccured, NetworkFeePaid}
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent, NetworkFee, Stats}
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.wire.ChannelCodecs
@ -36,48 +36,48 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
import ExtendedResultSet._
val DB_NAME = "audit"
val CURRENT_VERSION = 2
val CURRENT_VERSION = 3
using(sqlite.createStatement()) { statement =>
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 => // previous version let's migrate
logger.warn(s"Performing db migration for DB $DB_NAME, found version=1 current=$CURRENT_VERSION")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS balance_updated (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, amount_msat INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, reserve_sat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (amount_in_msat INTEGER NOT NULL, amount_out_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event STRING NOT NULL, timestamp INTEGER NOT NULL)")
// add id
statement.executeUpdate(s"ALTER TABLE sent ADD id BLOB DEFAULT '${ChannelCodecs.UNKNOWN_UUID.toString}' NOT NULL")
def migration12(statement: Statement) = {
statement.executeUpdate(s"ALTER TABLE sent ADD id BLOB DEFAULT '${ChannelCodecs.UNKNOWN_UUID.toString}' NOT NULL")
}
statement.executeUpdate("CREATE INDEX IF NOT EXISTS balance_updated_idx ON balance_updated(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
def migration23(statement: Statement) = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name STRING NOT NULL, error_message STRING NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)")
}
// update version
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS balance_updated (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, amount_msat INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, reserve_sat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL, id BLOB NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (amount_in_msat INTEGER NOT NULL, amount_out_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event STRING NOT NULL, timestamp INTEGER NOT NULL)")
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 => // previous version let's migrate
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS balance_updated (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, amount_msat INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, reserve_sat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL, id BLOB NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (amount_in_msat INTEGER NOT NULL, amount_out_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event STRING NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name STRING NOT NULL, error_message STRING NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS balance_updated_idx ON balance_updated(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS balance_updated_idx ON balance_updated(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)")
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
}
override def add(e: AvailableBalanceChanged): Unit =
@ -147,6 +147,21 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate()
}
override def add(e: ChannelErrorOccured): Unit =
using(sqlite.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
val (errorName, errorMessage) = e.error match {
case Channel.LocalError(t) => (t.getClass.getSimpleName, t.getMessage)
case Channel.RemoteError(error) => ("remote", error.toAscii)
}
statement.setBytes(1, e.channelId.toArray)
statement.setBytes(2, e.remoteNodeId.toBin.toArray)
statement.setString(3, errorName)
statement.setString(4, errorMessage)
statement.setBoolean(5, e.isFatal)
statement.setLong(6, Platform.currentTime)
statement.executeUpdate()
}
override def listSent(from: Long, to: Long): Seq[PaymentSent] =
using(sqlite.prepareStatement("SELECT * FROM sent WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)

View file

@ -19,6 +19,7 @@ package fr.acinq.eclair.payment
import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel.Channel.{LocalError, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent}
@ -32,6 +33,7 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccured])
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
@ -49,6 +51,8 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
case e: AvailableBalanceChanged => balanceEventThrottler ! e
case e: ChannelErrorOccured => db.add(e)
case e: ChannelStateChanged =>
e match {
case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) =>

View file

@ -17,6 +17,7 @@
package fr.acinq.eclair.wire
import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import java.nio.charset.StandardCharsets
import com.google.common.base.Charsets
import fr.acinq.bitcoin.ByteVector32
@ -47,7 +48,9 @@ case class Init(globalFeatures: ByteVector,
localFeatures: ByteVector) extends SetupMessage
case class Error(channelId: ByteVector32,
data: ByteVector) extends SetupMessage with HasChannelId
data: ByteVector) extends SetupMessage with HasChannelId {
def toAscii: String = if (fr.acinq.eclair.isAsciiPrintable(data)) new String(data.toArray, StandardCharsets.US_ASCII) else "n/a"
}
object Error {
def apply(channelId: ByteVector32, msg: String): Error = Error(channelId, ByteVector.view(msg.getBytes(Charsets.US_ASCII)))

View file

@ -19,13 +19,15 @@ package fr.acinq.eclair.db
import java.util.UUID
import fr.acinq.bitcoin.{MilliSatoshi, Satoshi, Transaction}
import fr.acinq.eclair.channel.{AvailableBalanceChanged, NetworkFeePaid}
import fr.acinq.eclair.channel.Channel.{LocalError, RemoteError}
import fr.acinq.eclair.channel.{AvailableBalanceChanged, ChannelErrorOccured, NetworkFeePaid}
import fr.acinq.eclair.db.sqlite.SqliteAuditDb
import fr.acinq.eclair.db.sqlite.SqliteUtils.{ExtendedResultSet, getVersion, using}
import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, using}
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.wire.ChannelCodecs
import fr.acinq.eclair.{ShortChannelId, TestConstants, randomBytes32, randomKey}
import fr.acinq.eclair._
import org.scalatest.FunSuite
import scala.compat.Platform
@ -49,6 +51,8 @@ class SqliteAuditDbSpec extends FunSuite {
val e6 = PaymentSent(ChannelCodecs.UNKNOWN_UUID, MilliSatoshi(42000), MilliSatoshi(1000), randomBytes32, randomBytes32, randomBytes32, timestamp = Platform.currentTime * 2)
val e7 = AvailableBalanceChanged(null, randomBytes32, ShortChannelId(500000, 42, 1), 456123000, ChannelStateSpec.commitments)
val e8 = ChannelLifecycleEvent(randomBytes32, randomKey.publicKey, 456123000, true, false, "mutual")
val e9 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, LocalError(new RuntimeException("oops")), true)
val e10 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, RemoteError(wire.Error(randomBytes32, "remote oops")), true)
db.add(e1)
db.add(e2)
@ -58,6 +62,8 @@ class SqliteAuditDbSpec extends FunSuite {
db.add(e6)
db.add(e7)
db.add(e8)
db.add(e9)
db.add(e10)
assert(db.listSent(from = 0L, to = Long.MaxValue).toSet === Set(e1, e5, e6))
assert(db.listSent(from = 100000L, to = Platform.currentTime + 1).toList === List(e1))
@ -96,9 +102,8 @@ class SqliteAuditDbSpec extends FunSuite {
))
}
test("handle migration version 1 -> 2") {
test("handle migration version 1 -> 3") {
import ExtendedResultSet._
val connection = TestConstants.sqliteInMemory()
// simulate existing previous version db
@ -120,12 +125,14 @@ class SqliteAuditDbSpec extends FunSuite {
}
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 1) == 1) // version 1 is deployed now
assert(getVersion(statement, "audit", 3) == 1) // we expect version 1
}
val ps = PaymentSent(UUID.randomUUID(), MilliSatoshi(42000), MilliSatoshi(1000), randomBytes32, randomBytes32, randomBytes32)
val ps1 = PaymentSent(UUID.randomUUID(), MilliSatoshi(42001), MilliSatoshi(1001), randomBytes32, randomBytes32, randomBytes32)
val ps2 = PaymentSent(UUID.randomUUID(), MilliSatoshi(42002), MilliSatoshi(1002), randomBytes32, randomBytes32, randomBytes32)
val e1 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, LocalError(new RuntimeException("oops")), true)
val e2 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, RemoteError(wire.Error(randomBytes32, "remote oops")), true)
// add a row (no ID on sent)
using(connection.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
@ -141,7 +148,7 @@ class SqliteAuditDbSpec extends FunSuite {
val migratedDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 1) == 2) // version changed from 1 -> 2
assert(getVersion(statement, "audit", 3) == 3) // version changed from 1 -> 3
}
// existing rows will use 00000000-0000-0000-0000-000000000000 as default
@ -150,14 +157,62 @@ class SqliteAuditDbSpec extends FunSuite {
val postMigrationDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 2) == 2) // version 2
assert(getVersion(statement, "audit", 3) == 3) // version 3
}
postMigrationDb.add(ps1)
postMigrationDb.add(ps2)
postMigrationDb.add(e1)
postMigrationDb.add(e2)
// the old record will have the UNKNOWN_UUID but the new ones will have their actual id
assert(postMigrationDb.listSent(0, Long.MaxValue) == Seq(ps.copy(id = ChannelCodecs.UNKNOWN_UUID), ps1, ps2))
}
test("handle migration version 2 -> 3") {
val connection = TestConstants.sqliteInMemory()
// simulate existing previous version db
using(connection.createStatement()) { statement =>
getVersion(statement, "audit", 2)
statement.executeUpdate("CREATE TABLE IF NOT EXISTS balance_updated (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, amount_msat INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, reserve_sat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL, id BLOB NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (amount_in_msat INTEGER NOT NULL, amount_out_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event STRING NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS balance_updated_idx ON balance_updated(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
}
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 3) == 2) // version 2 is deployed now
}
val e1 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, LocalError(new RuntimeException("oops")), true)
val e2 = ChannelErrorOccured(null, randomBytes32, randomKey.publicKey, null, RemoteError(wire.Error(randomBytes32, "remote oops")), true)
val migratedDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 3) == 3) // version changed from 2 -> 3
}
migratedDb.add(e1)
val postMigrationDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit", 3) == 3) // version 3
}
postMigrationDb.add(e2)
}
}