Peer storage (#2888)
Implements https://github.com/lightning/bolts/pull/1110 to allow storing a small amount of data for our peers.

Co-authored-by: t-bast <bastien@acinq.fr>
parent 189e282993 · commit e28f23fbc8
19 changed files with 456 additions and 39 deletions
docs/release-notes/eclair-vnext.md

@@ -6,6 +6,13 @@
 <insert changes>
 
+### Peer storage
+
+With this release, eclair supports the `option_provide_storage` feature introduced in <https://github.com/lightning/bolts/pull/1110>.
+When `option_provide_storage` is enabled, eclair will store a small encrypted backup for peers that request it.
+This backup is limited to 65kB and node operators should customize the `eclair.peer-storage` configuration section to match their desired SLAs.
+This is mostly intended for LSPs that serve mobile wallets to allow users to restore their channels when they switch phones.
+
 ### API changes
 
 <insert changes>
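For node operators, the feature is disabled by default. A minimal `eclair.conf` sketch for an LSP opting in (values are illustrative, mirroring the defaults shown in the reference.conf diff below, not tuned recommendations):

```hocon
eclair {
  features {
    // advertise that we store encrypted backups for peers that request it
    option_provide_storage = optional
  }
  peer-storage {
    write-delay = 1 minute    // debounce disk writes during bursts of updates
    removal-delay = 30 days   // keep backups after the last channel closes
    cleanup-frequency = 1 day // how often stale backups are purged
  }
}
```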
eclair-core/src/main/resources/reference.conf

@@ -73,6 +73,9 @@ eclair {
     option_dual_fund = optional
     option_quiesce = optional
     option_onion_messages = optional
+    // This feature should only be enabled when acting as an LSP for mobile wallets.
+    // When activating this feature, the peer-storage section should be customized to match desired SLAs.
+    option_provide_storage = disabled
     option_channel_type = optional
     option_scid_alias = optional
     option_payment_metadata = optional

@@ -596,6 +599,19 @@ eclair {
     enabled = true // enable automatic purges of expired invoices from the database
     interval = 24 hours // interval between expired invoice purges
   }
+
+  peer-storage {
+    // Peer storage is persisted only after this delay to reduce the number of writes when updating it multiple times in a row.
+    // A small delay may result in a lot of IO write operations, which can have a negative performance impact on the node.
+    // But using a large delay increases the risk of not storing the latest peer data if you restart your node while writes are pending.
+    write-delay = 1 minute
+    // Peer storage is kept this long after the last channel with that peer has been closed.
+    // A long delay here guarantees that peers who are offline while their channels are closed will be able to get their funds
+    // back if they restore from seed on a different device after the channels have been closed.
+    removal-delay = 30 days
+    // Frequency at which we clean our DB to remove peer storage from nodes with whom we don't have channels anymore.
+    cleanup-frequency = 1 day
+  }
 }
 
 akka {
eclair-core/src/main/scala/fr/acinq/eclair/Features.scala

@@ -275,6 +275,11 @@ object Features {
     val mandatory = 38
   }
 
+  case object ProvideStorage extends Feature with InitFeature with NodeFeature {
+    val rfcName = "option_provide_storage"
+    val mandatory = 42
+  }
+
   case object ChannelType extends Feature with InitFeature with NodeFeature {
     val rfcName = "option_channel_type"
     val mandatory = 44

@@ -358,6 +363,7 @@ object Features {
     DualFunding,
     Quiescence,
     OnionMessages,
+    ProvideStorage,
     ChannelType,
     ScidAlias,
     PaymentMetadata,
eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

@@ -92,7 +92,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
                       revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config,
                       willFundRates_opt: Option[LiquidityAds.WillFundRates],
                       peerWakeUpConfig: PeerReadyNotifier.WakeUpConfig,
-                      onTheFlyFundingConfig: OnTheFlyFunding.Config) {
+                      onTheFlyFundingConfig: OnTheFlyFunding.Config,
+                      peerStorageConfig: PeerStorageConfig) {
   val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey
 
   val nodeId: PublicKey = nodeKeyManager.nodeId

@@ -156,6 +157,13 @@ case class PaymentFinalExpiryConf(min: CltvExpiryDelta, max: CltvExpiryDelta) {
   }
 }
 
+/**
+ * @param writeDelay       delay before writing the peer's data to disk, which avoids doing multiple writes during bursts of storage updates.
+ * @param removalDelay     we keep our peer's data in our DB even after closing all of our channels with them, up to this duration.
+ * @param cleanUpFrequency frequency at which we go through the DB to remove unused storage.
+ */
+case class PeerStorageConfig(writeDelay: FiniteDuration, removalDelay: FiniteDuration, cleanUpFrequency: FiniteDuration)
+
 object NodeParams extends Logging {
 
   /**

@@ -680,6 +688,11 @@ object NodeParams extends Logging {
       onTheFlyFundingConfig = OnTheFlyFunding.Config(
         proposalTimeout = FiniteDuration(config.getDuration("on-the-fly-funding.proposal-timeout").getSeconds, TimeUnit.SECONDS),
       ),
+      peerStorageConfig = PeerStorageConfig(
+        writeDelay = FiniteDuration(config.getDuration("peer-storage.write-delay").getSeconds, TimeUnit.SECONDS),
+        removalDelay = FiniteDuration(config.getDuration("peer-storage.removal-delay").getSeconds, TimeUnit.SECONDS),
+        cleanUpFrequency = FiniteDuration(config.getDuration("peer-storage.cleanup-frequency").getSeconds, TimeUnit.SECONDS),
+      )
     )
   }
 }
eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

@@ -37,7 +37,7 @@ import fr.acinq.eclair.crypto.WeakEntropyPool
 import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager, LocalOnChainKeyManager}
 import fr.acinq.eclair.db.Databases.FileBackup
 import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
-import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
+import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner}
 import fr.acinq.eclair.io._
 import fr.acinq.eclair.message.Postman
 import fr.acinq.eclair.payment.offer.OfferManager

@@ -356,6 +356,9 @@ class Setup(val datadir: File,
           logger.warn("database backup is disabled")
           system.deadLetters
       }
+      _ = if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
+        system.spawn(Behaviors.supervise(PeerStorageCleaner(nodeParams.db.peers, nodeParams.peerStorageConfig)).onFailure(typed.SupervisorStrategy.restart), name = "peer-storage-cleaner")
+      }
      dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
      register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
      offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

@@ -13,8 +13,9 @@ import fr.acinq.eclair.payment.relay.OnTheFlyFunding
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
 import fr.acinq.eclair.router.Router
 import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
-import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
+import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
 import grizzled.slf4j.Logging
+import scodec.bits.ByteVector
 
 import java.io.File
 import java.util.UUID

@@ -292,6 +293,21 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
     runAsync(secondary.getRelayFees(nodeId))
     primary.getRelayFees(nodeId)
   }
+
+  override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = {
+    runAsync(secondary.updateStorage(nodeId, data))
+    primary.updateStorage(nodeId, data)
+  }
+
+  override def getStorage(nodeId: PublicKey): Option[ByteVector] = {
+    runAsync(secondary.getStorage(nodeId))
+    primary.getStorage(nodeId)
+  }
+
+  override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = {
+    runAsync(secondary.removePeerStorage(peerRemovedBefore))
+    primary.removePeerStorage(peerRemovedBefore)
+  }
 }
 
 case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends PaymentsDb {
eclair-core/src/main/scala/fr/acinq/eclair/db/PeerStorageCleaner.scala (new file)

@@ -0,0 +1,44 @@
+/*
+ * Copyright 2024 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair.db
+
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import fr.acinq.eclair.{PeerStorageConfig, TimestampSecond}
+
+/**
+ * This actor frequently deletes from our DB peer storage from nodes with whom we don't have channels anymore, after a
+ * grace period.
+ */
+object PeerStorageCleaner {
+  // @formatter:off
+  sealed trait Command
+  private case object CleanPeerStorage extends Command
+  // @formatter:on
+
+  def apply(db: PeersDb, config: PeerStorageConfig): Behavior[Command] = {
+    Behaviors.withTimers { timers =>
+      timers.startTimerWithFixedDelay(CleanPeerStorage, config.cleanUpFrequency)
+      Behaviors.receiveMessage {
+        case CleanPeerStorage =>
+          db.removePeerStorage(TimestampSecond.now() - config.removalDelay)
+          Behaviors.same
+      }
+    }
+  }
+
+}
eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala

@@ -17,8 +17,10 @@
 package fr.acinq.eclair.db
 
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
+import fr.acinq.eclair.TimestampSecond
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
 import fr.acinq.eclair.wire.protocol.NodeAddress
+import scodec.bits.ByteVector
 
 trait PeersDb {
 

@@ -34,4 +36,13 @@ trait PeersDb {
 
   def getRelayFees(nodeId: PublicKey): Option[RelayFees]
 
+  /** Update our peer's blob data when [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */
+  def updateStorage(nodeId: PublicKey, data: ByteVector): Unit
+
+  /** Get the last blob of data we stored for that peer, if [[fr.acinq.eclair.Features.ProvideStorage]] is enabled. */
+  def getStorage(nodeId: PublicKey): Option[ByteVector]
+
+  /** Remove storage from peers that have had no active channel with us for a while. */
+  def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit
+
 }
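To make the contract above concrete, here is a hypothetical in-memory stand-in for the three new operations (not part of this commit; it ignores the locking, metrics and SQL details of the real implementations, and `markPeerRemoved` is an illustrative name for what `removePeer` does to the `removed_peer_at` column):

```scala
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.TimestampSecond
import scodec.bits.ByteVector

import scala.collection.concurrent.TrieMap

// Hypothetical test double for the peer-storage subset of PeersDb.
class InMemoryPeerStorage {
  private case class Entry(data: ByteVector, removedPeerAt: Option[TimestampSecond])
  private val entries = TrieMap.empty[PublicKey, Entry]

  // Upsert the blob and clear any pending removal (the SQL sets removed_peer_at = NULL).
  def updateStorage(nodeId: PublicKey, data: ByteVector): Unit =
    entries.put(nodeId, Entry(data, removedPeerAt = None))

  def getStorage(nodeId: PublicKey): Option[ByteVector] = entries.get(nodeId).map(_.data)

  // Called when a peer is removed: start the removal grace period.
  def markPeerRemoved(nodeId: PublicKey, at: TimestampSecond): Unit =
    entries.get(nodeId).foreach(e => entries.put(nodeId, e.copy(removedPeerAt = Some(at))))

  // Drop storage for peers whose grace period has fully elapsed.
  def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit =
    entries.filterInPlace { case (_, e) => !e.removedPeerAt.exists(_ < peerRemovedBefore) }
}
```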
eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala

@@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg
 
 import fr.acinq.bitcoin.scalacompat.Crypto
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
-import fr.acinq.eclair.MilliSatoshi
+import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
 import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
 import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
 import fr.acinq.eclair.db.PeersDb

@@ -26,14 +26,14 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
 import fr.acinq.eclair.wire.protocol._
 import grizzled.slf4j.Logging
-import scodec.bits.BitVector
+import scodec.bits.{BitVector, ByteVector}
 
 import java.sql.Statement
 import javax.sql.DataSource
 
 object PgPeersDb {
   val DB_NAME = "peers"
-  val CURRENT_VERSION = 3
+  val CURRENT_VERSION = 4
 }
 
 class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {

@@ -54,13 +54,21 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
       statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
     }
 
+    def migration34(statement: Statement): Unit = {
+      statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
+      statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
+    }
+
     using(pg.createStatement()) { statement =>
       getVersion(statement, DB_NAME) match {
         case None =>
           statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
           statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
           statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
+          statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
+          statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
-        case Some(v@(1 | 2)) =>
+        case Some(v@(1 | 2 | 3)) =>
           logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
           if (v < 2) {
             migration12(statement)

@@ -68,6 +76,9 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
           if (v < 3) {
             migration23(statement)
           }
+          if (v < 4) {
+            migration34(statement)
+          }
         case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
         case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
       }

@@ -98,6 +109,20 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
         statement.setString(1, nodeId.value.toHex)
         statement.executeUpdate()
       }
+      using(pg.prepareStatement("UPDATE local.peer_storage SET removed_peer_at = ? WHERE node_id = ?")) { statement =>
+        statement.setTimestamp(1, TimestampSecond.now().toSqlTimestamp)
+        statement.setString(2, nodeId.value.toHex)
+        statement.executeUpdate()
+      }
     }
   }
 
+  override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Postgres) {
+    withLock { pg =>
+      using(pg.prepareStatement("DELETE FROM local.peer_storage WHERE removed_peer_at < ?")) { statement =>
+        statement.setTimestamp(1, peerRemovedBefore.toSqlTimestamp)
+        statement.executeUpdate()
+      }
+    }
+  }
+

@@ -155,4 +180,32 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
     }
   }
 
+  override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Postgres) {
+    withLock { pg =>
+      using(pg.prepareStatement(
+        """
+        INSERT INTO local.peer_storage (node_id, data, last_updated_at, removed_peer_at)
+        VALUES (?, ?, ?, NULL)
+        ON CONFLICT (node_id)
+        DO UPDATE SET data = EXCLUDED.data, last_updated_at = EXCLUDED.last_updated_at, removed_peer_at = NULL
+        """)) { statement =>
+        statement.setString(1, nodeId.value.toHex)
+        statement.setBytes(2, data.toArray)
+        statement.setTimestamp(3, TimestampSecond.now().toSqlTimestamp)
+        statement.executeUpdate()
+      }
+    }
+  }
+
+  override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Postgres) {
+    withLock { pg =>
+      using(pg.prepareStatement("SELECT data FROM local.peer_storage WHERE node_id = ?")) { statement =>
+        statement.setString(1, nodeId.value.toHex)
+        statement.executeQuery()
+          .headOption
+          .map(rs => ByteVector(rs.getBytes("data")))
+      }
+    }
+  }
 }
eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePeersDb.scala

@@ -18,7 +18,7 @@ package fr.acinq.eclair.db.sqlite
 
 import fr.acinq.bitcoin.scalacompat.Crypto
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
-import fr.acinq.eclair.MilliSatoshi
+import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
 import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
 import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
 import fr.acinq.eclair.db.PeersDb

@@ -26,13 +26,13 @@ import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, setVersion, using}
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
 import fr.acinq.eclair.wire.protocol._
 import grizzled.slf4j.Logging
-import scodec.bits.BitVector
+import scodec.bits.{BitVector, ByteVector}
 
 import java.sql.{Connection, Statement}
 
 object SqlitePeersDb {
   val DB_NAME = "peers"
-  val CURRENT_VERSION = 2
+  val CURRENT_VERSION = 3
 }
 
 class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {

@@ -46,13 +46,26 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
       statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
     }
 
+    def migration23(statement: Statement): Unit = {
+      statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
+      statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
+    }
+
     getVersion(statement, DB_NAME) match {
       case None =>
         statement.executeUpdate("CREATE TABLE peers (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL)")
         statement.executeUpdate("CREATE TABLE relay_fees (node_id BLOB NOT NULL PRIMARY KEY, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL)")
+        statement.executeUpdate("CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")
+        statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON peer_storage(removed_peer_at)")
-      case Some(v@1) =>
+      case Some(v@(1 | 2)) =>
        logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
-        migration12(statement)
+        if (v < 2) {
+          migration12(statement)
+        }
+        if (v < 3) {
+          migration23(statement)
+        }
      case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
      case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
     }

@@ -79,6 +92,18 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
       statement.setBytes(1, nodeId.value.toArray)
       statement.executeUpdate()
     }
+    using(sqlite.prepareStatement("UPDATE peer_storage SET removed_peer_at = ? WHERE node_id = ?")) { statement =>
+      statement.setLong(1, TimestampSecond.now().toLong)
+      statement.setBytes(2, nodeId.value.toArray)
+      statement.executeUpdate()
+    }
   }
 
+  override def removePeerStorage(peerRemovedBefore: TimestampSecond): Unit = withMetrics("peers/remove-storage", DbBackends.Sqlite) {
+    using(sqlite.prepareStatement("DELETE FROM peer_storage WHERE removed_peer_at < ?")) { statement =>
+      statement.setLong(1, peerRemovedBefore.toLong)
+      statement.executeUpdate()
+    }
+  }
+
   override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Sqlite) {

@@ -128,4 +153,29 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
       )
     }
   }
 
+  override def updateStorage(nodeId: PublicKey, data: ByteVector): Unit = withMetrics("peers/update-storage", DbBackends.Sqlite) {
+    using(sqlite.prepareStatement("UPDATE peer_storage SET data = ?, last_updated_at = ?, removed_peer_at = NULL WHERE node_id = ?")) { update =>
+      update.setBytes(1, data.toArray)
+      update.setLong(2, TimestampSecond.now().toLong)
+      update.setBytes(3, nodeId.value.toArray)
+      if (update.executeUpdate() == 0) {
+        using(sqlite.prepareStatement("INSERT INTO peer_storage VALUES (?, ?, ?, NULL)")) { statement =>
+          statement.setBytes(1, nodeId.value.toArray)
+          statement.setBytes(2, data.toArray)
+          statement.setLong(3, TimestampSecond.now().toLong)
+          statement.executeUpdate()
+        }
+      }
+    }
+  }
+
+  override def getStorage(nodeId: PublicKey): Option[ByteVector] = withMetrics("peers/get-storage", DbBackends.Sqlite) {
+    using(sqlite.prepareStatement("SELECT data FROM peer_storage WHERE node_id = ?")) { statement =>
+      statement.setBytes(1, nodeId.value.toArray)
+      statement.executeQuery()
+        .headOption
+        .map(rs => ByteVector(rs.getBytes("data")))
+    }
+  }
 }
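Note the design difference between the two backends: Postgres uses a native `INSERT ... ON CONFLICT` upsert, while the SQLite version above does a two-step upsert (UPDATE first, INSERT only if no row was touched), which also works on SQLite versions that predate native upsert support, and is presumably safe here because access to the single SQLite connection is serialized. A self-contained sketch of that pattern against an in-memory database, assuming the `org.xerial:sqlite-jdbc` driver is on the classpath (names are illustrative):

```scala
import java.sql.DriverManager

object SqliteUpsertSketch extends App {
  val conn = DriverManager.getConnection("jdbc:sqlite::memory:")
  conn.createStatement().executeUpdate(
    "CREATE TABLE peer_storage (node_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, last_updated_at INTEGER NOT NULL, removed_peer_at INTEGER)")

  def upsert(nodeId: Array[Byte], data: Array[Byte], now: Long): Unit = {
    val update = conn.prepareStatement("UPDATE peer_storage SET data = ?, last_updated_at = ?, removed_peer_at = NULL WHERE node_id = ?")
    update.setBytes(1, data)
    update.setLong(2, now)
    update.setBytes(3, nodeId)
    if (update.executeUpdate() == 0) {
      // No existing row was updated: fall back to a plain INSERT.
      val insert = conn.prepareStatement("INSERT INTO peer_storage VALUES (?, ?, ?, NULL)")
      insert.setBytes(1, nodeId)
      insert.setBytes(2, data)
      insert.setLong(3, now)
      insert.executeUpdate()
    }
  }

  upsert(Array[Byte](1, 2), Array[Byte](42), now = 1000L) // takes the INSERT path
  upsert(Array[Byte](1, 2), Array[Byte](43), now = 2000L) // takes the UPDATE path
}
```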
eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

@@ -45,7 +45,8 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
 import fr.acinq.eclair.router.Router
 import fr.acinq.eclair.wire.protocol
 import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
-import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
+import fr.acinq.eclair.wire.protocol.{AddFeeCredit, ChannelTlv, CurrentFeeCredit, Error, FailureReason, HasChannelId, HasTemporaryChannelId, LightningMessage, LiquidityAds, NodeAddress, OnTheFlyFundingFailureMessage, OnionMessage, OnionRoutingPacket, PeerStorageRetrieval, PeerStorageStore, RecommendedFeerates, RoutingMessage, SpliceInit, TlvStream, TxAbort, UnknownMessage, Warning, WillAddHtlc, WillFailHtlc, WillFailMalformedHtlc}
+import scodec.bits.ByteVector
 
 /**
  * This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.

@@ -73,6 +74,7 @@ class Peer(val nodeParams: NodeParams,
 
   context.system.eventStream.subscribe(self, classOf[CurrentFeerates])
   context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
+  context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
 
   startWith(INSTANTIATING, Nothing)

@@ -85,7 +87,12 @@ class Peer(val nodeParams: NodeParams,
         FinalChannelId(state.channelId) -> channel
       }.toMap
       context.system.eventStream.publish(PeerCreated(self, remoteNodeId))
-      goto(DISCONNECTED) using DisconnectedData(channels) // when we restart, we will attempt to reconnect right away, but then we'll wait
+      val peerStorageData = if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
+        nodeParams.db.peers.getStorage(remoteNodeId)
+      } else {
+        None
+      }
+      goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
   }
 
 when(DISCONNECTED) {

@@ -94,7 +101,7 @@ class Peer(val nodeParams: NodeParams,
       stay()
 
     case Event(connectionReady: PeerConnection.ConnectionReady, d: DisconnectedData) =>
-      gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) })
+      gotoConnected(connectionReady, d.channels.map { case (k: ChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage)
 
     case Event(Terminated(actor), d: DisconnectedData) if d.channels.values.toSet.contains(actor) =>
       // we have at most 2 ids: a TemporaryChannelId and a FinalChannelId

@@ -105,7 +112,7 @@ class Peer(val nodeParams: NodeParams,
         log.info("that was the last open channel")
         context.system.eventStream.publish(LastChannelClosed(self, remoteNodeId))
         // We have no existing channels or pending signed transaction, we can forget about this peer.
-        stopPeer()
+        stopPeer(d.peerStorage)
       } else {
         stay() using d.copy(channels = channels1)
       }

@@ -116,7 +123,7 @@ class Peer(val nodeParams: NodeParams,
       }
       if (d.channels.isEmpty && canForgetPendingOnTheFlyFunding()) {
         // We have no existing channels or pending signed transaction, we can forget about this peer.
-        stopPeer()
+        stopPeer(d.peerStorage)
       } else {
         stay()
       }

@@ -134,6 +141,19 @@ class Peer(val nodeParams: NodeParams,
     case Event(_: SpawnChannelNonInitiator, _) => stay() // we got disconnected before creating the channel actor
 
     case Event(_: LightningMessage, _) => stay() // we probably just got disconnected and that's the last messages we received
+
+    case Event(WritePeerStorage, d: DisconnectedData) =>
+      d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
+      stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
+
+    case Event(e: ChannelReadyForPayments, d: DisconnectedData) =>
+      if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
+        startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
+      }
+      stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
+
+    case Event(e: LocalChannelDown, d: DisconnectedData) =>
+      stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
   }
 
 when(CONNECTED) {

@@ -408,7 +428,7 @@ class Peer(val nodeParams: NodeParams,
       }
       stay()
 
-    case Event(e: ChannelReadyForPayments, _: ConnectedData) =>
+    case Event(e: ChannelReadyForPayments, d: ConnectedData) =>
       pendingOnTheFlyFunding.foreach {
         case (paymentHash, pending) =>
           pending.status match {

@@ -424,7 +444,13 @@ class Peer(val nodeParams: NodeParams,
           }
         }
       }
-      stay()
+      if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
+        startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
+      }
+      stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
+
+    case Event(e: LocalChannelDown, d: ConnectedData) =>
+      stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
 
     case Event(msg: HasChannelId, d: ConnectedData) =>
       d.channels.get(FinalChannelId(msg.channelId)) match {

@@ -463,10 +489,10 @@ class Peer(val nodeParams: NodeParams,
       }
       if (d.channels.isEmpty && canForgetPendingOnTheFlyFunding()) {
         // We have no existing channels or pending signed transaction, we can forget about this peer.
-        stopPeer()
+        stopPeer(d.peerStorage)
       } else {
         d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
-        goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) })
+        goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage)
       }
 
     case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) =>

@@ -485,7 +511,7 @@ class Peer(val nodeParams: NodeParams,
       log.debug(s"got new connection, killing current one and switching")
       d.peerConnection ! PeerConnection.Kill(KillReason.ConnectionReplaced)
       d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
-      gotoConnected(connectionReady, d.channels)
+      gotoConnected(connectionReady, d.channels, d.activeChannels, d.peerStorage)
 
     case Event(msg: OnionMessage, _: ConnectedData) =>
       OnionMessages.process(nodeParams.privateKey, msg) match {

@@ -518,6 +544,27 @@ class Peer(val nodeParams: NodeParams,
       d.peerConnection forward unknownMsg
       stay()
 
+    case Event(store: PeerStorageStore, d: ConnectedData) =>
+      if (nodeParams.features.hasFeature(Features.ProvideStorage)) {
+        // If we don't have any pending write operations, we write the updated peer storage to disk after a delay.
+        // This ensures that when we receive a burst of peer storage updates, we will rate-limit our IO disk operations.
+        // If we already have a pending write operation, we must not reset the timer, otherwise we may indefinitely delay
+        // writing to the DB and may never store our peer's backup.
+        if (d.activeChannels.isEmpty) {
+          log.debug("received peer storage from peer with no active channel")
+        } else if (!isTimerActive(WritePeerStorageTimerKey)) {
+          startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
+        }
+        stay() using d.copy(peerStorage = PeerStorage(Some(store.blob), written = false))
+      } else {
+        log.debug("ignoring peer storage, feature disabled")
+        stay()
+      }
+
+    case Event(WritePeerStorage, d: ConnectedData) =>
+      d.peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
+      stay() using d.copy(peerStorage = d.peerStorage.copy(written = true))
+
     case Event(unhandledMsg: LightningMessage, _) =>
       log.warning("ignoring message {}", unhandledMsg)
       stay()
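The `PeerStorageStore` handler above is a debounce: the first store after a flush arms a single timer, and later stores within the window only replace the in-memory blob; re-arming the timer on every store could postpone the write forever under a steady stream of updates. A stripped-down sketch of the same pattern with Akka typed timers (hypothetical standalone actor, not part of this commit):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scodec.bits.ByteVector

import scala.concurrent.duration.FiniteDuration

object DebouncedWriter {
  sealed trait Command
  final case class Store(blob: ByteVector) extends Command
  private case object Flush extends Command

  def apply(writeDelay: FiniteDuration, persist: ByteVector => Unit): Behavior[Command] =
    Behaviors.withTimers { timers =>
      def idle(pending: Option[ByteVector]): Behavior[Command] = Behaviors.receiveMessage {
        case Store(blob) =>
          // Arm the timer only if it isn't already running, so a burst of
          // stores results in a single disk write after writeDelay.
          if (!timers.isTimerActive(Flush)) timers.startSingleTimer(Flush, Flush, writeDelay)
          idle(Some(blob))
        case Flush =>
          pending.foreach(persist) // write the latest blob only
          idle(None)
      }
      idle(None)
    }
}
```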
@@ -598,7 +645,7 @@ class Peer(val nodeParams: NodeParams,
       }
       pendingOnTheFlyFunding = pendingOnTheFlyFunding.removedAll(expired.keys)
       d match {
-        case d: DisconnectedData if d.channels.isEmpty && pendingOnTheFlyFunding.isEmpty => stopPeer()
+        case d: DisconnectedData if d.channels.isEmpty && pendingOnTheFlyFunding.isEmpty => stopPeer(d.peerStorage)
         case _ => stay()
       }

@@ -749,7 +796,7 @@ class Peer(val nodeParams: NodeParams,
     context.system.eventStream.publish(PeerDisconnected(self, remoteNodeId))
   }
 
-  private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
+  private def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage): State = {
     require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeId: $remoteNodeId != ${connectionReady.remoteNodeId}")
     log.debug("got authenticated connection to address {}", connectionReady.address)

@@ -759,6 +806,9 @@ class Peer(val nodeParams: NodeParams,
       nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
     }
 
+    // If we have some data stored from our peer, we send it to them before doing anything else.
+    peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
+
     // let's bring existing/requested channels online
     channels.values.toSet[ActorRef].foreach(_ ! INPUT_RECONNECTED(connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit)) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)

@@ -769,14 +819,14 @@ class Peer(val nodeParams: NodeParams,
     if (Features.canUseFeature(connectionReady.localInit.features, connectionReady.remoteInit.features, Features.FundingFeeCredit)) {
       if (feeCredit.isEmpty) {
         // We read the fee credit from the database on the first connection attempt.
         // We keep track of the latest credit afterwards and don't need to read it from the DB at every reconnection.
         feeCredit = Some(nodeParams.db.liquidity.getFeeCredit(remoteNodeId))
       }
       log.info("reconnecting with fee credit = {}", feeCredit)
       connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
     }
 
-    goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, feerates, None)
+    goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage)
   }
 
   /**

@@ -849,7 +899,10 @@ class Peer(val nodeParams: NodeParams,
   // resume the openChannelInterceptor in case of failure, we always want the open channel request to succeed or fail
   private val openChannelInterceptor = context.spawnAnonymous(Behaviors.supervise(OpenChannelInterceptor(context.self.toTyped, nodeParams, remoteNodeId, wallet, pendingChannelsRateLimiter)).onFailure(typed.SupervisorStrategy.resume))
 
-  private def stopPeer(): State = {
+  private def stopPeer(peerStorage: PeerStorage): State = {
+    if (!peerStorage.written) {
+      peerStorage.data.foreach(nodeParams.db.peers.updateStorage(remoteNodeId, _))
+    }
     log.info("removing peer from db")
     cancelUnsignedOnTheFlyFunding()
     nodeParams.db.peers.removePeer(remoteNodeId)

@@ -884,6 +937,7 @@ class Peer(val nodeParams: NodeParams,
     Logs.mdc(LogCategory(currentMessage), Some(remoteNodeId), Logs.channelId(currentMessage), nodeAlias_opt = Some(nodeParams.alias))
   }
 
+  private val WritePeerStorageTimerKey = "peer-storage-write"
 }
 
 object Peer {

@@ -911,12 +965,20 @@ object Peer {
   case class TemporaryChannelId(id: ByteVector32) extends ChannelId
   case class FinalChannelId(id: ByteVector32) extends ChannelId
 
+  case class PeerStorage(data: Option[ByteVector], written: Boolean)
+
   sealed trait Data {
     def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
+    def activeChannels: Set[ByteVector32] // channels that are available to process payments
+    def peerStorage: PeerStorage
   }
-  case object Nothing extends Data { override def channels = Map.empty }
-  case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
-  case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates]) extends Data {
+  case object Nothing extends Data {
+    override def channels = Map.empty
+    override def activeChannels: Set[ByteVector32] = Set.empty
+    override def peerStorage: PeerStorage = PeerStorage(None, written = true)
+  }
+  case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
+  case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
     val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
     def localFeatures: Features[InitFeature] = localInit.features
     def remoteFeatures: Features[InitFeature] = remoteInit.features

@@ -1029,5 +1091,7 @@ object Peer {
   case class RelayOnionMessage(messageId: ByteVector32, msg: OnionMessage, replyTo_opt: Option[typed.ActorRef[Status]])
 
   case class RelayUnknownMessage(unknownMessage: UnknownMessage)
+
+  case object WritePeerStorage
   // @formatter:on
 }
eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala

@@ -22,7 +22,7 @@ import fr.acinq.eclair.wire.protocol.CommonCodecs._
 import fr.acinq.eclair.{Features, InitFeature, KamonExt}
 import scodec.bits.{BinStringSyntax, BitVector, ByteVector}
 import scodec.codecs._
-import scodec.{Attempt, Codec}
+import scodec.{Attempt, Codec, Err}
 
 /**
  * Created by PM on 15/11/2016.

@@ -389,6 +389,14 @@ object LightningMessageCodecs {
     ("onionPacket" | MessageOnionCodecs.messageOnionPacketCodec) ::
       ("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage]
 
+  val peerStorageStore: Codec[PeerStorageStore] = (
+    ("blob" | variableSizeBytes(uint16, bytes)) ::
+      ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageStore]
+
+  val peerStorageRetrieval: Codec[PeerStorageRetrieval] = (
+    ("blob" | variableSizeBytes(uint16, bytes)) ::
+      ("tlvStream" | PeerStorageTlv.peerStorageTlvCodec)).as[PeerStorageRetrieval]
+
   // NB: blank lines to minimize merge conflicts
 
   //

@@ -476,6 +484,8 @@ object LightningMessageCodecs {
   val lightningMessageCodec = discriminated[LightningMessage].by(uint16)
     .typecase(1, warningCodec)
     .typecase(2, stfuCodec)
+    .typecase(7, peerStorageStore)
+    .typecase(9, peerStorageRetrieval)
     .typecase(16, initCodec)
     .typecase(17, errorCodec)
     .typecase(18, pingCodec)
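Reading the codec together with the test vectors at the end of this commit: a peer storage message is the 2-byte Lightning message type (7 for `peer_storage_store`, 9 for `peer_storage_retrieval`), a `uint16` blob length, then the blob itself, so `0007 0003 012345` is a store of the 3-byte blob `012345`. A small round-trip sketch using the codec added above (the object name is illustrative):

```scala
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.lightningMessageCodec
import fr.acinq.eclair.wire.protocol.PeerStorageStore
import scodec.bits._

object PeerStorageWireExample extends App {
  val bin = hex"0007 0003 012345" // type 7, length 3, blob 0x012345
  val decoded = lightningMessageCodec.decode(bin.bits).require.value
  assert(decoded == PeerStorageStore(hex"012345"))
  assert(lightningMessageCodec.encode(decoded).require.bytes == bin)
}
```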
eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala

@@ -602,6 +602,10 @@ case class GossipTimestampFilter(chainHash: BlockHash, firstTimestamp: Timestamp
 
 case class OnionMessage(pathKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage
 
+case class PeerStorageStore(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends SetupMessage
+
+case class PeerStorageRetrieval(blob: ByteVector, tlvStream: TlvStream[PeerStorageTlv] = TlvStream.empty) extends SetupMessage
+
 // NB: blank lines to minimize merge conflicts
 
 //
eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/PeerStorageTlv.scala (new file)

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024 ACINQ SAS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fr.acinq.eclair.wire.protocol
+
+import fr.acinq.eclair.wire.protocol.CommonCodecs.varint
+import fr.acinq.eclair.wire.protocol.TlvCodecs.tlvStream
+import scodec.Codec
+import scodec.codecs.discriminated
+
+/**
+ * Created by thomash on July 2024.
+ */
+
+sealed trait PeerStorageTlv extends Tlv
+
+object PeerStorageTlv {
+  val peerStorageTlvCodec: Codec[TlvStream[PeerStorageTlv]] = tlvStream(discriminated[PeerStorageTlv].by(varint))
+}
eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala

@@ -108,6 +108,7 @@ object TestConstants {
         Features.StaticRemoteKey -> FeatureSupport.Mandatory,
         Features.Quiescence -> FeatureSupport.Optional,
         Features.SplicePrototype -> FeatureSupport.Optional,
+        Features.ProvideStorage -> FeatureSupport.Optional,
       ),
       unknown = Set(UnknownFeature(TestFeature.optional))
     ),

@@ -240,6 +241,7 @@ object TestConstants {
       willFundRates_opt = Some(defaultLiquidityRates),
       peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
       onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
+      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
     )
 
     def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(

@@ -416,6 +418,7 @@ object TestConstants {
       willFundRates_opt = Some(defaultLiquidityRates),
       peerWakeUpConfig = PeerReadyNotifier.WakeUpConfig(enabled = false, timeout = 30 seconds),
       onTheFlyFundingConfig = OnTheFlyFunding.Config(proposalTimeout = 90 seconds),
+      peerStorageConfig = PeerStorageConfig(writeDelay = 5 seconds, removalDelay = 10 seconds, cleanUpFrequency = 1 hour)
     )
 
     def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala

@@ -18,12 +18,13 @@ package fr.acinq.eclair.db
 
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
 import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases}
+import fr.acinq.eclair._
 import fr.acinq.eclair.db.pg.PgPeersDb
 import fr.acinq.eclair.db.sqlite.SqlitePeersDb
 import fr.acinq.eclair.payment.relay.Relayer.RelayFees
-import fr.acinq.eclair._
 import fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3}
 import org.scalatest.funsuite.AnyFunSuite
 import scodec.bits.HexStringSyntax
 
 import java.util.concurrent.Executors
+import scala.concurrent.duration._

@@ -107,4 +108,39 @@ class PeersDbSpec extends AnyFunSuite {
     }
   }
 
+  test("add/update/remove peer storage") {
+    forAllDbs { dbs =>
+      val db = dbs.peers
+
+      val a = randomKey().publicKey
+      val b = randomKey().publicKey
+
+      assert(db.getStorage(a) == None)
+      assert(db.getStorage(b) == None)
+      db.updateStorage(a, hex"012345")
+      assert(db.getStorage(a) == Some(hex"012345"))
+      assert(db.getStorage(b) == None)
+      db.updateStorage(a, hex"6789")
+      assert(db.getStorage(a) == Some(hex"6789"))
+      assert(db.getStorage(b) == None)
+      db.updateStorage(b, hex"abcd")
+      assert(db.getStorage(a) == Some(hex"6789"))
+      assert(db.getStorage(b) == Some(hex"abcd"))
+
+      // Actively used storage shouldn't be removed.
+      db.removePeerStorage(TimestampSecond.now() + 1.hour)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      assert(db.getStorage(b) == Some(hex"abcd"))
+
+      // After removing the peer, peer storage can be removed.
+      db.removePeer(a)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      db.removePeerStorage(TimestampSecond.now() - 1.hour)
+      assert(db.getStorage(a) == Some(hex"6789"))
+      db.removePeerStorage(TimestampSecond.now() + 1.hour)
+      assert(db.getStorage(a) == None)
+      assert(db.getStorage(b) == Some(hex"abcd"))
+    }
+  }
+
 }
eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala

@@ -38,7 +38,7 @@ import fr.acinq.eclair.wire.protocol
 import fr.acinq.eclair.wire.protocol._
 import org.scalatest.Inside.inside
 import org.scalatest.{Tag, TestData}
-import scodec.bits.ByteVector
+import scodec.bits.{ByteVector, HexStringSyntax}
 
 import java.net.InetSocketAddress
 import java.nio.channels.ServerSocketChannel

@@ -108,11 +108,14 @@ class PeerSpec extends FixtureSpec {
 
   def cleanupFixture(fixture: FixtureParam): Unit = fixture.cleanup()
 
-  def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()))(implicit system: ActorSystem): Unit = {
+  def connect(remoteNodeId: PublicKey, peer: TestFSMRef[Peer.State, Peer.Data, Peer], peerConnection: TestProbe, switchboard: TestProbe, channels: Set[PersistentChannelData] = Set.empty, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), initializePeer: Boolean = true, peerStorage: Option[ByteVector] = None)(implicit system: ActorSystem): Unit = {
     // let's simulate a connection
-    switchboard.send(peer, Peer.Init(channels, Map.empty))
+    if (initializePeer) {
+      switchboard.send(peer, Peer.Init(channels, Map.empty))
+    }
     val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
     switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = true, localInit, remoteInit))
+    peerStorage.foreach(data => peerConnection.expectMsg(PeerStorageRetrieval(data)))
     peerConnection.expectMsgType[RecommendedFeerates]
     val probe = TestProbe()
     probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))

@@ -752,6 +755,39 @@ class PeerSpec extends FixtureSpec {
     channel.expectMsg(open)
   }
 
+  test("peer storage") { f =>
+    import f._
+
+    // We connect with a previous backup.
+    val channel = ChannelCodecsSpec.normal
+    val peerConnection1 = peerConnection
+    nodeParams.db.peers.updateStorage(remoteNodeId, hex"abcdef")
+    connect(remoteNodeId, peer, peerConnection1, switchboard, channels = Set(channel), peerStorage = Some(hex"abcdef"))
+    peer ! ChannelReadyForPayments(ActorRef.noSender, channel.remoteNodeId, channel.channelId, channel.commitments.latest.fundingTxIndex)
+    peerConnection1.send(peer, PeerStorageStore(hex"deadbeef"))
+    peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))
+
+    // We disconnect and reconnect, sending the last backup we received.
+    peer ! Peer.Disconnect(f.remoteNodeId)
+    val peerConnection2 = TestProbe()
+    connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
+    peerConnection2.send(peer, PeerStorageStore(hex"1111"))
+
+    // We reconnect again.
+    val peerConnection3 = TestProbe()
+    connect(remoteNodeId, peer, peerConnection3, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"1111"))
+    // Because of the delayed writes, we may not have stored the latest value immediately, but we will eventually store it.
+    eventually {
+      assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
+    }
+
+    // Our channel closes, so we stop storing backups for that peer.
+    peer ! LocalChannelDown(ActorRef.noSender, channel.channelId, channel.shortIds, channel.remoteNodeId)
+    peerConnection3.send(peer, PeerStorageStore(hex"2222"))
+    assert(!peer.isTimerActive("peer-storage-write"))
+    assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
+  }
+
 }
 
 object PeerSpec {
eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

@@ -20,7 +20,7 @@ import akka.testkit.{TestFSMRef, TestProbe}
 import fr.acinq.bitcoin.scalacompat.Block
 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
 import fr.acinq.eclair._
-import fr.acinq.eclair.io.Peer.ChannelId
+import fr.acinq.eclair.io.Peer.{ChannelId, PeerStorage}
 import fr.acinq.eclair.io.ReconnectionTask.WaitingData
 import fr.acinq.eclair.tor.Socks5ProxyParams
 import fr.acinq.eclair.wire.protocol.{Color, NodeAddress, NodeAnnouncement, RecommendedFeerates}

@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
   private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
 
   private val PeerNothingData = Peer.Nothing
-  private val PeerDisconnectedData = Peer.DisconnectedData(channels)
-  private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, recommendedFeerates, None)
+  private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
+  private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))
 
   case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)

@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
     import f._
 
     val peer = TestProbe()
-    peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty)))
+    peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
     monitor.expectNoMessage()
   }
eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala

@@ -777,4 +777,17 @@ class LightningMessageCodecsSpec extends AnyFunSuite {
       assert(updateAddHtlcCodec.encode(decoded.value).require.bytes == bin)
     }
   }
 
+  test("encode/decode peer storage messages") {
+    val testCases = Seq(
+      hex"0007 0003 012345" -> PeerStorageStore(hex"012345"),
+      hex"0009 0002 abcd" -> PeerStorageRetrieval(hex"abcd"),
+    )
+    for ((bin, ref) <- testCases) {
+      val decoded = lightningMessageCodec.decode(bin.bits).require
+      assert(decoded.value == ref)
+      assert(decoded.remainder.isEmpty)
+      assert(lightningMessageCodec.encode(ref).require.bytes == bin)
+    }
+  }
 }