mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-22 14:22:39 +01:00
Rework database initialization (#911)
* Initialize the database outside the node param constructor * Do not create folders during StartupSpec * Simplify syntax for instantiating test Databases * Rework parameter passing to database initialization * Force UTF-8 file encoding on all platform.
This commit is contained in:
parent
3b7afd92d4
commit
06b2337ed9
19 changed files with 114 additions and 109 deletions
|
@ -28,7 +28,7 @@ object DBCompatChecker extends Logging {
|
||||||
* @param nodeParams
|
* @param nodeParams
|
||||||
*/
|
*/
|
||||||
def checkDBCompatibility(nodeParams: NodeParams): Unit =
|
def checkDBCompatibility(nodeParams: NodeParams): Unit =
|
||||||
Try(nodeParams.channelsDb.listChannels()) match {
|
Try(nodeParams.db.channels.listLocalChannels()) match {
|
||||||
case Success(_) => {}
|
case Success(_) => {}
|
||||||
case Failure(_) => throw IncompatibleDBException
|
case Failure(_) => throw IncompatibleDBException
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ object DBCompatChecker extends Logging {
|
||||||
* @param nodeParams
|
* @param nodeParams
|
||||||
*/
|
*/
|
||||||
def checkNetworkDBCompatibility(nodeParams: NodeParams): Unit =
|
def checkNetworkDBCompatibility(nodeParams: NodeParams): Unit =
|
||||||
Try(nodeParams.networkDb.listChannels(), nodeParams.networkDb.listNodes(), nodeParams.networkDb.listChannelUpdates()) match {
|
Try(nodeParams.db.network.listChannels(), nodeParams.db.network.listNodes(), nodeParams.db.network.listChannelUpdates()) match {
|
||||||
case Success(_) => {}
|
case Success(_) => {}
|
||||||
case Failure(_) => throw IncompatibleNetworkDBException
|
case Failure(_) => throw IncompatibleNetworkDBException
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,12 +61,7 @@ case class NodeParams(keyManager: KeyManager,
|
||||||
feeProportionalMillionth: Int,
|
feeProportionalMillionth: Int,
|
||||||
reserveToFundingRatio: Double,
|
reserveToFundingRatio: Double,
|
||||||
maxReserveToFundingRatio: Double,
|
maxReserveToFundingRatio: Double,
|
||||||
channelsDb: ChannelsDb,
|
db: Databases,
|
||||||
peersDb: PeersDb,
|
|
||||||
networkDb: NetworkDb,
|
|
||||||
pendingRelayDb: PendingRelayDb,
|
|
||||||
paymentsDb: PaymentsDb,
|
|
||||||
auditDb: AuditDb,
|
|
||||||
revocationTimeout: FiniteDuration,
|
revocationTimeout: FiniteDuration,
|
||||||
pingInterval: FiniteDuration,
|
pingInterval: FiniteDuration,
|
||||||
pingTimeout: FiniteDuration,
|
pingTimeout: FiniteDuration,
|
||||||
|
@ -129,29 +124,11 @@ object NodeParams {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def makeNodeParams(datadir: File, config: Config, keyManager: KeyManager, torAddress_opt: Option[NodeAddress]): NodeParams = {
|
def makeNodeParams(config: Config, keyManager: KeyManager, torAddress_opt: Option[NodeAddress], database: Databases): NodeParams = {
|
||||||
|
|
||||||
datadir.mkdirs()
|
|
||||||
|
|
||||||
val chain = config.getString("chain")
|
val chain = config.getString("chain")
|
||||||
val chainHash = makeChainHash(chain)
|
val chainHash = makeChainHash(chain)
|
||||||
|
|
||||||
val chaindir = new File(datadir, chain)
|
|
||||||
chaindir.mkdir()
|
|
||||||
|
|
||||||
val sqlite = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "eclair.sqlite")}")
|
|
||||||
SqliteUtils.obtainExclusiveLock(sqlite) // there should only be one process writing to this file
|
|
||||||
val channelsDb = new SqliteChannelsDb(sqlite)
|
|
||||||
val peersDb = new SqlitePeersDb(sqlite)
|
|
||||||
val pendingRelayDb = new SqlitePendingRelayDb(sqlite)
|
|
||||||
val paymentsDb = new SqlitePaymentsDb(sqlite)
|
|
||||||
|
|
||||||
val sqliteNetwork = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "network.sqlite")}")
|
|
||||||
val networkDb = new SqliteNetworkDb(sqliteNetwork)
|
|
||||||
|
|
||||||
val sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "audit.sqlite")}")
|
|
||||||
val auditDb = new SqliteAuditDb(sqliteAudit)
|
|
||||||
|
|
||||||
val color = ByteVector.fromValidHex(config.getString("node-color"))
|
val color = ByteVector.fromValidHex(config.getString("node-color"))
|
||||||
require(color.size == 3, "color should be a 3-bytes hex buffer")
|
require(color.size == 3, "color should be a 3-bytes hex buffer")
|
||||||
|
|
||||||
|
@ -220,12 +197,7 @@ object NodeParams {
|
||||||
feeProportionalMillionth = config.getInt("fee-proportional-millionths"),
|
feeProportionalMillionth = config.getInt("fee-proportional-millionths"),
|
||||||
reserveToFundingRatio = config.getDouble("reserve-to-funding-ratio"),
|
reserveToFundingRatio = config.getDouble("reserve-to-funding-ratio"),
|
||||||
maxReserveToFundingRatio = config.getDouble("max-reserve-to-funding-ratio"),
|
maxReserveToFundingRatio = config.getDouble("max-reserve-to-funding-ratio"),
|
||||||
channelsDb = channelsDb,
|
db = database,
|
||||||
peersDb = peersDb,
|
|
||||||
networkDb = networkDb,
|
|
||||||
pendingRelayDb = pendingRelayDb,
|
|
||||||
paymentsDb = paymentsDb,
|
|
||||||
auditDb = auditDb,
|
|
||||||
revocationTimeout = FiniteDuration(config.getDuration("revocation-timeout").getSeconds, TimeUnit.SECONDS),
|
revocationTimeout = FiniteDuration(config.getDuration("revocation-timeout").getSeconds, TimeUnit.SECONDS),
|
||||||
pingInterval = FiniteDuration(config.getDuration("ping-interval").getSeconds, TimeUnit.SECONDS),
|
pingInterval = FiniteDuration(config.getDuration("ping-interval").getSeconds, TimeUnit.SECONDS),
|
||||||
pingTimeout = FiniteDuration(config.getDuration("ping-timeout").getSeconds, TimeUnit.SECONDS),
|
pingTimeout = FiniteDuration(config.getDuration("ping-timeout").getSeconds, TimeUnit.SECONDS),
|
||||||
|
|
|
@ -43,6 +43,7 @@ import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
|
||||||
import fr.acinq.eclair.blockchain.{EclairWallet, _}
|
import fr.acinq.eclair.blockchain.{EclairWallet, _}
|
||||||
import fr.acinq.eclair.channel.Register
|
import fr.acinq.eclair.channel.Register
|
||||||
import fr.acinq.eclair.crypto.LocalKeyManager
|
import fr.acinq.eclair.crypto.LocalKeyManager
|
||||||
|
import fr.acinq.eclair.db.Databases
|
||||||
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
|
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
|
||||||
import fr.acinq.eclair.payment._
|
import fr.acinq.eclair.payment._
|
||||||
import fr.acinq.eclair.router._
|
import fr.acinq.eclair.router._
|
||||||
|
@ -67,7 +68,14 @@ import scala.concurrent.duration._
|
||||||
*/
|
*/
|
||||||
class Setup(datadir: File,
|
class Setup(datadir: File,
|
||||||
overrideDefaults: Config = ConfigFactory.empty(),
|
overrideDefaults: Config = ConfigFactory.empty(),
|
||||||
seed_opt: Option[ByteVector] = None)(implicit system: ActorSystem) extends Logging {
|
seed_opt: Option[ByteVector] = None,
|
||||||
|
db: Option[Databases] = None)(implicit system: ActorSystem) extends Logging {
|
||||||
|
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
implicit val timeout = Timeout(30 seconds)
|
||||||
|
implicit val formats = org.json4s.DefaultFormats
|
||||||
|
implicit val ec = ExecutionContext.Implicits.global
|
||||||
|
implicit val sttpBackend = OkHttpFutureBackend()
|
||||||
|
|
||||||
logger.info(s"hello!")
|
logger.info(s"hello!")
|
||||||
logger.info(s"version=${getClass.getPackage.getImplementationVersion} commit=${getClass.getPackage.getSpecificationVersion}")
|
logger.info(s"version=${getClass.getPackage.getImplementationVersion} commit=${getClass.getPackage.getSpecificationVersion}")
|
||||||
|
@ -76,17 +84,18 @@ class Setup(datadir: File,
|
||||||
// this will force the secure random instance to initialize itself right now, making sure it doesn't hang later (see comment in package.scala)
|
// this will force the secure random instance to initialize itself right now, making sure it doesn't hang later (see comment in package.scala)
|
||||||
secureRandom.nextInt()
|
secureRandom.nextInt()
|
||||||
|
|
||||||
|
datadir.mkdirs()
|
||||||
val config = NodeParams.loadConfiguration(datadir, overrideDefaults)
|
val config = NodeParams.loadConfiguration(datadir, overrideDefaults)
|
||||||
val seed = seed_opt.getOrElse(NodeParams.getSeed(datadir))
|
val seed = seed_opt.getOrElse(NodeParams.getSeed(datadir))
|
||||||
val chain = config.getString("chain")
|
val chain = config.getString("chain")
|
||||||
val keyManager = new LocalKeyManager(seed, NodeParams.makeChainHash(chain))
|
val keyManager = new LocalKeyManager(seed, NodeParams.makeChainHash(chain))
|
||||||
implicit val materializer = ActorMaterializer()
|
|
||||||
implicit val timeout = Timeout(30 seconds)
|
|
||||||
implicit val formats = org.json4s.DefaultFormats
|
|
||||||
implicit val ec = ExecutionContext.Implicits.global
|
|
||||||
implicit val sttpBackend = OkHttpFutureBackend()
|
|
||||||
|
|
||||||
val nodeParams = NodeParams.makeNodeParams(datadir, config, keyManager, initTor())
|
val database = db match {
|
||||||
|
case Some(d) => d
|
||||||
|
case None => Databases.sqliteJDBC(new File(datadir, chain))
|
||||||
|
}
|
||||||
|
|
||||||
|
val nodeParams = NodeParams.makeNodeParams(config, keyManager, initTor(), database)
|
||||||
|
|
||||||
val serverBindingAddress = new InetSocketAddress(
|
val serverBindingAddress = new InetSocketAddress(
|
||||||
config.getString("server.binding-ip"),
|
config.getString("server.binding-ip"),
|
||||||
|
|
|
@ -321,9 +321,9 @@ trait Service extends Logging {
|
||||||
case _ => (0L, Long.MaxValue)
|
case _ => (0L, Long.MaxValue)
|
||||||
}
|
}
|
||||||
completeRpcFuture(req.id, Future(AuditResponse(
|
completeRpcFuture(req.id, Future(AuditResponse(
|
||||||
sent = nodeParams.auditDb.listSent(from, to),
|
sent = nodeParams.db.audit.listSent(from, to),
|
||||||
received = nodeParams.auditDb.listReceived(from, to),
|
received = nodeParams.db.audit.listReceived(from, to),
|
||||||
relayed = nodeParams.auditDb.listRelayed(from, to))
|
relayed = nodeParams.db.audit.listRelayed(from, to))
|
||||||
))
|
))
|
||||||
|
|
||||||
case "networkfees" =>
|
case "networkfees" =>
|
||||||
|
@ -331,10 +331,10 @@ trait Service extends Logging {
|
||||||
case JInt(from) :: JInt(to) :: Nil => (from.toLong, to.toLong)
|
case JInt(from) :: JInt(to) :: Nil => (from.toLong, to.toLong)
|
||||||
case _ => (0L, Long.MaxValue)
|
case _ => (0L, Long.MaxValue)
|
||||||
}
|
}
|
||||||
completeRpcFuture(req.id, Future(nodeParams.auditDb.listNetworkFees(from, to)))
|
completeRpcFuture(req.id, Future(nodeParams.db.audit.listNetworkFees(from, to)))
|
||||||
|
|
||||||
// retrieve fee stats
|
// retrieve fee stats
|
||||||
case "channelstats" => completeRpcFuture(req.id, Future(nodeParams.auditDb.stats))
|
case "channelstats" => completeRpcFuture(req.id, Future(nodeParams.db.audit.stats))
|
||||||
|
|
||||||
|
|
||||||
// method name was not found
|
// method name was not found
|
||||||
|
|
|
@ -645,7 +645,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
|
||||||
trimmedHtlcs collect {
|
trimmedHtlcs collect {
|
||||||
case DirectedHtlc(_, u) =>
|
case DirectedHtlc(_, u) =>
|
||||||
log.info(s"adding paymentHash=${u.paymentHash} cltvExpiry=${u.cltvExpiry} to htlcs db for commitNumber=$nextCommitNumber")
|
log.info(s"adding paymentHash=${u.paymentHash} cltvExpiry=${u.cltvExpiry} to htlcs db for commitNumber=$nextCommitNumber")
|
||||||
nodeParams.channelsDb.addOrUpdateHtlcInfo(d.channelId, nextCommitNumber, u.paymentHash, u.cltvExpiry)
|
nodeParams.db.channels.addOrUpdateHtlcInfo(d.channelId, nextCommitNumber, u.paymentHash, u.cltvExpiry)
|
||||||
}
|
}
|
||||||
if (!Helpers.aboveReserve(d.commitments) && Helpers.aboveReserve(commitments1)) {
|
if (!Helpers.aboveReserve(d.commitments) && Helpers.aboveReserve(commitments1)) {
|
||||||
// we just went above reserve (can't go below), let's refresh our channel_update to enable/disable it accordingly
|
// we just went above reserve (can't go below), let's refresh our channel_update to enable/disable it accordingly
|
||||||
|
@ -1300,7 +1300,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
|
||||||
stateData match {
|
stateData match {
|
||||||
case d: HasCommitments =>
|
case d: HasCommitments =>
|
||||||
log.info(s"deleting database record for channelId=${d.channelId}")
|
log.info(s"deleting database record for channelId=${d.channelId}")
|
||||||
nodeParams.channelsDb.removeChannel(d.channelId)
|
nodeParams.db.channels.removeChannel(d.channelId)
|
||||||
case _ => {}
|
case _ => {}
|
||||||
}
|
}
|
||||||
log.info("shutting down")
|
log.info("shutting down")
|
||||||
|
@ -1895,7 +1895,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
|
||||||
def handleRemoteSpentOther(tx: Transaction, d: HasCommitments) = {
|
def handleRemoteSpentOther(tx: Transaction, d: HasCommitments) = {
|
||||||
log.warning(s"funding tx spent in txid=${tx.txid}")
|
log.warning(s"funding tx spent in txid=${tx.txid}")
|
||||||
|
|
||||||
Helpers.Closing.claimRevokedRemoteCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.channelsDb) match {
|
Helpers.Closing.claimRevokedRemoteCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.db.channels) match {
|
||||||
case Some(revokedCommitPublished) =>
|
case Some(revokedCommitPublished) =>
|
||||||
log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx")
|
log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx")
|
||||||
val exc = FundingTxSpent(d.channelId, tx)
|
val exc = FundingTxSpent(d.channelId, tx)
|
||||||
|
@ -2053,7 +2053,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
|
||||||
|
|
||||||
def store[T](d: T)(implicit tp: T <:< HasCommitments): T = {
|
def store[T](d: T)(implicit tp: T <:< HasCommitments): T = {
|
||||||
log.debug(s"updating database record for channelId={}", d.channelId)
|
log.debug(s"updating database record for channelId={}", d.channelId)
|
||||||
nodeParams.channelsDb.addOrUpdateChannel(d)
|
nodeParams.db.channels.addOrUpdateChannel(d)
|
||||||
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
|
context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d))
|
||||||
d
|
d
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ trait ChannelsDb {
|
||||||
|
|
||||||
def removeChannel(channelId: ByteVector32)
|
def removeChannel(channelId: ByteVector32)
|
||||||
|
|
||||||
def listChannels(): Seq[HasCommitments]
|
def listLocalChannels(): Seq[HasCommitments]
|
||||||
|
|
||||||
def addOrUpdateHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: Long)
|
def addOrUpdateHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: Long)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
package fr.acinq.eclair.db
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
|
import java.sql.{Connection, DriverManager}
|
||||||
|
|
||||||
|
import fr.acinq.eclair.db.sqlite._
|
||||||
|
|
||||||
|
trait Databases {
|
||||||
|
|
||||||
|
val network: NetworkDb
|
||||||
|
|
||||||
|
val audit: AuditDb
|
||||||
|
|
||||||
|
val channels: ChannelsDb
|
||||||
|
|
||||||
|
val peers: PeersDb
|
||||||
|
|
||||||
|
val payments: PaymentsDb
|
||||||
|
|
||||||
|
val pendingRelay: PendingRelayDb
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
object Databases {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a parent folder it creates or loads all the databases from a JDBC connection
|
||||||
|
* @param dbdir
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
def sqliteJDBC(dbdir: File): Databases = {
|
||||||
|
dbdir.mkdir()
|
||||||
|
val sqliteEclair = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "eclair.sqlite")}")
|
||||||
|
val sqliteNetwork = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "network.sqlite")}")
|
||||||
|
val sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
|
||||||
|
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
|
||||||
|
|
||||||
|
databaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
|
||||||
|
}
|
||||||
|
|
||||||
|
def databaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection) = new Databases {
|
||||||
|
override val network = new SqliteNetworkDb(networkJdbc)
|
||||||
|
override val audit = new SqliteAuditDb(auditJdbc)
|
||||||
|
override val channels = new SqliteChannelsDb(eclairJdbc)
|
||||||
|
override val peers = new SqlitePeersDb(eclairJdbc)
|
||||||
|
override val payments = new SqlitePaymentsDb(eclairJdbc)
|
||||||
|
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -73,7 +73,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def listChannels(): Seq[HasCommitments] = {
|
override def listLocalChannels(): Seq[HasCommitments] = {
|
||||||
using(sqlite.createStatement) { statement =>
|
using(sqlite.createStatement) { statement =>
|
||||||
val rs = statement.executeQuery("SELECT data FROM local_channels")
|
val rs = statement.executeQuery("SELECT data FROM local_channels")
|
||||||
codecSequence(rs, stateDataCodec)
|
codecSequence(rs, stateDataCodec)
|
||||||
|
|
|
@ -95,7 +95,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
||||||
val address_opt = if (outgoing) {
|
val address_opt = if (outgoing) {
|
||||||
// we store the node address upon successful outgoing connection, so we can reconnect later
|
// we store the node address upon successful outgoing connection, so we can reconnect later
|
||||||
// any previous address is overwritten
|
// any previous address is overwritten
|
||||||
NodeAddress.fromParts(address.getHostString, address.getPort).map(nodeAddress => nodeParams.peersDb.addOrUpdatePeer(remoteNodeId, nodeAddress))
|
NodeAddress.fromParts(address.getHostString, address.getPort).map(nodeAddress => nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeAddress))
|
||||||
Some(address)
|
Some(address)
|
||||||
} else None
|
} else None
|
||||||
|
|
||||||
|
@ -489,7 +489,7 @@ class Peer(nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: Actor
|
||||||
|
|
||||||
def stopPeer() = {
|
def stopPeer() = {
|
||||||
log.info("removing peer from db")
|
log.info("removing peer from db")
|
||||||
nodeParams.peersDb.removePeer(remoteNodeId)
|
nodeParams.db.peers.removePeer(remoteNodeId)
|
||||||
stop(FSM.Normal)
|
stop(FSM.Normal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,8 +44,8 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
|
||||||
|
|
||||||
// we load peers and channels from database
|
// we load peers and channels from database
|
||||||
{
|
{
|
||||||
val channels = nodeParams.channelsDb.listChannels()
|
val channels = nodeParams.db.channels.listLocalChannels()
|
||||||
val peers = nodeParams.peersDb.listPeers()
|
val peers = nodeParams.db.peers.listPeers()
|
||||||
|
|
||||||
checkBrokenHtlcsLink(channels, nodeParams.privateKey) match {
|
checkBrokenHtlcsLink(channels, nodeParams.privateKey) match {
|
||||||
case Nil => ()
|
case Nil => ()
|
||||||
|
|
|
@ -27,7 +27,7 @@ import scala.concurrent.duration._
|
||||||
|
|
||||||
class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
|
class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
|
||||||
|
|
||||||
val db = nodeParams.auditDb
|
val db = nodeParams.db.audit
|
||||||
|
|
||||||
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
|
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
|
||||||
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
|
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
|
||||||
|
|
|
@ -24,7 +24,7 @@ import fr.acinq.eclair.channel._
|
||||||
class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor with ActorLogging {
|
class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor with ActorLogging {
|
||||||
|
|
||||||
import CommandBuffer._
|
import CommandBuffer._
|
||||||
import nodeParams.pendingRelayDb
|
import nodeParams.db._
|
||||||
|
|
||||||
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
|
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
|
||||||
|
|
||||||
|
@ -34,17 +34,17 @@ class CommandBuffer(nodeParams: NodeParams, register: ActorRef) extends Actor wi
|
||||||
// save command in db
|
// save command in db
|
||||||
register forward Register.Forward(channelId, cmd)
|
register forward Register.Forward(channelId, cmd)
|
||||||
// we also store the preimage in a db (note that this happens *after* forwarding the fulfill to the channel, so we don't add latency)
|
// we also store the preimage in a db (note that this happens *after* forwarding the fulfill to the channel, so we don't add latency)
|
||||||
pendingRelayDb.addPendingRelay(channelId, htlcId, cmd)
|
pendingRelay.addPendingRelay(channelId, htlcId, cmd)
|
||||||
|
|
||||||
case CommandAck(channelId, htlcId) =>
|
case CommandAck(channelId, htlcId) =>
|
||||||
//delete from db
|
//delete from db
|
||||||
log.debug(s"fulfill/fail acked for channelId=$channelId htlcId=$htlcId")
|
log.debug(s"fulfill/fail acked for channelId=$channelId htlcId=$htlcId")
|
||||||
pendingRelayDb.removePendingRelay(channelId, htlcId)
|
pendingRelay.removePendingRelay(channelId, htlcId)
|
||||||
|
|
||||||
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, d: HasCommitments) =>
|
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, d: HasCommitments) =>
|
||||||
import d.channelId
|
import d.channelId
|
||||||
// if channel is in a state where it can have pending htlcs, we send them the fulfills we know of
|
// if channel is in a state where it can have pending htlcs, we send them the fulfills we know of
|
||||||
pendingRelayDb.listPendingRelay(channelId) match {
|
pendingRelay.listPendingRelay(channelId) match {
|
||||||
case Nil => ()
|
case Nil => ()
|
||||||
case msgs =>
|
case msgs =>
|
||||||
log.info(s"re-sending ${msgs.size} unacked fulfills/fails to channel $channelId")
|
log.info(s"re-sending ${msgs.size} unacked fulfills/fails to channel $channelId")
|
||||||
|
|
|
@ -65,7 +65,7 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
|
||||||
} recover { case t => sender ! Status.Failure(t) }
|
} recover { case t => sender ! Status.Failure(t) }
|
||||||
|
|
||||||
case CheckPayment(paymentHash) =>
|
case CheckPayment(paymentHash) =>
|
||||||
nodeParams.paymentsDb.findByPaymentHash(paymentHash) match {
|
nodeParams.db.payments.findByPaymentHash(paymentHash) match {
|
||||||
case Some(_) => sender ! true
|
case Some(_) => sender ! true
|
||||||
case _ => sender ! false
|
case _ => sender ! false
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
|
||||||
case _ =>
|
case _ =>
|
||||||
log.info(s"received payment for paymentHash=${htlc.paymentHash} amountMsat=${htlc.amountMsat}")
|
log.info(s"received payment for paymentHash=${htlc.paymentHash} amountMsat=${htlc.amountMsat}")
|
||||||
// amount is correct or was not specified in the payment request
|
// amount is correct or was not specified in the payment request
|
||||||
nodeParams.paymentsDb.addPayment(Payment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime / 1000))
|
nodeParams.db.payments.addPayment(Payment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime / 1000))
|
||||||
sender ! CMD_FULFILL_HTLC(htlc.id, paymentPreimage, commit = true)
|
sender ! CMD_FULFILL_HTLC(htlc.id, paymentPreimage, commit = true)
|
||||||
context.system.eventStream.publish(PaymentReceived(MilliSatoshi(htlc.amountMsat), htlc.paymentHash, htlc.channelId))
|
context.system.eventStream.publish(PaymentReceived(MilliSatoshi(htlc.amountMsat), htlc.paymentHash, htlc.channelId))
|
||||||
context.become(run(hash2preimage - htlc.paymentHash))
|
context.become(run(hash2preimage - htlc.paymentHash))
|
||||||
|
|
|
@ -136,7 +136,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
val db = nodeParams.networkDb
|
val db = nodeParams.db.network
|
||||||
|
|
||||||
{
|
{
|
||||||
log.info("loading network announcements from db...")
|
log.info("loading network announcements from db...")
|
||||||
|
|
|
@ -16,15 +16,10 @@
|
||||||
|
|
||||||
package fr.acinq.eclair
|
package fr.acinq.eclair
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.nio.file._
|
|
||||||
import java.nio.file.attribute.BasicFileAttributes
|
|
||||||
|
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import fr.acinq.bitcoin.Block
|
import fr.acinq.bitcoin.Block
|
||||||
import fr.acinq.eclair.crypto.LocalKeyManager
|
import fr.acinq.eclair.crypto.LocalKeyManager
|
||||||
import org.scalatest.FunSuite
|
import org.scalatest.FunSuite
|
||||||
|
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
class StartupSpec extends FunSuite {
|
class StartupSpec extends FunSuite {
|
||||||
|
@ -38,31 +33,18 @@ class StartupSpec extends FunSuite {
|
||||||
assert(baseUkraineAlias.getBytes.length === 27)
|
assert(baseUkraineAlias.getBytes.length === 27)
|
||||||
|
|
||||||
// we add 2 UTF-8 chars, each is 3-bytes long -> total new length 33 bytes!
|
// we add 2 UTF-8 chars, each is 3-bytes long -> total new length 33 bytes!
|
||||||
val goUkraineGo = threeBytesUTFChar+"BitcoinLightningNodeUkraine"+threeBytesUTFChar
|
val goUkraineGo = threeBytesUTFChar + "BitcoinLightningNodeUkraine" + threeBytesUTFChar
|
||||||
|
|
||||||
assert(goUkraineGo.length === 29)
|
assert(goUkraineGo.length === 29)
|
||||||
assert(goUkraineGo.getBytes.length === 33) // too long for the alias, should be truncated
|
assert(goUkraineGo.getBytes.length === 33) // too long for the alias, should be truncated
|
||||||
|
|
||||||
val illegalAliasConf = ConfigFactory.parseString(s"node-alias = $goUkraineGo")
|
val illegalAliasConf = ConfigFactory.parseString(s"node-alias = $goUkraineGo")
|
||||||
val conf = illegalAliasConf.withFallback(ConfigFactory.parseResources("reference.conf").getConfig("eclair"))
|
val conf = illegalAliasConf.withFallback(ConfigFactory.parseResources("reference.conf").getConfig("eclair"))
|
||||||
val tempConfParentDir = new File("temp-test.conf")
|
|
||||||
|
|
||||||
val keyManager = new LocalKeyManager(seed = randomKey.toBin, chainHash = Block.TestnetGenesisBlock.hash)
|
val keyManager = new LocalKeyManager(seed = randomKey.toBin, chainHash = Block.TestnetGenesisBlock.hash)
|
||||||
|
|
||||||
// try to create a NodeParams instance with a conf that contains an illegal alias
|
// try to create a NodeParams instance with a conf that contains an illegal alias
|
||||||
val nodeParamsAttempt = Try(NodeParams.makeNodeParams(tempConfParentDir, conf, keyManager, None))
|
val nodeParamsAttempt = Try(NodeParams.makeNodeParams(conf, keyManager, None, TestConstants.inMemoryDb()))
|
||||||
assert(nodeParamsAttempt.isFailure && nodeParamsAttempt.failed.get.getMessage.contains("alias, too long"))
|
assert(nodeParamsAttempt.isFailure && nodeParamsAttempt.failed.get.getMessage.contains("alias, too long"))
|
||||||
|
|
||||||
// destroy conf files after the test
|
|
||||||
Files.walkFileTree(tempConfParentDir.toPath, new SimpleFileVisitor[Path]() {
|
|
||||||
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
|
|
||||||
Files.deleteIfExists(file)
|
|
||||||
FileVisitResult.CONTINUE
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
tempConfParentDir.listFiles.foreach(_.delete())
|
|
||||||
tempConfParentDir.deleteOnExit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,18 +16,18 @@
|
||||||
|
|
||||||
package fr.acinq.eclair
|
package fr.acinq.eclair
|
||||||
|
|
||||||
import java.sql.DriverManager
|
import java.sql.{Connection, DriverManager}
|
||||||
|
|
||||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||||
import fr.acinq.bitcoin.{Block, ByteVector32, Script}
|
import fr.acinq.bitcoin.{Block, ByteVector32, Script}
|
||||||
import fr.acinq.eclair.NodeParams.BITCOIND
|
import fr.acinq.eclair.NodeParams.BITCOIND
|
||||||
import fr.acinq.eclair.crypto.LocalKeyManager
|
import fr.acinq.eclair.crypto.LocalKeyManager
|
||||||
|
import fr.acinq.eclair.db._
|
||||||
import fr.acinq.eclair.db.sqlite._
|
import fr.acinq.eclair.db.sqlite._
|
||||||
import fr.acinq.eclair.io.Peer
|
import fr.acinq.eclair.io.Peer
|
||||||
import fr.acinq.eclair.router.RouterConf
|
import fr.acinq.eclair.router.RouterConf
|
||||||
import fr.acinq.eclair.wire.{Color, NodeAddress}
|
import fr.acinq.eclair.wire.{Color, NodeAddress}
|
||||||
import scodec.bits.ByteVector
|
import scodec.bits.ByteVector
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,12 +38,15 @@ object TestConstants {
|
||||||
val pushMsat = 200000000L
|
val pushMsat = 200000000L
|
||||||
val feeratePerKw = 10000L
|
val feeratePerKw = 10000L
|
||||||
|
|
||||||
|
def sqliteInMemory() = DriverManager.getConnection("jdbc:sqlite::memory:")
|
||||||
|
|
||||||
|
def inMemoryDb(connection: Connection = sqliteInMemory()): Databases = Databases.databaseByConnections(connection, connection, connection)
|
||||||
|
|
||||||
|
|
||||||
object Alice {
|
object Alice {
|
||||||
val seed = ByteVector32(ByteVector.fill(32)(1))
|
val seed = ByteVector32(ByteVector.fill(32)(1))
|
||||||
val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash)
|
val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash)
|
||||||
|
|
||||||
def sqlite = DriverManager.getConnection("jdbc:sqlite::memory:")
|
|
||||||
|
|
||||||
// This is a function, and not a val! When called will return a new NodeParams
|
// This is a function, and not a val! When called will return a new NodeParams
|
||||||
def nodeParams = NodeParams(
|
def nodeParams = NodeParams(
|
||||||
keyManager = keyManager,
|
keyManager = keyManager,
|
||||||
|
@ -66,12 +69,7 @@ object TestConstants {
|
||||||
feeProportionalMillionth = 10,
|
feeProportionalMillionth = 10,
|
||||||
reserveToFundingRatio = 0.01, // note: not used (overridden below)
|
reserveToFundingRatio = 0.01, // note: not used (overridden below)
|
||||||
maxReserveToFundingRatio = 0.05,
|
maxReserveToFundingRatio = 0.05,
|
||||||
channelsDb = new SqliteChannelsDb(sqlite),
|
db = inMemoryDb(sqliteInMemory),
|
||||||
peersDb = new SqlitePeersDb(sqlite),
|
|
||||||
networkDb = new SqliteNetworkDb(sqlite),
|
|
||||||
pendingRelayDb = new SqlitePendingRelayDb(sqlite),
|
|
||||||
paymentsDb = new SqlitePaymentsDb(sqlite),
|
|
||||||
auditDb = new SqliteAuditDb(sqlite),
|
|
||||||
revocationTimeout = 20 seconds,
|
revocationTimeout = 20 seconds,
|
||||||
pingInterval = 30 seconds,
|
pingInterval = 30 seconds,
|
||||||
pingTimeout = 10 seconds,
|
pingTimeout = 10 seconds,
|
||||||
|
@ -114,8 +112,6 @@ object TestConstants {
|
||||||
val seed = ByteVector32(ByteVector.fill(32)(2))
|
val seed = ByteVector32(ByteVector.fill(32)(2))
|
||||||
val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash)
|
val keyManager = new LocalKeyManager(seed, Block.RegtestGenesisBlock.hash)
|
||||||
|
|
||||||
def sqlite = DriverManager.getConnection("jdbc:sqlite::memory:")
|
|
||||||
|
|
||||||
def nodeParams = NodeParams(
|
def nodeParams = NodeParams(
|
||||||
keyManager = keyManager,
|
keyManager = keyManager,
|
||||||
alias = "bob",
|
alias = "bob",
|
||||||
|
@ -137,12 +133,7 @@ object TestConstants {
|
||||||
feeProportionalMillionth = 10,
|
feeProportionalMillionth = 10,
|
||||||
reserveToFundingRatio = 0.01, // note: not used (overridden below)
|
reserveToFundingRatio = 0.01, // note: not used (overridden below)
|
||||||
maxReserveToFundingRatio = 0.05,
|
maxReserveToFundingRatio = 0.05,
|
||||||
channelsDb = new SqliteChannelsDb(sqlite),
|
db = inMemoryDb(sqliteInMemory),
|
||||||
peersDb = new SqlitePeersDb(sqlite),
|
|
||||||
networkDb = new SqliteNetworkDb(sqlite),
|
|
||||||
pendingRelayDb = new SqlitePendingRelayDb(sqlite),
|
|
||||||
paymentsDb = new SqlitePaymentsDb(sqlite),
|
|
||||||
auditDb = new SqliteAuditDb(sqlite),
|
|
||||||
revocationTimeout = 20 seconds,
|
revocationTimeout = 20 seconds,
|
||||||
pingInterval = 30 seconds,
|
pingInterval = 30 seconds,
|
||||||
pingTimeout = 10 seconds,
|
pingTimeout = 10 seconds,
|
||||||
|
|
|
@ -484,11 +484,11 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
|
||||||
// depending on who starts signing first, there will be one or two commitments because both sides have changes
|
// depending on who starts signing first, there will be one or two commitments because both sides have changes
|
||||||
assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.index === 1)
|
assert(alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.index === 1)
|
||||||
assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.index === 2)
|
assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.index === 2)
|
||||||
assert(alice.underlyingActor.nodeParams.channelsDb.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 0).size == 0)
|
assert(alice.underlyingActor.nodeParams.db.channels.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 0).size == 0)
|
||||||
assert(alice.underlyingActor.nodeParams.channelsDb.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 1).size == 2)
|
assert(alice.underlyingActor.nodeParams.db.channels.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 1).size == 2)
|
||||||
assert(alice.underlyingActor.nodeParams.channelsDb.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 2).size == 4)
|
assert(alice.underlyingActor.nodeParams.db.channels.listHtlcInfos(alice.stateData.asInstanceOf[DATA_NORMAL].channelId, 2).size == 4)
|
||||||
assert(bob.underlyingActor.nodeParams.channelsDb.listHtlcInfos(bob.stateData.asInstanceOf[DATA_NORMAL].channelId, 0).size == 0)
|
assert(bob.underlyingActor.nodeParams.db.channels.listHtlcInfos(bob.stateData.asInstanceOf[DATA_NORMAL].channelId, 0).size == 0)
|
||||||
assert(bob.underlyingActor.nodeParams.channelsDb.listHtlcInfos(bob.stateData.asInstanceOf[DATA_NORMAL].channelId, 1).size == 3)
|
assert(bob.underlyingActor.nodeParams.db.channels.listHtlcInfos(bob.stateData.asInstanceOf[DATA_NORMAL].channelId, 1).size == 3)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("recv CMD_SIGN (htlcs with same pubkeyScript but different amounts)") { f =>
|
test("recv CMD_SIGN (htlcs with same pubkeyScript but different amounts)") { f =>
|
||||||
|
|
|
@ -50,10 +50,10 @@ class SqliteChannelsDbSpec extends FunSuite {
|
||||||
|
|
||||||
intercept[SQLiteException](db.addOrUpdateHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel
|
intercept[SQLiteException](db.addOrUpdateHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel
|
||||||
|
|
||||||
assert(db.listChannels().toSet === Set.empty)
|
assert(db.listLocalChannels().toSet === Set.empty)
|
||||||
db.addOrUpdateChannel(channel)
|
db.addOrUpdateChannel(channel)
|
||||||
db.addOrUpdateChannel(channel)
|
db.addOrUpdateChannel(channel)
|
||||||
assert(db.listChannels() === List(channel))
|
assert(db.listLocalChannels() === List(channel))
|
||||||
|
|
||||||
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
|
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
|
||||||
db.addOrUpdateHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)
|
db.addOrUpdateHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)
|
||||||
|
@ -62,7 +62,7 @@ class SqliteChannelsDbSpec extends FunSuite {
|
||||||
assert(db.listHtlcInfos(channel.channelId, 43).toList == Nil)
|
assert(db.listHtlcInfos(channel.channelId, 43).toList == Nil)
|
||||||
|
|
||||||
db.removeChannel(channel.channelId)
|
db.removeChannel(channel.channelId)
|
||||||
assert(db.listChannels() === Nil)
|
assert(db.listLocalChannels() === Nil)
|
||||||
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
|
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
1
pom.xml
1
pom.xml
|
@ -217,6 +217,7 @@
|
||||||
<buildDirectory>${project.build.directory}</buildDirectory>
|
<buildDirectory>${project.build.directory}</buildDirectory>
|
||||||
</systemProperties>
|
</systemProperties>
|
||||||
<argLine>-Xmx1024m</argLine>
|
<argLine>-Xmx1024m</argLine>
|
||||||
|
<argLine>-Dfile.encoding=UTF-8</argLine>
|
||||||
</configuration>
|
</configuration>
|
||||||
<executions>
|
<executions>
|
||||||
<execution>
|
<execution>
|
||||||
|
|
Loading…
Add table
Reference in a new issue