2024 03 07 Add NodeState.FilterOrFilterHeaderSync, refactor PeerManager to use it (#5459)

* Refactor syncHelper() to return SyncNodeState rather than Unit

* Add FilterOrFilterHeaderSync, rework various methods in PeerManager to return Option[FilterOrFilterHeaderSyncState] to indicate if we are syncing rather than Future[Unit]

* Don't use getPeerMessageSender() if we can use syncPeerMessageSender()

* Fix case where syncpeer and new peer are different when creating a filter sync job

* Adjust createFilterSyncJob() return type, throw exception when we try to sync but PeerManager is not started

* Refactor to use SyncNodeState.{toFilterHeaderSync,toFilterSync}

* Cleanup comments
This commit is contained in:
Chris Stewart 2024-03-08 10:13:58 -06:00 committed by GitHub
parent e793c53dd7
commit 5967caeca7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 140 additions and 131 deletions

View File

@ -4,6 +4,8 @@ import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import org.bitcoins.node.NodeState.{
DoneSyncing,
FilterHeaderSync,
FilterSync,
MisbehavingPeer,
NodeShuttingDown,
RemovePeers
@ -191,6 +193,9 @@ sealed abstract class SyncNodeState extends NodeRunningState {
/** Services of our [[syncPeer]] */
def services: ServiceIdentifier = getPeerServices(syncPeer).get
def syncPeerMessageSender()(implicit nodeAppConfig: NodeAppConfig) =
getPeerMsgSender(syncPeer).get
def replaceSyncPeer(newSyncPeer: Peer): SyncNodeState = {
this match {
case h: NodeState.HeaderSync => h.copy(syncPeer = newSyncPeer)
@ -208,8 +213,23 @@ sealed abstract class SyncNodeState extends NodeRunningState {
replaceSyncPeer(p)
}
}
def toFilterHeaderSync: FilterHeaderSync = {
FilterHeaderSync(syncPeer, peerDataMap, waitingForDisconnection, peerFinder)
}
def toFilterSync: FilterSync = {
FilterSync(syncPeer = syncPeer,
peerDataMap = peerDataMap,
waitingForDisconnection = waitingForDisconnection,
filterBatchCache = Set.empty,
peerFinder = peerFinder)
}
}
/** Either we are syncing [[NodeState.FilterHeaderSync]] or [[NodeState.FilterSync]] */
sealed trait FilterOrFilterHeaderSync extends SyncNodeState
object NodeState {
case class HeaderSync(
@ -224,7 +244,7 @@ object NodeState {
peerDataMap: Map[PeerWithServices, PersistentPeerData],
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder)
extends SyncNodeState
extends FilterOrFilterHeaderSync
case class FilterSync(
syncPeer: Peer,
@ -232,7 +252,7 @@ object NodeState {
waitingForDisconnection: Set[Peer],
filterBatchCache: Set[CompactFilterMessage],
peerFinder: PeerFinder)
extends SyncNodeState {
extends FilterOrFilterHeaderSync {
override def toString: String = {
s"FilterSync(syncPeer=$syncPeer,peers=$peers,waitingForDisconnection=$waitingForDisconnection,filterBatchCache.size=${filterBatchCache.size})"

View File

@ -70,30 +70,31 @@ case class PeerManager(
bestFilterHeader: CompactFilterHeaderDb,
chainApi: ChainApi,
compactFilterStartHeightOpt: Option[Int],
nodeState: SyncNodeState)(implicit
chainAppConfig: ChainAppConfig): Future[Unit] = {
if (nodeState.services.nodeCompactFilters) {
val syncPeer = nodeState.syncPeer
val peerMsgSender = nodeState.getPeerMsgSender(syncPeer) match {
case Some(p) => p
case None =>
sys.error(s"Could not find peer=$syncPeer")
}
syncNodeState: SyncNodeState)(implicit
chainAppConfig: ChainAppConfig): Future[Option[FilterOrFilterHeaderSync]] = {
if (syncNodeState.services.nodeCompactFilters) {
val syncPeer = syncNodeState.syncPeer
val peerMsgSender = syncNodeState.syncPeerMessageSender()
val bestBlockHashF = chainApi.getBestBlockHash()
val sendCompactFilterHeaderMsgF = bestBlockHashF.flatMap {
bestBlockHash =>
PeerManager.sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE,
stopHash = bestBlockHash
)
}
val sendCompactFilterHeaderMsgF: Future[Option[FilterHeaderSync]] =
bestBlockHashF.flatMap { bestBlockHash =>
PeerManager
.sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE,
stopHash = bestBlockHash
)
.map { isSyncing =>
if (isSyncing) Some(syncNodeState.toFilterHeaderSync)
else None
}
}
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
sendCompactFilterHeaderMsgF.flatMap { fhsOpt =>
// If we have started syncing filters
if (!isSyncFilterHeaders) {
if (fhsOpt.isEmpty) {
PeerManager
.sendNextGetCompactFilterCommand(
peerMessageSenderApi = peerMsgSender,
@ -103,15 +104,18 @@ case class PeerManager(
stopBlockHash = bestFilterHeader.blockHashBE,
peer = syncPeer
)
.map(_ => ())
.map { syncing =>
if (syncing) Some(syncNodeState.toFilterSync)
else None
}
} else {
Future.unit
Future.successful(fhsOpt)
}
}
} else {
logger.warn(
s"Cannot syncCompactFilters() with peer=${nodeState.syncPeer} as the peer doesn't support block filters")
Future.unit
s"Cannot syncCompactFilters() with peer=${syncNodeState.syncPeer} as the peer doesn't support block filters")
Future.successful(None)
}
}
@ -295,10 +299,10 @@ case class PeerManager(
state match {
case s: SyncNodeState =>
val x = s.replaceSyncPeer(peer)
syncHelper(x).map(_ => x)
syncHelper(x)
case d: DoneSyncing =>
val h = d.toHeaderSync(peer)
syncHelper(h).map(_ => h)
syncHelper(h)
case x @ (_: RemovePeers | _: MisbehavingPeer |
_: NodeShuttingDown) =>
Future.successful(x)
@ -370,7 +374,7 @@ case class PeerManager(
val rm = state.removePeer(disconnectedPeer)
rm match {
case s: SyncNodeState =>
syncHelper(s).map(_ => s)
syncHelper(s)
case d: DoneSyncing =>
//defensively try to sync with the new peer
//this headerSync is not safe, need to exclude peer we are disconnencting
@ -379,7 +383,7 @@ case class PeerManager(
.asInstanceOf[DoneSyncing]
.toHeaderSync
hsOpt match {
case Some(hs) => syncHelper(hs).map(_ => hs)
case Some(hs) => syncHelper(hs)
case None =>
//no peers available to sync with, so return DoneSyncing
Future.successful(d)
@ -490,7 +494,7 @@ case class PeerManager(
case Some(p) =>
state match {
case s: SyncNodeState if !s.waitingForDisconnection.contains(p) =>
switchSyncToPeer(s, p).map(Some(_))
switchSyncToPeer(s, p)
case s: SyncNodeState =>
logger.warn(
s"Ignoring sync request for peer=${p} as its waiting for disconnection")
@ -562,10 +566,10 @@ case class PeerManager(
s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size}")
state match {
case s: SyncNodeState =>
syncHelper(s).map(_ => s)
syncHelper(s)
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_ => x)
syncHelper(x)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
@ -843,25 +847,25 @@ case class PeerManager(
private def switchSyncToPeer(
oldSyncState: SyncNodeState,
newPeer: Peer): Future[SyncNodeState] = {
newPeer: Peer): Future[Option[SyncNodeState]] = {
logger.debug(
s"switchSyncToPeer() oldSyncState=$oldSyncState newPeer=$newPeer")
val newState = oldSyncState.replaceSyncPeer(newPeer)
oldSyncState match {
newState match {
case s: HeaderSync =>
if (s.syncPeer != newPeer) {
syncHelper(newState).map(_ => newState)
syncHelper(s).map(Some(_))
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
Future.successful(Some(oldSyncState))
}
case s @ (_: FilterHeaderSync | _: FilterSync) =>
if (s.syncPeer != newPeer) {
case fofhs: FilterOrFilterHeaderSync =>
if (oldSyncState.syncPeer != newPeer) {
filterSyncHelper(chainApi = ChainHandler.fromDatabase(),
syncNodeState = newState).map(_ => newState)
fofhs = fofhs)
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
Future.successful(Some(fofhs))
}
}
@ -869,7 +873,7 @@ case class PeerManager(
/** If [[syncPeerOpt]] is given, we send getheaders to only that peer, if no sync peer given we gossip getheaders to all our peers */
private def getHeaderSyncHelper(
syncNodeState: SyncNodeState): Future[Unit] = {
headerSync: HeaderSync): Future[HeaderSync] = {
val blockchainsF =
BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
@ -877,48 +881,38 @@ case class PeerManager(
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
_ <- {
syncNodeState.getPeerMsgSender(syncNodeState.syncPeer) match {
case Some(peerMsgSender) =>
peerMsgSender.sendGetHeadersMessage(cachedHeaders)
case None =>
gossipGetHeadersMessage(cachedHeaders)
}
}
} yield ()
_ <- headerSync
.syncPeerMessageSender()
.sendGetHeadersMessage(cachedHeaders)
} yield headerSync
}
private def filterSyncHelper(
chainApi: ChainApi,
syncNodeState: SyncNodeState): Future[Unit] = {
fofhs: FilterOrFilterHeaderSync): Future[
Option[FilterOrFilterHeaderSync]] = {
for {
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
bestFilterOpt <- chainApi.getBestFilter()
hasStaleTip <- chainApi.isTipStale()
_ <- {
resultOpt <- {
if (hasStaleTip) {
//if we have a stale tip, we will request to sync filter headers / filters
//after we are done syncing block headers
Future.unit
Future.successful(None)
} else {
val fhs = FilterHeaderSync(
syncPeer = syncNodeState.syncPeer,
peerDataMap = syncNodeState.peerDataMap,
waitingForDisconnection = syncNodeState.waitingForDisconnection,
syncNodeState.peerFinder
)
syncFilters(
bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
chainApi = chainApi,
nodeState = fhs
fofhs = fofhs
)
}
}
} yield ()
} yield resultOpt
}
/** Scheduled job to sync compact filters */
@ -933,20 +927,19 @@ case class PeerManager(
/** Helper method to sync the blockchain over the network
*
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
* @param syncNodeState the state we should attempt to sync with
*/
private def syncHelper(syncNodeState: SyncNodeState): Future[Unit] = {
private def syncHelper(
syncNodeState: SyncNodeState): Future[SyncNodeState] = {
val syncPeer = syncNodeState.syncPeer
logger.debug(
s"syncHelper() syncPeer=$syncPeer isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
s"syncHelper() syncNodeState=$syncNodeState isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
val chainApi: ChainApi = ChainHandler.fromDatabase()
val headerF = chainApi.getBestBlockHeader()
val filterHeaderCountF = chainApi.getFilterHeaderCount()
val filterCountF = chainApi.getFilterCount()
for {
_ <- chainApi.setSyncing(true)
_ <- getHeaderSyncHelper(syncNodeState)
_ = {
val syncF = chainApi.setSyncing(true)
val resultF: Future[SyncNodeState] = syncNodeState match {
case h: HeaderSync =>
getHeaderSyncHelper(h)
case fofhs: FilterOrFilterHeaderSync =>
if (isStarted.get) {
//in certain cases, we can schedule this job while the peer manager is attempting to shutdown
//this is because we start syncing _after_ the connection to the peer is established
@ -957,21 +950,33 @@ case class PeerManager(
// 2. Shutting down the peer manager.
//
// the filter sync job gets scheduled _after_ PeerManager.stop() has been called
syncFilterCancellableOpt = syncFilterCancellableOpt match {
case s: Some[(Peer, Cancellable)] =>
s //do nothing as we already have a job scheduled
syncFilterCancellableOpt match {
case Some((p, _)) =>
//do nothing as we already have a job scheduled
val replaced = fofhs.replaceSyncPeer(p)
Future.successful(replaced)
case None =>
val c = createFilterSyncJob(chainApi, syncNodeState)
Some(c)
val job = createFilterSyncJob(chainApi, fofhs)
syncFilterCancellableOpt = Some((job._1.syncPeer, job._2))
Future.successful(job._1)
}
} else {
val exn = new RuntimeException(
s"Cannot start sync when PeerManager is not started")
Future.failed(exn)
}
}
header <- headerF
filterHeaderCount <- filterHeaderCountF
filterCount <- filterCountF
}
for {
header <- chainApi.getBestBlockHeader()
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
_ <- syncF
result <- resultF
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex} filterHeaderCount=$filterHeaderCount filterCount=$filterCount syncPeer=$syncPeer")
result
}
}
@ -995,7 +1000,9 @@ case class PeerManager(
private def createFilterSyncJob(
chainApi: ChainApi,
syncNodeState: SyncNodeState): (Peer, Cancellable) = {
fofhs: FilterOrFilterHeaderSync): (
FilterOrFilterHeaderSync,
Cancellable) = {
require(
syncFilterCancellableOpt.isEmpty,
s"Cannot schedule a syncFilterCancellable as one is already scheduled")
@ -1013,7 +1020,7 @@ case class PeerManager(
blockCount <- chainApi.getBlockCount()
currentFilterHeaderCount <- chainApi.getFilterHeaderCount()
currentFilterCount <- chainApi.getFilterCount()
_ <- {
resultOpt <- {
//make sure filter sync hasn't started since we schedule the job...
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5167
val isOutOfSync = PeerManager.isFiltersOutOfSync(
@ -1026,12 +1033,12 @@ case class PeerManager(
if (isOutOfSync) {
//if it hasn't started it, start it
filterSyncHelper(chainApi, syncNodeState)
filterSyncHelper(chainApi, fofhs)
} else {
Future.unit
Future.successful(None)
}
}
} yield ()
} yield resultOpt
}
filterSyncF.onComplete {
case scala.util.Success(_) =>
@ -1043,7 +1050,7 @@ case class PeerManager(
()
}
(syncNodeState.syncPeer, cancellable)
(fofhs, cancellable)
}
/** Returns true if filter are in sync with their old counts, but out of sync with our block count */
@ -1053,31 +1060,31 @@ case class PeerManager(
bestFilterOpt: Option[CompactFilterDb],
bestBlockHeader: BlockHeaderDb,
chainApi: ChainApi,
nodeState: SyncNodeState): Future[Unit] = {
fofhs: FilterOrFilterHeaderSync): Future[
Option[FilterOrFilterHeaderSync]] = {
val isTipStaleF = chainApi.isTipStale()
isTipStaleF.flatMap { isTipStale =>
if (isTipStale) {
logger.error(
s"Cannot start syncing filters while blockchain tip is stale")
Future.unit
Future.successful(None)
} else {
logger.debug(
s"syncFilters() bestBlockHeader=$bestBlockHeader bestFilterHeaderOpt=$bestFilterHeaderOpt bestFilterOpt=$bestFilterOpt state=$nodeState")
s"syncFilters() bestBlockHeader=$bestBlockHeader bestFilterHeaderOpt=$bestFilterHeaderOpt bestFilterOpt=$bestFilterOpt state=$fofhs")
// If we have started syncing filters headers
(bestFilterHeaderOpt, bestFilterOpt) match {
case (None, None) | (None, Some(_)) =>
nodeState match {
fofhs match {
case fhs: FilterHeaderSync =>
val peerMsgSender =
nodeState.getPeerMsgSender(fhs.syncPeer).get
fofhs.syncPeerMessageSender()
PeerManager
.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
stopBlockHeaderDb = bestBlockHeader,
state = fhs)
.map(_ => ())
case x @ (_: FilterSync | _: HeaderSync) =>
case x @ (_: FilterSync) =>
val exn = new RuntimeException(
s"Invalid state to start syncing filter headers with, got=$x")
Future.failed(exn)
@ -1098,12 +1105,12 @@ case class PeerManager(
//an old tip, our event driven node will start syncing
//filters after block headers are in sync
//do nothing
Future.unit
Future.successful(None)
} else {
syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
compactFilterStartHeightOpt = None,
nodeState = nodeState)
syncNodeState = fofhs)
}
case (Some(bestFilterHeader), None) =>
val compactFilterStartHeightOptF =
@ -1111,12 +1118,13 @@ case class PeerManager(
walletCreationTimeOpt)
for {
compactFilterStartHeightOpt <- compactFilterStartHeightOptF
_ <- syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
compactFilterStartHeightOpt =
compactFilterStartHeightOpt,
nodeState = nodeState)
} yield ()
resultOpt <- syncCompactFilters(bestFilterHeader =
bestFilterHeader,
chainApi = chainApi,
compactFilterStartHeightOpt =
compactFilterStartHeightOpt,
syncNodeState = fofhs)
} yield resultOpt
}
}
@ -1144,13 +1152,13 @@ case class PeerManager(
state match {
case sns: SyncNodeState =>
val newState = sns.replaceSyncPeer(syncPeer)
syncHelper(newState).map(_ => Some(newState))
syncHelper(newState).map(Some(_))
case d: DoneSyncing =>
val hs = HeaderSync(syncPeer,
d.peerDataMap,
d.waitingForDisconnection,
d.peerFinder)
syncHelper(hs).map(_ => Some(hs))
syncHelper(hs).map(Some(_))
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(Some(x))
@ -1199,7 +1207,7 @@ object PeerManager extends Logging {
peerMessageSenderApi: PeerMessageSenderApi,
chainApi: ChainApi,
stopBlockHeaderDb: BlockHeaderDb,
state: SyncNodeState)(implicit
state: FilterHeaderSync)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[
Option[NodeState.FilterHeaderSync]] = {
@ -1240,13 +1248,7 @@ object PeerManager extends Logging {
case Some(filterSyncMarker) =>
peerMessageSenderApi
.sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ =>
Some(
FilterHeaderSync(syncPeer = state.syncPeer,
peerDataMap = state.peerDataMap,
waitingForDisconnection =
state.waitingForDisconnection,
state.peerFinder)))
.map(_ => Some(state))
case None =>
logger.info(
s"Filter headers are synced! filterHeader.blockHashBE=$blockHash")
@ -1315,7 +1317,7 @@ object PeerManager extends Logging {
}
def fetchCompactFilterHeaders(
state: SyncNodeState, //can we tighten this type up?
state: FilterHeaderSync,
chainApi: ChainApi,
peerMessageSenderApi: PeerMessageSenderApi,
stopBlockHeaderDb: BlockHeaderDb)(implicit

View File

@ -359,10 +359,7 @@ case class DataMessageHandler(
if (headerHeight > filterHeaderCount) {
logger.info(
s"Starting to fetch filter headers in data message handler")
val fhs = FilterHeaderSync(syncNodeState.syncPeer,
syncNodeState.peerDataMap,
syncNodeState.waitingForDisconnection,
syncNodeState.peerFinder)
val fhs = syncNodeState.toFilterHeaderSync
for {
syncingFilterHeadersState <- PeerManager
@ -485,13 +482,7 @@ case class DataMessageHandler(
)
.map { isSyncing =>
if (isSyncing) {
val newState = NodeState.FilterSync(
syncPeer = fs.syncPeer,
peerDataMap = fs.peerDataMap,
waitingForDisconnection = fs.waitingForDisconnection,
filterBatchCache = fs.filterBatchCache,
peerFinder = fs.peerFinder
)
val newState = fs.toFilterSync
Some(newState)
} else None
}
@ -506,11 +497,7 @@ case class DataMessageHandler(
val fs = syncNodeState match {
case x @ (_: HeaderSync | _: FilterHeaderSync) =>
FilterSync(x.syncPeer,
x.peerDataMap,
x.waitingForDisconnection,
Set.empty,
x.peerFinder)
x.toFilterSync
case fs: FilterSync => fs
}
@ -701,7 +688,7 @@ case class DataMessageHandler(
for {
lastBlockHeaderDbOpt <- chainApi.getHeader(lastHash.flip)
fhs <- PeerManager.fetchCompactFilterHeaders(
state = state,
state = state.toFilterHeaderSync,
chainApi = chainApi,
peerMessageSenderApi = peerMessageSenderApi,
stopBlockHeaderDb = lastBlockHeaderDbOpt.get)