Multi peer: Sync with random peer (#3454)

* ensure random peer used for sync supports compact filters

* test file

* fix

* refactor

* rerun ci

* rerun ci

* changes from comments

* fix

* fix tests

* changes from comments

* refactor

* remove unintended diffs
This commit is contained in:
Shreyansh 2021-07-31 18:24:31 +05:30 committed by GitHub
parent 9aadf9053d
commit 1426c31483
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 164 additions and 45 deletions

View file

@ -0,0 +1,37 @@
package org.bitcoins.node
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.NodeTestWithCachedBitcoindV19
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.Future
/** Tests that a NeutrinoNode fails to start when no connected peer
  * advertises compact filter support.
  * Uses a cached bitcoind v19 instance as the peer — presumably that
  * version does not signal compact filter support to the node; confirm
  * against the NodeTestWithCachedBitcoindV19 fixture.
  */
class NeutrinoUnsupportedPeerTest extends NodeTestWithCachedBitcoindV19 {
// Neutrino node configuration backed by an embedded database.
override protected def getFreshConfig: BitcoinSAppConfig =
BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(pgUrl)
override type FixtureParam = NeutrinoNodeConnectedWithBitcoind
// Builds the fixture: an *unstarted* neutrino node wired to the cached
// bitcoind, so the test itself can observe the failure from node.start().
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
val outcomeF: Future[Outcome] = for {
bitcoind <- cachedBitcoindWithFundsF
outcome = withNeutrinoNodeUnstarted(test, bitcoind)(system,
getFreshConfig)
f <- outcome.toFuture
} yield f
new FutureOutcome(outcomeF)
}
behavior of "NeutrinoNode"
it must "throw RuntimeException if peer does not support compact filters" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
val node = nodeConnectedWithBitcoind.node
// start() is expected to fail because the sole peer lacks compact
// filter support; capture the exception rather than the node.
val exception = recoverToExceptionIf[RuntimeException](node.start())
// Message must match the one thrown when no peer passes the
// compact-filter service check during sync peer selection.
exception.map(e =>
assert(e.getMessage == "No peers supporting compact filters!"))
}
}

View file

@ -3,8 +3,7 @@ package org.bitcoins.node.networking.peer
import org.bitcoins.core.currency.BitcoinsInt
import org.bitcoins.core.p2p.TransactionMessage
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.node.OnTxReceived
import org.bitcoins.node.NodeCallbacks
import org.bitcoins.node.{NodeCallbacks, OnTxReceived}
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.{
@ -66,7 +65,7 @@ class DataMessageHandlerNeutrinoNodesTest
DataMessageHandler(genesisChainApi)(node.executionContext,
node.nodeAppConfig,
node.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
_ <- dataMessageHandler.handleDataPayload(payload, sender, node)
result <- resultP.future
} yield assert(result == tx)
}

View file

@ -48,7 +48,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
chainApi.processHeaders(invalidPayload.headers))
// Verify we handle the payload correctly
_ <- dataMessageHandler.handleDataPayload(invalidPayload, sender)
_ <- dataMessageHandler.handleDataPayload(invalidPayload, sender, spv)
} yield succeed
}
@ -83,8 +83,8 @@ class DataMessageHandlerTest extends NodeUnitTest {
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload1, sender)
_ <- dataMessageHandler.handleDataPayload(payload2, sender)
_ <- dataMessageHandler.handleDataPayload(payload1, sender, spv)
_ <- dataMessageHandler.handleDataPayload(payload2, sender, spv)
result <- resultP.future
} yield assert(result == ((merkleBlock, Vector(tx))))
}
@ -116,7 +116,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
DataMessageHandler(genesisChainApi)(spv.executionContext,
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
_ <- dataMessageHandler.handleDataPayload(payload, sender, spv)
result <- resultP.future
} yield assert(result == block)
}
@ -151,7 +151,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
_ <- dataMessageHandler.handleDataPayload(payload, sender, spv)
result <- resultP.future
} yield assert(result == Vector(header))
}
@ -184,7 +184,7 @@ class DataMessageHandlerTest extends NodeUnitTest {
spv.nodeAppConfig,
spv.chainConfig)
_ <- dataMessageHandler.handleDataPayload(payload, sender)
_ <- dataMessageHandler.handleDataPayload(payload, sender, spv)
result <- resultP.future
} yield assert(result == Vector((hash.flip, filter.filter)))
}

