1
0
mirror of https://github.com/ACINQ/eclair.git synced 2024-11-20 10:39:19 +01:00

added balance_updated and channel_events tables

This commit is contained in:
pm47 2019-01-10 19:42:05 +01:00
parent 52f671384e
commit 7af05b3eee
No known key found for this signature in database
GPG Key ID: E434ED292E85643A
7 changed files with 61 additions and 10 deletions

View File

@ -633,7 +633,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
self ! TickRefreshChannelUpdate
}
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat)) // note that remoteCommit.toRemote == toLocal
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat, commitments1)) // note that remoteCommit.toRemote == toLocal
// we expect a quick response from our peer
setTimer(RevocationTimeout.toString, RevocationTimeout(commitments1.remoteCommit.index, peer = context.parent), timeout = nodeParams.revocationTimeout, repeat = false)
handleCommandSuccess(sender, store(d.copy(commitments = commitments1))) sending commit
@ -1249,6 +1249,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
closeType_opt match {
case Some(closeType) =>
log.info(s"channel closed (type=$closeType)")
context.system.eventStream.publish(ChannelClosed(self, d.channelId, closeType, d.commitments))
goto(CLOSED) using store(d1)
case None =>
stay using store(d1)

View File

@ -52,8 +52,10 @@ case class ChannelFailed(channel: ActorRef, channelId: BinaryData, remoteNodeId:
case class NetworkFeePaid(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, tx: Transaction, fee: Satoshi, txType: String) extends ChannelEvent
// NB: this event is only sent when the channel is available
case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long) extends ChannelEvent
case class AvailableBalanceChanged(channel: ActorRef, channelId: BinaryData, shortChannelId: ShortChannelId, localBalanceMsat: Long, commitments: Commitments) extends ChannelEvent
case class ChannelPersisted(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, data: Data) extends ChannelEvent
case class LocalCommitConfirmed(channel: ActorRef, remoteNodeId: PublicKey, channelId: BinaryData, refundAtBlock: Long) extends ChannelEvent
case class ChannelClosed(channel: ActorRef, channelId: BinaryData, closeType: String, commitments: Commitments)

View File

@ -18,11 +18,15 @@ package fr.acinq.eclair.db
import fr.acinq.bitcoin.BinaryData
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.channel.NetworkFeePaid
import fr.acinq.eclair.channel._
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
trait AuditDb {
def add(availableBalanceChanged: AvailableBalanceChanged)
def add(channelLifecycle: ChannelLifecycleEvent)
def add(paymentSent: PaymentSent)
def add(paymentReceived: PaymentReceived)
@ -45,6 +49,8 @@ trait AuditDb {
}
case class ChannelLifecycleEvent(channelId: BinaryData, remoteNodeId: PublicKey, capacitySat: Long, isFunder: Boolean, isPrivate: Boolean, event: String)
case class NetworkFee(remoteNodeId: PublicKey, channelId: BinaryData, txId: BinaryData, feeSat: Long, txType: String, timestamp: Long)
case class Stats(channelId: BinaryData, avgPaymentAmountSatoshi: Long, paymentCount: Int, relayFeeSatoshi: Long, networkFeeSatoshi: Long)

View File

@ -19,9 +19,9 @@ package fr.acinq.eclair.db.sqlite
import java.sql.Connection
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{BinaryData, MilliSatoshi, Satoshi}
import fr.acinq.eclair.channel.NetworkFeePaid
import fr.acinq.eclair.db.{AuditDb, NetworkFee, Stats}
import fr.acinq.bitcoin.{BinaryData, MilliSatoshi}
import fr.acinq.eclair.channel.{AvailableBalanceChanged, ChannelClosed, ChannelCreated, NetworkFeePaid}
import fr.acinq.eclair.db.{AuditDb, ChannelLifecycleEvent, NetworkFee, Stats}
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import scala.collection.immutable.Queue
@ -36,17 +36,44 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb {
using(sqlite.createStatement()) { statement =>
require(getVersion(statement, DB_NAME, CURRENT_VERSION) == CURRENT_VERSION) // there is only one version currently deployed
statement.executeUpdate("CREATE TABLE IF NOT EXISTS balance_updated (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, amount_msat INTEGER NOT NULL, capacity_sat INTEGER NOT NULL, reserve_sat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (amount_in_msat INTEGER NOT NULL, amount_out_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event STRING NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS balance_updated_idx ON balance_updated(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
}
override def add(e: AvailableBalanceChanged): Unit =
using(sqlite.prepareStatement("INSERT INTO balance_updated VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.channelId)
statement.setBytes(2, e.commitments.remoteParams.nodeId.toBin)
statement.setLong(3, e.localBalanceMsat)
statement.setLong(4, e.commitments.commitInput.txOut.amount.toLong)
statement.setLong(5, e.commitments.remoteParams.channelReserveSatoshis) // remote decides what our reserve should be
statement.setLong(6, Platform.currentTime)
statement.executeUpdate()
}
override def add(e: ChannelLifecycleEvent): Unit =
using(sqlite.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.channelId)
statement.setBytes(2, e.remoteNodeId.toBin)
statement.setLong(3, e.capacitySat)
statement.setBoolean(4, e.isFunder)
statement.setBoolean(5, e.isPrivate)
statement.setString(6, e.event)
statement.setLong(7, Platform.currentTime)
statement.executeUpdate()
}
override def add(e: PaymentSent): Unit =
using(sqlite.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setLong(1, e.amount.toLong)
@ -203,4 +230,5 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb {
}
override def close(): Unit = sqlite.close()
}

View File

@ -18,7 +18,8 @@ package fr.acinq.eclair.payment
import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.channel.NetworkFeePaid
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.ChannelLifecycleEvent
class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
@ -26,6 +27,9 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
override def receive: Receive = {
@ -37,6 +41,12 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
case e: NetworkFeePaid => db.add(e)
case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) =>
db.add(ChannelLifecycleEvent(d.channelId, remoteNodeId, d.commitments.commitInput.txOut.amount.toLong, d.commitments.localParams.isFunder, !d.commitments.announceChannel, "created"))
case e: ChannelClosed =>
db.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount.toLong, e.commitments.localParams.isFunder, !e.commitments.announceChannel, e.closeType))
}
override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")

View File

@ -77,7 +77,7 @@ class Relayer(nodeParams: NodeParams, register: ActorRef, paymentHandler: ActorR
log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
context become main(channelUpdates - shortChannelId, node2channels.removeBinding(remoteNodeId, shortChannelId))
case AvailableBalanceChanged(_, _, shortChannelId, localBalanceMsat) =>
case AvailableBalanceChanged(_, _, shortChannelId, localBalanceMsat, _) =>
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
case Some(c: OutgoingChannel) => channelUpdates + (shortChannelId -> c.copy(availableBalanceMsat = localBalanceMsat))
case None => channelUpdates // we only consider the balance if we have the channel_update

View File

@ -19,10 +19,10 @@ package fr.acinq.eclair.db
import java.sql.DriverManager
import fr.acinq.bitcoin.{MilliSatoshi, Satoshi, Transaction}
import fr.acinq.eclair.channel.NetworkFeePaid
import fr.acinq.eclair.channel.{AvailableBalanceChanged, NetworkFeePaid}
import fr.acinq.eclair.db.sqlite.SqliteAuditDb
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.{randomBytes, randomKey}
import fr.acinq.eclair.{ShortChannelId, randomBytes, randomKey}
import org.scalatest.FunSuite
import scala.compat.Platform
@ -48,6 +48,8 @@ class SqliteAuditDbSpec extends FunSuite {
val e4 = NetworkFeePaid(null, randomKey.publicKey, randomBytes(32), Transaction(0, Seq.empty, Seq.empty, 0), Satoshi(42), "mutual")
val e5 = PaymentSent(MilliSatoshi(42000), MilliSatoshi(1000), randomBytes(32), randomBytes(32), randomBytes(32), timestamp = 0)
val e6 = PaymentSent(MilliSatoshi(42000), MilliSatoshi(1000), randomBytes(32), randomBytes(32), randomBytes(32), timestamp = Platform.currentTime * 2)
val e7 = AvailableBalanceChanged(null, randomBytes(32), ShortChannelId(500000, 42, 1), 456123000, ChannelStateSpec.commitments)
val e8 = ChannelLifecycleEvent(randomBytes(32), randomKey.publicKey, 456123000, true, false, "mutual")
db.add(e1)
db.add(e2)
@ -55,6 +57,8 @@ class SqliteAuditDbSpec extends FunSuite {
db.add(e4)
db.add(e5)
db.add(e6)
db.add(e7)
db.add(e8)
assert(db.listSent(from = 0L, to = Long.MaxValue).toSet === Set(e1, e5, e6))
assert(db.listSent(from = 100000L, to = Platform.currentTime + 1).toList === List(e1))