2023 08 01 issue 5174 (#5176)

* Move methods out of PeerManager.onInitialization()

* Add PersistentPeerData, QueriedPeerData

* Segregate PeerData -> {AttemptToConnectPeerData, PersistentPeerData}, handle the cases differently in managePeerAfterInitialization()

* Remove call to sync() in BitcoinSServerMain

* Fix bug where we were attempting to stop peers that had already had their connections fail

* reduce log level for peer discovery failures
This commit is contained in:
Chris Stewart 2023-08-04 11:18:27 -05:00 committed by GitHub
parent 147f7782e5
commit 08e780a884
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 134 additions and 71 deletions

View File

@ -280,7 +280,6 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- callbacksF
node <- startedNodeF
_ <- startedTorConfigF
_ <- node.sync()
} yield {
nodeOpt = Some(node)
logger.info(

View File

@ -14,15 +14,18 @@ import scala.concurrent.duration.DurationInt
/** PeerData contains objects specific to a peer associated together
*/
case class PeerData(
peer: Peer,
controlMessageHandler: ControlMessageHandler,
queue: SourceQueueWithComplete[NodeStreamMessage],
peerMessageSenderApi: PeerMessageSenderApi
)(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig) {
sealed trait PeerData {
implicit protected def nodeAppConfig: NodeAppConfig
implicit protected def chainAppConfig: ChainAppConfig
implicit protected def system: ActorSystem
def peer: Peer
def controlMessageHandler: ControlMessageHandler
def queue: SourceQueueWithComplete[NodeStreamMessage]
def peerMessageSenderApi: PeerMessageSenderApi
private val initPeerMessageRecv = PeerMessageReceiver(
controlMessageHandler = controlMessageHandler,
@ -30,14 +33,15 @@ case class PeerData(
peer = peer,
state = PeerMessageReceiverState.fresh())
def stop(): Future[Unit] = {
peerMessageSender.disconnect()
}
val peerMessageSender: PeerMessageSender = {
PeerMessageSender(peer, initPeerMessageRecv, peerMessageSenderApi)
}
def stop(): Future[Unit] = {
peerMessageSender.disconnect()
}
private var _serviceIdentifier: Option[ServiceIdentifier] = None
private[this] var _serviceIdentifier: Option[ServiceIdentifier] = None
def serviceIdentifier: ServiceIdentifier = {
_serviceIdentifier.getOrElse(
@ -48,6 +52,19 @@ case class PeerData(
def setServiceIdentifier(serviceIdentifier: ServiceIdentifier): Unit = {
_serviceIdentifier = Some(serviceIdentifier)
}
}
/** A peer we plan on being connected to persistently */
case class PersistentPeerData(
peer: Peer,
controlMessageHandler: ControlMessageHandler,
queue: SourceQueueWithComplete[NodeStreamMessage],
peerMessageSenderApi: PeerMessageSenderApi
)(implicit
override val system: ActorSystem,
override val nodeAppConfig: NodeAppConfig,
override val chainAppConfig: ChainAppConfig)
extends PeerData {
private var _invalidMessagesCount: Int = 0
@ -76,3 +93,18 @@ case class PeerData(
_invalidMessagesCount > nodeAppConfig.maxInvalidResponsesAllowed
}
}
/** A peer we are just discovering on the p2p network for future connections
* we do not want to be persistently connected to this peer, just see if
* we can connect to it and exchange version/verack messages
*/
case class AttemptToConnectPeerData(
peer: Peer,
controlMessageHandler: ControlMessageHandler,
queue: SourceQueueWithComplete[NodeStreamMessage],
peerMessageSenderApi: PeerMessageSenderApi
)(implicit
override val system: ActorSystem,
override val nodeAppConfig: NodeAppConfig,
override val chainAppConfig: ChainAppConfig)
extends PeerData

View File

@ -146,13 +146,13 @@ case class PeerFinder(
.filterNot(p => skipPeers().contains(p) || _peerData.contains(p))
logger.debug(s"Trying next set of peers $peers")
val peersF = Future.traverse(peers)(tryPeer)
val peersF = Future.traverse(peers)(tryToAttemptToConnectPeer)
peersF.onComplete {
case Success(_) =>
isConnectionSchedulerRunning.set(false)
case Failure(err) =>
isConnectionSchedulerRunning.set(false)
logger.error(
logger.debug(
s"Failed to connect to peers=$peers errMsg=${err.getMessage}")
}
} else {
@ -232,19 +232,33 @@ case class PeerFinder(
stopF
}
private def tryToAttemptToConnectPeer(peer: Peer): Future[Unit] = {
logger.debug(s"tryToAttemptToConnectPeer=$peer")
_peerData.put(peer,
AttemptToConnectPeerData(peer,
controlMessageHandler,
queue,
peerMessageSenderApi))
_peerData(peer).peerMessageSender.connect()
}
/** creates and initialises a new test peer */
private def tryPeer(peer: Peer): Future[Unit] = {
logger.debug(s"tryPeer=$peer")
_peerData.put(
peer,
PeerData(peer, controlMessageHandler, queue, peerMessageSenderApi))
_peerData.put(peer,
PersistentPeerData(peer,
controlMessageHandler,
queue,
peerMessageSenderApi))
_peerData(peer).peerMessageSender.connect()
}
private def tryToReconnectPeer(peer: Peer): Future[Unit] = {
_peerData.put(
peer,
PeerData(peer, controlMessageHandler, queue, peerMessageSenderApi))
_peerData.put(peer,
PersistentPeerData(peer,
controlMessageHandler,
queue,
peerMessageSenderApi))
_peerData(peer).peerMessageSender.reconnect()
}
@ -264,12 +278,15 @@ case class PeerFinder(
_peerData(peer).setServiceIdentifier(serviceIdentifier)
}
def popFromCache(peer: Peer): Option[PeerData] = {
if (_peerData.contains(peer))
_peerData.remove(peer)
else {
logger.debug(s"removeFromCache: $peer not found in peerData")
None
def popFromCache(peer: Peer): Option[PersistentPeerData] = {
_peerData.get(peer) match {
case Some(persistentPeerData: PersistentPeerData) =>
_peerData.remove(peer)
Some(persistentPeerData)
case Some(_: AttemptToConnectPeerData) => None
case None =>
logger.debug(s"removeFromCache: $peer not found in peerData")
None
}
}

View File

@ -62,7 +62,9 @@ case class PeerManager(
with SourceQueue[NodeStreamMessage]
with P2PLogger {
private val isStarted: AtomicBoolean = new AtomicBoolean(false)
private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty
private val _peerDataMap: mutable.Map[Peer, PersistentPeerData] =
mutable.Map.empty
/** holds peers removed from peerData whose client actors are not stopped yet. Used for runtime sanity checks. */
private val _waitingForDisconnection: mutable.Set[Peer] = mutable.Set.empty
@ -382,9 +384,10 @@ case class PeerManager(
}
}
private def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap
private def peerDataMap: Map[Peer, PersistentPeerData] = _peerDataMap.toMap
def getPeerData(peer: Peer): Option[PeerData] = peerDataMap.get(peer)
def getPeerData(peer: Peer): Option[PersistentPeerData] =
peerDataMap.get(peer)
override def stop(): Future[PeerManager] = {
logger.info(s"Stopping PeerManager")
@ -475,6 +478,44 @@ case class PeerManager(
}
/** Helper method to determine what action to take after a peer is initialized, such as beginning sync with that peer */
private def managePeerAfterInitialization(
finder: PeerFinder,
peerData: PeerData,
hasCf: Boolean): Future[Unit] = {
val peer = peerData.peer
peerData match {
case _: PersistentPeerData =>
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
connectPeer(peer)
.flatMap(_ => syncHelper(Some(peer)))
} else {
val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
//try to drop another non compact filter connection for this
if (hasCf && notCf.nonEmpty)
replacePeer(replacePeer = notCf.head, withPeer = peer)
.flatMap(_ => syncHelper(Some(peer)))
else {
peerData.stop()
}
}
case q: AttemptToConnectPeerData =>
if (finder.hasPeer(q.peer)) {
//if we still have an active connection with this peer, stop it
q.stop()
} else {
//else it already has been deleted because of connection issues
Future.unit
}
}
}
private def onInitialization(
peer: Peer,
state: NodeState): Future[NodeState] = {
@ -494,43 +535,12 @@ case class PeerManager(
val hasCf = serviceIdentifer.nodeCompactFilters
logger.debug(s"Initialized peer $peer with $hasCf")
def sendAddrReq: Future[Unit] = {
sendGetAddrMessage(Some(peer))
}
def managePeerF(): Future[Unit] = {
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
connectPeer(peer)
} else {
lazy val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
//try to drop another non compact filter connection for this
if (hasCf && notCf.nonEmpty)
replacePeer(replacePeer = notCf.head, withPeer = peer)
else {
//no use for this apart from writing in db
//we do want to give it enough time to send addr messages
AsyncUtil
.nonBlockingSleep(duration = 10.seconds)
.flatMap { _ =>
//could have already been deleted in case of connection issues
finder.getData(peer) match {
case Some(p) => p.stop()
case None => Future.unit
}
}
}
}
}
for {
_ <- sendAddrReq
_ <- sendGetAddrMessage(Some(peer))
_ <- createInDb(peer, peerData.serviceIdentifier)
_ <- managePeerF()
_ <- syncHelper(Some(peer))
_ <- managePeerAfterInitialization(finder = finder,
peerData = peerData,
hasCf = hasCf)
} yield state
} else if (peerDataMap.contains(peer)) {
@ -1087,7 +1097,7 @@ case class PeerManager(
@volatile private[this] var inactivityCancellableOpt: Option[Cancellable] =
None
private def inactivityChecks(peerData: PeerData): Future[Unit] = {
private def inactivityChecks(peerData: PersistentPeerData): Future[Unit] = {
if (peerData.isConnectionTimedOut) {
val stopF = peerData.stop()
stopF

View File

@ -16,7 +16,12 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
import org.bitcoins.node.networking.peer.NodeState._
import org.bitcoins.node.util.PeerMessageSenderApi
import org.bitcoins.node.{NodeStreamMessage, P2PLogger, PeerData, PeerManager}
import org.bitcoins.node.{
NodeStreamMessage,
P2PLogger,
PeerManager,
PersistentPeerData
}
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
@ -54,7 +59,7 @@ case class DataMessageHandler(
def handleDataPayload(
payload: DataPayload,
peerData: PeerData): Future[DataMessageHandler] = {
peerData: PersistentPeerData): Future[DataMessageHandler] = {
state match {
case syncState: SyncNodeState =>
syncState match {
@ -118,7 +123,7 @@ case class DataMessageHandler(
*/
private def handleDataPayloadValidState(
payload: DataPayload,
peerData: PeerData): Future[DataMessageHandler] = {
peerData: PersistentPeerData): Future[DataMessageHandler] = {
val peer = peerData.peer
val wrappedFuture: Future[Future[DataMessageHandler]] = Future {
payload match {
@ -471,7 +476,7 @@ case class DataMessageHandler(
/** Recover the data message handler if we received an invalid block header from a peer */
private def recoverInvalidHeader(
peerData: PeerData): Future[DataMessageHandler] = {
peerData: PersistentPeerData): Future[DataMessageHandler] = {
val result = state match {
case state @ (HeaderSync(_, _) | DoneSyncing(_)) =>
val peer = peerData.peer