View file

@ -19,7 +19,7 @@ import scala.concurrent.Future
case class NeutrinoNode(
nodePeer: Vector[Peer],
dataMessageHandler: DataMessageHandler,
private var dataMessageHandler: DataMessageHandler,
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
actorSystem: ActorSystem)
@ -36,9 +36,12 @@ case class NeutrinoNode(
override val peers: Vector[Peer] = nodePeer
override def getDataMessageHandler: DataMessageHandler = dataMessageHandler
override def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): NeutrinoNode = {
copy(dataMessageHandler = dataMessageHandler)
this.dataMessageHandler = dataMessageHandler
this
}
override def start(): Future[NeutrinoNode] = {
@ -46,8 +49,8 @@ case class NeutrinoNode(
node <- super.start()
chainApi <- chainApiFromDb()
bestHash <- chainApi.getBestBlockHash()
_ <- peerMsgSenders(0).sendGetCompactFilterCheckPointMessage(
stopHash = bestHash.flip)
_ <- randomPeerMsgSenderWithCompactFilters
.sendGetCompactFilterCheckPointMessage(stopHash = bestHash.flip)
} yield {
node.asInstanceOf[NeutrinoNode]
}
@ -75,7 +78,7 @@ case class NeutrinoNode(
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders)
_ <- randomPeerMsgSender.sendGetHeadersMessage(cachedHeaders)
_ <- syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
@ -128,10 +131,11 @@ case class NeutrinoNode(
chainApi: ChainApi,
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val sendCompactFilterHeaderMsgF = {
peerMsgSenders(0).sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
randomPeerMsgSenderWithCompactFilters
.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
}
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
@ -143,7 +147,7 @@ case class NeutrinoNode(
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
logger.info(s"Starting sync filters in NeutrinoNode.sync()")
peerMsgSenders(0)
randomPeerMsgSenderWithCompactFilters
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,

View file

@ -11,7 +11,7 @@ import org.bitcoins.chain.models.{
}
import org.bitcoins.core.api.chain._
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.p2p.{NetworkPayload, TypeIdentifier}
import org.bitcoins.core.p2p.{NetworkPayload, ServiceIdentifier, TypeIdentifier}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.node.config.NodeAppConfig
@ -27,9 +27,10 @@ import org.bitcoins.node.networking.peer.{
PeerMessageSender
}
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Random, Success}
/** This a base trait for various kinds of nodes. It contains house keeping methods required for all nodes.
*/
@ -45,13 +46,45 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val peers: Vector[Peer]
private val _peerServices: mutable.Map[Peer, ServiceIdentifier] =
mutable.Map.empty
def peerServices: Map[Peer, ServiceIdentifier] = _peerServices.toMap
def setPeerServices(
peer: Peer,
serviceIdentifier: ServiceIdentifier): Unit = {
_peerServices.put(peer, serviceIdentifier)
()
}
def randomPeerMsgSenderWithService(
f: ServiceIdentifier => Boolean): PeerMessageSender = {
val filteredPeers =
peerServices.filter(p => f(p._2)).keys.toVector
if (filteredPeers.isEmpty)
throw new RuntimeException("No peers supporting compact filters!")
val peer = filteredPeers(Random.nextInt(filteredPeers.length))
peerMsgSenders
.find(_.client.peer == peer)
.getOrElse(throw new RuntimeException("This should not happen."))
}
def randomPeerMsgSenderWithCompactFilters: PeerMessageSender = {
randomPeerMsgSenderWithService(_.nodeCompactFilters)
}
def randomPeerMsgSender: PeerMessageSender = {
peerMsgSenders(Random.nextInt(peerMsgSenders.length))
}
/** The current data message handler.
* It should be noted that the dataMessageHandler contains
* chainstate. When we update with a new chainstate, we need to
make sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]]
* to make sure we don't corrupt our chainstate cache
*/
def dataMessageHandler: DataMessageHandler
def getDataMessageHandler: DataMessageHandler
def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks
@ -137,7 +170,20 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
logger.error(
s"Failed to connect with peer=${peers(idx)} with err=$err"))
isInitializedF.map { _ =>
logger.info(s"Our peer=${peers(idx)} has been initialized")
nodeAppConfig.nodeType match {
case NodeType.NeutrinoNode => {
if (peerServices(peers(idx)).nodeCompactFilters) {
logger.info(s"Our peer=${peers(idx)} has been initialized")
} else {
logger.info(
s"Our peer=${peers(idx)} does not support compact filters. Disconnecting.")
peerMsgSenders(idx).disconnect()
}
}
case NodeType.SpvNode =>
case NodeType.BitcoindBackend =>
case NodeType.FullNode =>
}
}
}
@ -221,7 +267,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders)
_ <- randomPeerMsgSender.sendGetHeadersMessage(cachedHeaders)
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE}")

