Fix infinite invalid header loop (#4667)

* fix infinite invalid header loop

* Adjust log levels to WARN

Co-authored-by: Chris Stewart <stewart.chris1234@gmail.com>
This commit is contained in:
Shreyansh 2022-08-26 20:13:07 +05:30 committed by GitHub
parent 969333c9e4
commit 2cae3f803d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 64 additions and 25 deletions

View file

@ -28,3 +28,5 @@ case class DuplicateFilters(message: String) extends ChainException(message)
case class InvalidBlockRange(message: String) extends ChainException(message)
case class InvalidBlockHeader(message: String) extends ChainException(message)
case class DuplicateHeaders(message: String) extends ChainException(message)

View file

@ -119,6 +119,10 @@ class ChainHandler(
val filteredHeaders = headers.filterNot(h =>
headersWeAlreadyHave.exists(_.hashBE == h.hashBE))
if (filteredHeaders.isEmpty) {
return Future.failed(DuplicateHeaders(s"Received duplicate headers."))
}
val blockchainUpdates: Vector[BlockchainUpdate] = {
Blockchain.connectHeadersToChains(headers = filteredHeaders,
blockchains = blockchains)

View file

@ -2,6 +2,7 @@ package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand
import org.bitcoins.server.BitcoinSAppConfig
@ -31,6 +32,9 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
}
}
lazy val invalidHeader = BlockHeader.fromHex(
s"0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f2003000000")
override protected def getFreshConfig: BitcoinSAppConfig = {
BitcoinSTestAppConfig.getMultiPeerNeutrinoWithEmbeddedDbTestConfig(pgUrl)
}
@ -125,7 +129,6 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
node.nodeConfig,
node.chainConfig))
invalidHeader = node.chainAppConfig.chain.genesisBlock.blockHeader
invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader))
sender <- node.peerManager.peerData(peer).peerMessageSender
_ <- node.getDataMessageHandler.addToStream(invalidHeaderMessage,
@ -144,7 +147,6 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
val peerManager = node.peerManager
def sendInvalidHeaders(peer: Peer): Future[Unit] = {
val invalidHeader = node.chainAppConfig.chain.genesisBlock.blockHeader
val invalidHeaderMessage =
HeadersMessage(headers = Vector(invalidHeader))
val senderF = node.peerManager.peerData(peer).peerMessageSender

View file

@ -368,6 +368,17 @@ case class PeerManager(
Future.unit
}
def sendResponseTimeout(peer: Peer, payload: NetworkPayload): Future[Unit] = {
logger.debug(
s"Sending response timeout for ${payload.commandName} to $peer")
if (peerData.contains(peer)) {
peerData(peer).client.map(_.actor ! ResponseTimeout(payload))
} else {
logger.debug(s"Requested to send response timeout for unknown $peer")
Future.unit
}
}
def syncFromNewPeer(): Future[DataMessageHandler] = {
logger.debug(s"Trying to sync from new peer")
val newNode =
@ -407,3 +418,5 @@ case class PeerManager(
val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] =
dataMessageStreamSource.to(dataMessageStreamSink).run()
}
case class ResponseTimeout(payload: NetworkPayload)

View file

@ -14,7 +14,6 @@ import org.bitcoins.core.p2p.{
NetworkPayload
}
import org.bitcoins.core.util.{FutureUtil, NetworkUtil}
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.{
@ -27,6 +26,7 @@ import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
}
import org.bitcoins.node.{P2PLogger, ResponseTimeout}
import org.bitcoins.tor.Socks5Connection.{Socks5Connect, Socks5Connected}
import org.bitcoins.tor.{Socks5Connection, Socks5ProxyParams}
import scodec.bits.ByteVector
@ -119,6 +119,10 @@ case class P2PClientActor(
case _ =>
}
sendNetworkMessage(message, peerConnection)
case ResponseTimeout(msg) =>
currentPeerMsgHandlerRecv =
Await.result(currentPeerMsgHandlerRecv.onResponseTimeout(msg),
timeout)
case payload: NetworkPayload =>
val networkMsg = NetworkMessage(network, payload)
self.forward(networkMsg)
@ -319,8 +323,9 @@ case class P2PClientActor(
_: Normal | _: Disconnected | _: Waiting) =>
state match {
case wait: Waiting =>
currentPeerMsgHandlerRecv.onResponseTimeout(wait.responseFor)
wait.expectedResponseCancellable.cancel()
currentPeerMsgHandlerRecv = Await.result(
currentPeerMsgHandlerRecv.onResponseTimeout(wait.responseFor),
timeout)
case init: Initializing =>
init.initializationTimeoutCancellable.cancel()
case _ =>

View file

@ -1,7 +1,7 @@
package org.bitcoins.node.networking.peer
import akka.Done
import org.bitcoins.chain.blockchain.InvalidBlockHeader
import org.bitcoins.chain.blockchain.{DuplicateHeaders, InvalidBlockHeader}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
@ -235,17 +235,8 @@ case class DataMessageHandler(
copy(chainApi = processed)
}
val recoveredDMHF: Future[DataMessageHandler] =
chainApiHeaderProcessF.recoverWith {
case _: InvalidBlockHeader =>
logger.debug(
s"Invalid headers of count $count sent from ${syncPeer.get} in state $state")
recoverInvalidHeader(peer, peerMsgSender)
case throwable: Throwable => throw throwable
}
val getHeadersF: Future[DataMessageHandler] =
recoveredDMHF
chainApiHeaderProcessF
.flatMap { newDmh =>
val newApi = newDmh.chainApi
if (headers.nonEmpty) {
@ -391,6 +382,18 @@ case class DataMessageHandler(
}
}
getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(
s"Received duplicate headers from ${syncPeer.get} in state=$state")
Future.successful(this)
case _: InvalidBlockHeader =>
logger.warn(
s"Invalid headers of count $count sent from ${syncPeer.get} in state=$state")
recoverInvalidHeader(peer, peerMsgSender)
case e: Throwable => throw e
}
getHeadersF.failed.map { err =>
logger.error(s"Error when processing headers message", err)
}

View file

@ -140,10 +140,7 @@ class PeerMessageReceiver(
case good @ (_: Initializing | _: Normal | _: Waiting) =>
val handleF: Future[Unit] = good match {
case wait: Waiting =>
onResponseTimeout(wait.responseFor).map { _ =>
wait.expectedResponseCancellable.cancel()
()
}
onResponseTimeout(wait.responseFor).map(_ => ())
case wait: Initializing =>
wait.initializationTimeoutCancellable.cancel()
Future.unit
@ -262,7 +259,8 @@ class PeerMessageReceiver(
node.peerManager.onInitializationTimeout(peer)
}
def onResponseTimeout(networkPayload: NetworkPayload): Future[Unit] = {
def onResponseTimeout(
networkPayload: NetworkPayload): Future[PeerMessageReceiver] = {
assert(networkPayload.isInstanceOf[ExpectsResponse])
logger.debug(
s"Handling response timeout for ${networkPayload.commandName} from $peer")
@ -277,11 +275,22 @@ class PeerMessageReceiver(
case payload: ExpectsResponse =>
logger.debug(
s"Response for ${payload.commandName} from $peer timed out in state $state")
node.peerManager.onQueryTimeout(payload, peer)
node.peerManager.onQueryTimeout(payload, peer).map { _ =>
state match {
case _: Waiting if state.isConnected && state.isInitialized =>
val newState =
Normal(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
toState(newState)
case _: PeerMessageReceiverState => this
}
}
case _ =>
logger.error(
s"onResponseTimeout called for ${networkPayload.commandName} which does not expect response")
Future.unit
Future.successful(this)
}
}
@ -294,7 +303,8 @@ class PeerMessageReceiver(
logger.debug(s"Handling expected response for ${msg.commandName}")
val expectedResponseCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.queryWaitTime)(
Await.result(onResponseTimeout(msg), 10.seconds))
Await.result(node.peerManager.sendResponseTimeout(peer, msg),
10.seconds))
val newState = Waiting(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
@ -316,7 +326,7 @@ class PeerMessageReceiver(
case Preconnection | _: Initializing | _: Disconnected =>
//so we sent a message when things were good, but not we are back to connecting?
//can happen when can happen where once we initialize the remote peer immediately disconnects us
onResponseTimeout(msg).flatMap(_ => Future.successful(this))
onResponseTimeout(msg)
}
}