1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-03-13 11:35:47 +01:00

Address @thomash-acinq comments

- Use separate function when updating features only
- Remove DB write before channel ready for incoming connection
This commit is contained in:
t-bast 2025-01-13 10:41:12 +01:00
parent 0c9b78cb9a
commit ad482dc766
No known key found for this signature in database
GPG key ID: 34F377B0100ED6BB
8 changed files with 95 additions and 86 deletions

View file

@ -12,8 +12,8 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, NodeInfo}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, Features, InitFeature, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
@ -264,9 +264,14 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, nodeInfo))
primary.addOrUpdatePeer(nodeId, nodeInfo)
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, address, features))
primary.addOrUpdatePeer(nodeId, address, features)
}
override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = {
runAsync(secondary.addOrUpdatePeerFeatures(nodeId, features))
primary.addOrUpdatePeerFeatures(nodeId, features)
}
override def removePeer(nodeId: Crypto.PublicKey): Unit = {

View file

@ -17,15 +17,19 @@
package fr.acinq.eclair.db
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol.{NodeAddress, NodeInfo}
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
import scodec.bits.ByteVector
/** The PeersDb contains information about our direct peers, with whom we have or had channels. */
trait PeersDb {
def addOrUpdatePeer(nodeId: PublicKey, nodeInfo: NodeInfo): Unit
/** Update our DB with a verified address and features for the given peer. */
def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit
/** Update our DB with the features for the given peer, without updating its address. */
def addOrUpdatePeerFeatures(nodeId: PublicKey, features: Features[InitFeature]): Unit
def removePeer(nodeId: PublicKey): Unit

View file

@ -24,7 +24,7 @@ import fr.acinq.eclair.db.PeersDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, MilliSatoshi, TimestampSecond}
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
@ -95,37 +95,38 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
}
}
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
nodeInfo.address_opt match {
case Some(address) =>
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
case None =>
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
}
}
override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
}
}

View file

@ -18,15 +18,15 @@ package fr.acinq.eclair.db.sqlite
import fr.acinq.bitcoin.scalacompat.Crypto
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.PeersDb
import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, setVersion, using}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.{BitVector, ByteVector}
import scodec.bits.ByteVector
import java.sql.{Connection, Statement}
@ -82,30 +82,35 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
val encodedAddress_opt = nodeInfo.address_opt.map(address => CommonCodecs.nodeaddress.encode(address).require.toByteArray)
val unknownPeer = encodedAddress_opt match {
case Some(encodedAddress) =>
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedAddress)
update.setBytes(2, encodedFeatures)
update.setBytes(3, nodeId.value.toArray)
update.executeUpdate() == 0
}
case None =>
using(sqlite.prepareStatement("UPDATE peers SET node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedFeatures)
update.setBytes(2, nodeId.value.toArray)
update.executeUpdate() == 0
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedAddress)
update.setBytes(2, encodedFeatures)
update.setBytes(3, nodeId.value.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
}
}
if (unknownPeer) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedAddress_opt.orNull)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedFeatures)
update.setBytes(2, nodeId.value.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, NULL, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
}
}
}

View file