View file

@ -36,6 +36,8 @@ case class SpvNode(
def bloomFilter: BloomFilter = _bloomFilter.atomicGet
override def getDataMessageHandler: DataMessageHandler = dataMessageHandler
def setBloomFilter(bloom: BloomFilter): SpvNode = {
_bloomFilter.atomicSet(bloom)
this

View file

@ -8,7 +8,7 @@ import org.bitcoins.core.p2p._
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.bitcoins.node.{NodeType, P2PLogger}
import org.bitcoins.node.{Node, NodeType, P2PLogger}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
@ -40,7 +40,11 @@ case class DataMessageHandler(
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
peerMsgSender: PeerMessageSender,
node: Node): Future[DataMessageHandler] = {
lazy val peerWithCompactFilters = node.randomPeerMsgSenderWithCompactFilters
lazy val randomPeer = node.randomPeerMsgSender
val resultF = payload match {
case checkpoint: CompactFilterCheckPointMessage =>
@ -66,14 +70,15 @@ case class DataMessageHandler(
logger.info(
s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more")
sendNextGetCompactFilterHeadersCommand(
peerMsgSender,
peerWithCompactFilters,
filterHeader.stopHash.flip).map(_ => syncing)
} else {
logger.info(
s"Done syncing filter headers, beginning to sync filters in datamessagehandler")
sendFirstGetCompactFilterCommand(peerMsgSender).map { synced =>
if (!synced) logger.info("We are synced")
syncing
sendFirstGetCompactFilterCommand(peerWithCompactFilters).map {
synced =>
if (!synced) logger.info("We are synced")
syncing
}
}
newFilterHeaderHeight <- filterHeaderHeightOpt match {
@ -135,7 +140,8 @@ case class DataMessageHandler(
if (batchSizeFull) {
logger.info(
s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
sendNextGetCompactFilterCommand(peerMsgSender, newFilterHeight)
sendNextGetCompactFilterCommand(peerWithCompactFilters,
newFilterHeight)
} else Future.unit
} yield {
this.copy(
@ -207,7 +213,7 @@ case class DataMessageHandler(
if (count.toInt == HeadersMessage.MaxHeadersCount) {
logger.info(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
peerMsgSender
randomPeer
.sendGetHeadersMessage(lastHash)
.map(_ => syncing)
} else {
@ -227,7 +233,8 @@ case class DataMessageHandler(
) {
logger.info(
s"Starting to fetch filter headers in data message handler")
sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
sendFirstGetCompactFilterHeadersCommand(
peerWithCompactFilters)
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(syncing)

View file

@ -45,7 +45,7 @@ class PeerMessageReceiver(
val peerMsgSender = PeerMessageSender(client)
peerMsgSender.sendVersionMessage(node.dataMessageHandler.chainApi)
peerMsgSender.sendVersionMessage(node.getDataMessageHandler.chainApi)
val newRecv = toState(newState)
@ -118,15 +118,16 @@ class PeerMessageReceiver(
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
node.dataMessageHandler.handleDataPayload(payload, sender).map { handler =>
val newNode = node.updateDataMessageHandler(handler)
new PeerMessageReceiver(newNode, state, peer)
node.getDataMessageHandler.handleDataPayload(payload, sender, node).map {
handler =>
val newNode = node.updateDataMessageHandler(handler)
new PeerMessageReceiver(newNode, state, peer)
}
}
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
*
* @param payload the payload we need to do something with
* @param payload the payload we need to do something with
* @param sender the [[PeerMessageSender]] we can use to initialize an subsequent messages that need to be sent
* @return the requests with the request removed for which the @payload is responding too
*/
@ -148,6 +149,7 @@ class PeerMessageReceiver(
val newState = good.withVersionMsg(versionMsg)
sender.sendVerackMessage()
node.setPeerServices(peer, versionMsg.services)
val newRecv = toState(newState)

View file

@ -42,25 +42,25 @@
<logger name="org.bitcoins.wallet.config" level="WARN"/>
<!-- inspect table creation, etc -->
<logger name="org.bitcoins.chain.db" level="WARN" />
<logger name="org.bitcoins.node.db" level="WARN" />
<logger name="org.bitcoins.wallet.db" level="WARN" />
<logger name="org.bitcoins.chain.db" level="WARN"/>
<logger name="org.bitcoins.node.db" level="WARN"/>
<logger name="org.bitcoins.wallet.db" level="WARN"/>
<!-- ╔═════════════════╗ -->
<!-- ║ Node module ║ -->
<!-- ╚═════════════════╝ -->
<!-- See incoming message names and the peer it's sent from -->
<logger name="org.bitcoins.node.networking.peer.PeerMessageReceiver" level="WARN"/>
<logger name="org.bitcoins.node.networking.peer.PeerMessageReceiver" level="WARN "/>
<!-- See outgoing message names and the peer it's sent to -->
<logger name="org.bitcoins.node.networking.peer.PeerMessageSender" level="WARN"/>
<logger name="org.bitcoins.node.networking.peer.PeerMessageSender" level="WARN "/>
<!-- Inspect handling of headers and inventory messages -->
<logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="WARN"/>
<logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="WARN "/>
<!-- inspect TCP details -->
<logger name="org.bitcoins.node.networking.P2PClientActor" level="WARN"/>
<logger name="org.bitcoins.node.networking.P2PClientActor" level="WARN "/>
<!-- ╔════════════════════╗ -->
<!-- ║ Chain module ║ -->

View file

@ -8,6 +8,7 @@ import org.bitcoins.rpc.client.v21.BitcoindV21RpcClient
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.node.NodeUnitTest.{createPeer, syncNeutrinoNode}
import org.bitcoins.testkit.node.fixture.{
NeutrinoNodeConnectedWithBitcoind,
NeutrinoNodeConnectedWithBitcoinds,
SpvNodeConnectedWithBitcoind,
SpvNodeConnectedWithBitcoindV21
@ -76,6 +77,27 @@ trait NodeTestWithCachedBitcoind extends BaseNodeTest { _: CachedBitcoind[_] =>
})(test)
}
def withNeutrinoNodeUnstarted(
test: OneArgAsyncTest,
bitcoind: BitcoindRpcClient)(implicit
system: ActorSystem,
appConfig: BitcoinSAppConfig): FutureOutcome = {
val nodeWithBitcoindBuilder: () => Future[
NeutrinoNodeConnectedWithBitcoind] = { () =>
require(appConfig.nodeType == NodeType.NeutrinoNode)
for {
node <- NodeUnitTest.createNeutrinoNode(bitcoind)(system,
appConfig.chainConf,
appConfig.nodeConf)
} yield NeutrinoNodeConnectedWithBitcoind(node, bitcoind)
}
makeDependentFixture[NeutrinoNodeConnectedWithBitcoind](
build = nodeWithBitcoindBuilder,
{ case x: NeutrinoNodeConnectedWithBitcoind =>
tearDownNode(x.node)
})(test)
}
def withNeutrinoNodeConnectedToBitcoinds(
test: OneArgAsyncTest,
bitcoinds: Vector[BitcoindRpcClient])(implicit