diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index a5233a4371..d2800d522c 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -63,7 +63,7 @@ case class NeutrinoNode( peerManager.randomPeerWithService(serviceIdentifier).isDefined) for { _ <- peerAvailableF - _ <- peerManager.syncHelper(None) + _ <- peerManager.sync(None) } yield () } diff --git a/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala b/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala index 9b56e0a1a6..d25b86e9ed 100644 --- a/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala +++ b/node/src/main/scala/org/bitcoins/node/NodeStreamMessage.scala @@ -32,6 +32,7 @@ object NodeStreamMessage { case class SendResponseTimeout(peer: Peer, payload: NetworkPayload) extends NodeStreamMessage + case class StartSync(peerOpt: Option[Peer]) extends NodeStreamMessage case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer]) case class Initialized(peer: Peer) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 8fec551562..1c1785b87c 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -776,6 +776,8 @@ case class PeerManager( NodeStreamMessage, Future[DataMessageHandler]] = { Sink.foldAsync(initDmh) { + case (dmh, s: StartSync) => + syncHelper(s.peerOpt).map(_ => dmh) case (dmh, DataMessageWrapper(payload, peer)) => logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream") val peerDataOpt = peerDataMap.get(peer) @@ -952,11 +954,16 @@ case class PeerManager( @volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] = None + def sync(syncPeerOpt: Option[Peer]): Future[Unit] = { + val s = StartSync(syncPeerOpt) + offer(s).map(_ => ()) + } + /** Helper method to sync the blockchain over the network * * @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers */ - def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = { + private def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = { logger.debug( s"syncHelper() syncPeerOpt=$syncPeerOpt isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}") val chainApi: ChainApi = ChainHandler.fromDatabase()