Move initialization of sync into PeerManager's queue (#5250)

This commit is contained in:
Chris Stewart 2023-10-03 13:16:20 -05:00 committed by GitHub
parent 4972d0a368
commit 83cf657a0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 2 deletions

View file

@ -63,7 +63,7 @@ case class NeutrinoNode(
peerManager.randomPeerWithService(serviceIdentifier).isDefined) peerManager.randomPeerWithService(serviceIdentifier).isDefined)
for { for {
_ <- peerAvailableF _ <- peerAvailableF
_ <- peerManager.syncHelper(None) _ <- peerManager.sync(None)
} yield () } yield ()
} }

View file

@ -32,6 +32,7 @@ object NodeStreamMessage {
case class SendResponseTimeout(peer: Peer, payload: NetworkPayload) case class SendResponseTimeout(peer: Peer, payload: NetworkPayload)
extends NodeStreamMessage extends NodeStreamMessage
case class StartSync(peerOpt: Option[Peer]) extends NodeStreamMessage
case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer]) case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer])
case class Initialized(peer: Peer) case class Initialized(peer: Peer)

View file

@ -776,6 +776,8 @@ case class PeerManager(
NodeStreamMessage, NodeStreamMessage,
Future[DataMessageHandler]] = { Future[DataMessageHandler]] = {
Sink.foldAsync(initDmh) { Sink.foldAsync(initDmh) {
case (dmh, s: StartSync) =>
syncHelper(s.peerOpt).map(_ => dmh)
case (dmh, DataMessageWrapper(payload, peer)) => case (dmh, DataMessageWrapper(payload, peer)) =>
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream") logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
val peerDataOpt = peerDataMap.get(peer) val peerDataOpt = peerDataMap.get(peer)
@ -952,11 +954,16 @@ case class PeerManager(
@volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] = @volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] =
None None
def sync(syncPeerOpt: Option[Peer]): Future[Unit] = {
val s = StartSync(syncPeerOpt)
offer(s).map(_ => ())
}
/** Helper method to sync the blockchain over the network /** Helper method to sync the blockchain over the network
* *
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers * @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
*/ */
def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = { private def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
logger.debug( logger.debug(
s"syncHelper() syncPeerOpt=$syncPeerOpt isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}") s"syncHelper() syncPeerOpt=$syncPeerOpt isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
val chainApi: ChainApi = ChainHandler.fromDatabase() val chainApi: ChainApi = ChainHandler.fromDatabase()