@ -155,7 +155,7 @@ class Peer(val nodeParams: NodeParams,
val remoteFeatures_opt = d.remoteFeatures_opt match {
case Some(remoteFeatures) if !remoteFeatures.written =>
// We have a channel, so we can write to the DB without any DoS risk.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(remoteFeatures.features, None))
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, remoteFeatures.features)
Some(remoteFeatures.copy(written = true))
case _ => d.remoteFeatures_opt
}
@ -458,7 +458,7 @@ class Peer(val nodeParams: NodeParams,
}
if (!d.remoteFeaturesWritten) {
// We have a channel, so we can write to the DB without any DoS risk.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(d.remoteFeatures, None))
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, d.remoteFeatures)
}
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeaturesWritten = true)
@ -827,14 +827,8 @@ class Peer(val nodeParams: NodeParams,
if (connectionReady.outgoing) {
// We store the node address and features upon successful outgoing connection, so we can reconnect later.
// The previous address is overwritten: we don't need it since the current one works.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, Some(connectionReady.address)))
} else if (channels.nonEmpty) {
// If this is an incoming connection, we only store the peer details in our DB if we have channels with them.
// Otherwise nodes could DoS by simply connecting to us to force us to store data in our DB.
// We don't update the remote address, we don't know if we would successfully connect using the current one.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, None))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address, connectionReady.remoteInit.features)
}
val remoteFeaturesWritten = connectionReady.outgoing || channels.nonEmpty
// If we have some data stored from our peer, we send it to them before doing anything else.
peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
@ -856,7 +850,7 @@ class Peer(val nodeParams: NodeParams,
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
}
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten)
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten = connectionReady.outgoing)
}
/**

View file

@ -60,24 +60,24 @@ class PeersDbSpec extends AnyFunSuite {
val peer3 = TestCase(randomKey().publicKey, NodeInfo(Features.empty, Some(Tor3("mrl2d3ilhctt2vw4qzvmz3etzjvpnc6dczliq5chrxetthgbuczuggyd", 4231))))
assert(db.listPeers().isEmpty)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo.address_opt.get, peer1a.nodeInfo.features)
assert(db.getPeer(peer1a.nodeId).contains(peer1a.nodeInfo))
assert(db.getPeer(peer2a.nodeId).isEmpty)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo) // duplicate is ignored
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo.address_opt.get, peer1a.nodeInfo.features) // duplicate is ignored
assert(db.listPeers().size == 1)
db.addOrUpdatePeer(peer2a.nodeId, peer2a.nodeInfo)
db.addOrUpdatePeer(peer3.nodeId, peer3.nodeInfo)
db.addOrUpdatePeerFeatures(peer2a.nodeId, peer2a.nodeInfo.features)
db.addOrUpdatePeer(peer3.nodeId, peer3.nodeInfo.address_opt.get, peer3.nodeInfo.features)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer2a, peer3))
db.addOrUpdatePeer(peer2b.nodeId, peer2b.nodeInfo)
db.addOrUpdatePeer(peer2b.nodeId, peer2b.nodeInfo.address_opt.get, peer2b.nodeInfo.features)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer2b, peer3))
db.removePeer(peer2b.nodeId)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer3))
// The first peer updates its address without changing its features.
db.addOrUpdatePeer(peer1b.nodeId, peer1b.nodeInfo)
db.addOrUpdatePeer(peer1b.nodeId, peer1b.nodeInfo.address_opt.get, peer1b.nodeInfo.features)
assert(db.getPeer(peer1b.nodeId).contains(peer1b.nodeInfo))
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1b, peer3))
// The first peer updates its features without an address: the previous address should be kept.
db.addOrUpdatePeer(peer1c.nodeId, peer1c.nodeInfo.copy(address_opt = None))
db.addOrUpdatePeerFeatures(peer1c.nodeId, peer1c.nodeInfo.features)
assert(db.getPeer(peer1c.nodeId).contains(peer1c.nodeInfo))
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1c, peer3))
}
@ -89,7 +89,7 @@ class PeersDbSpec extends AnyFunSuite {
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
val Success(peerAddress) = NodeAddress.fromParts("127.0.0.1", 42000)
val futures = for (_ <- 0 until 10000) yield {
Future(db.addOrUpdatePeer(randomKey().publicKey, NodeInfo(Features.empty, Some(peerAddress))))
Future(db.addOrUpdatePeer(randomKey().publicKey, peerAddress, Features.empty))
}
val res = Future.sequence(futures)
Await.result(res, 60 seconds)
@ -187,7 +187,7 @@ class PeersDbSpec extends AnyFunSuite {
val postMigrationDb = new SqlitePeersDb(connection)
assert(postMigrationDb.listPeers() == peerInfos)
val updatedPeerInfo1 = peerInfos(peerId1).copy(features = Features(Features.DataLossProtect -> FeatureSupport.Mandatory))
postMigrationDb.addOrUpdatePeer(peerId1, updatedPeerInfo1.copy(address_opt = None))
postMigrationDb.addOrUpdatePeerFeatures(peerId1, updatedPeerInfo1.features)
assert(postMigrationDb.getPeer(peerId1).contains(updatedPeerInfo1))
}
)
@ -215,7 +215,7 @@ class PeersDbSpec extends AnyFunSuite {
val postMigrationDb = dbs.peers
assert(postMigrationDb.listPeers() == peerInfos)
val updatedPeerInfo1 = peerInfos(peerId1).copy(features = Features(Features.ChannelType -> FeatureSupport.Optional))
postMigrationDb.addOrUpdatePeer(peerId1, updatedPeerInfo1.copy(address_opt = None))
postMigrationDb.addOrUpdatePeerFeatures(peerId1, updatedPeerInfo1.features)
assert(postMigrationDb.getPeer(peerId1).contains(updatedPeerInfo1))
}
)

View file

@ -849,7 +849,7 @@ class PeerSpec extends FixtureSpec {
// We have information about one of our peers in our DB.
val nodeInfo = NodeInfo(TestConstants.Bob.nodeParams.features.initFeatures(), None)
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeInfo)
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, nodeInfo.features)
// We initialize ourselves after a restart, but our peer doesn't reconnect immediately to us.
switchboard.send(peer, Peer.Init(Set(ChannelCodecsSpec.normal), Map.empty))

View file

@ -112,7 +112,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
val probe = TestProbe()
val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
probe.send(reconnectionTask, ReconnectionTask.TickReconnect)
@ -161,7 +161,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
import f._
val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, connectingData: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]
@ -189,7 +189,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
import f._
val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]