Implement PeerMessageReceiverState.InitializedDisconnect. This allows us to distinguish between disconnections we initalized and connections the peer intialized. This is needed for determining whether we should reconnect or not (#3583)

This commit is contained in:
Chris Stewart 2021-08-21 17:04:50 -05:00 committed by GitHub
parent 20575bcd68
commit ac8bdb120c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 32 deletions

View file

@ -48,16 +48,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
val initAssertion = isInitializedF.map(assert(_))
//checking all peers can be disconnected
def isAllDisconnectedF: Future[Boolean] = {
val disconnFs = node.peers.indices.map(node.isDisconnected)
val res = Future.sequence(disconnFs).map(_.forall(_ == true))
res
}
val disconnF = for {
_ <- initAssertion
_ <- node.stop()
f <- isAllDisconnectedF
f <- isAllDisconnectedF(node)
} yield f
disconnF.map(assert(_))
}
@ -133,4 +127,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
}
}
//checking all peers can be disconnected
private def isAllDisconnectedF(node: Node): Future[Boolean] = {
val disconnFs = node.peers.indices.map(node.isDisconnected)
val res = Future.sequence(disconnFs).map(_.forall(_ == true))
res
}
}

View file

@ -230,7 +230,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
} yield disconnect
def isAllDisconnectedF: Future[Boolean] = {
val connF = peerMsgSenders.indices.map(peerMsgSenders(_).isDisconnected())
val connF = peerMsgSenders.map(_.isDisconnected())
val res = Future.sequence(connF).map(_.forall(_ == true))
res
}

View file

@ -11,8 +11,17 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.node.networking.P2PClient.NodeCommand
import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
}
import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Disconnected,
Initializing,
Normal
}
import org.bitcoins.node.util.BitcoinSNodeUtil
import org.bitcoins.tor.Socks5Connection.{Socks5Connect, Socks5Connected}
import org.bitcoins.tor.{Socks5Connection, Socks5ProxyParams}
@ -100,6 +109,8 @@ case class P2PClientActor(
val newUnalignedBytes =
handleTcpMessage(message, peerConnection, unalignedBytes)
context.become(awaitNetworkRequest(peerConnection, newUnalignedBytes))
case nodeCommand: NodeCommand =>
handleNodeCommand(nodeCommand, Some(peerConnection))
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsg(metaMsg)
case Terminated(actor) if actor == peerConnection =>
@ -108,7 +119,7 @@ case class P2PClientActor(
override def receive: Receive = LoggingReceive {
case P2PClient.ConnectCommand =>
connect()
handleNodeCommand(P2PClient.ConnectCommand, None)
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsgDisconnected(metaMsg)
}
@ -187,7 +198,7 @@ case class P2PClientActor(
logger.warn(s"unhandled message=$message")
}
private def connect() = {
private def connect(): Unit = {
val (peerOrProxyAddress, proxyParams) =
peer.socks5ProxyParams match {
case Some(proxyParams) =>
@ -213,21 +224,29 @@ case class P2PClientActor(
context become connecting(proxyParams)
}
private def reconnect() = {
currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
private def reconnect(): Unit = {
currentPeerMsgHandlerRecv.state match {
case _: PeerMessageReceiverState.InitializedDisconnect =>
logger.warn(
s"Ignoring reconnection attempts as we initialized disconnect from peer=$peer")
()
case PeerMessageReceiverState.Preconnection | _: Initializing |
_: Normal | _: Disconnected =>
currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
if (reconnectionTry >= maxReconnectionTries) {
logger.error("Exceeded maximum number of reconnection attempts")
context.stop(self)
} else {
val delay = reconnectionDelay * (1 << reconnectionTry)
reconnectionTry = reconnectionTry + 1
if (reconnectionTry >= maxReconnectionTries) {
logger.error("Exceeded maximum number of reconnection attempts")
context.stop(self)
} else {
val delay = reconnectionDelay * (1 << reconnectionTry)
reconnectionTry = reconnectionTry + 1
import context.dispatcher
context.system.scheduler.scheduleOnce(delay)(
self ! P2PClient.ReconnectCommand)
import context.dispatcher
context.system.scheduler.scheduleOnce(delay)(
self ! P2PClient.ReconnectCommand)
context.become(reconnecting)
context.become(reconnecting)
}
}
}
@ -393,6 +412,20 @@ case class P2PClientActor(
peerConnection ! Tcp.ResumeReading
}
private def handleNodeCommand(
command: NodeCommand,
peerConnectionOpt: Option[ActorRef]): Unit = command match {
case P2PClient.ConnectCommand =>
connect()
case P2PClient.ReconnectCommand =>
reconnect()
case P2PClient.CloseCommand =>
currentPeerMsgHandlerRecv =
currentPeerMsgHandlerRecv.initializeDisconnect()
peerConnectionOpt.map(actor => actor.tell(Tcp.Close, self))
()
}
}
case class P2PClient(actor: ActorRef, peer: Peer) extends P2PLogger {
@ -432,9 +465,12 @@ case class P2PClient(actor: ActorRef, peer: Peer) extends P2PLogger {
object P2PClient extends P2PLogger {
object ConnectCommand
sealed trait NodeCommand
case object ConnectCommand extends NodeCommand
object ReconnectCommand
case object ReconnectCommand extends NodeCommand
case object CloseCommand extends NodeCommand
/** A message hierarchy that canbe sent to [[P2PClientActor P2P Client Actor]]
* to query about meta information of a peer

View file

@ -36,7 +36,8 @@ class PeerMessageReceiver(
protected[networking] def connect(client: P2PClient): PeerMessageReceiver = {
state match {
case bad @ (_: Initializing | _: Normal | _: Disconnected) =>
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
_: Disconnected) =>
throw new RuntimeException(s"Cannot call connect when in state=${bad}")
case Preconnection =>
logger.info(s"Connection established with peer=${peer}")
@ -53,14 +54,35 @@ class PeerMessageReceiver(
}
}
/** Initializes the disconnection from our peer on the network.
* This is different than [[disconnect()]] as that indicates the
* peer initialized a disconnection from us
*/
private[networking] def initializeDisconnect(): PeerMessageReceiver = {
state match {
case bad @ (_: Disconnected | Preconnection) =>
throw new RuntimeException(
s"Cannot initialize disconnect from peer=$peer when in state=$bad")
case _: InitializedDisconnect =>
logger.warn(
s"Already initialized disconnected from peer=$peer, this is a noop")
this
case state @ (_: Initializing | _: Normal) =>
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
toState(newState)
}
}
protected[networking] def disconnect(): PeerMessageReceiver = {
logger.trace(s"Disconnecting with internalstate=${state}")
state match {
case bad @ (_: Disconnected | Preconnection) =>
throw new RuntimeException(
s"Cannot disconnect from peer=${peer} when in state=${bad}")
case good @ (_: Initializing | _: Normal) =>
case good @ (_: Initializing | _: Normal | _: InitializedDisconnect) =>
logger.debug(s"Disconnected bitcoin peer=${peer}")
val newState = Disconnected(
clientConnectP = good.clientConnectP,
@ -140,7 +162,8 @@ class PeerMessageReceiver(
logger.trace(s"Received versionMsg=$versionMsg from peer=$peer")
state match {
case bad @ (_: Disconnected | _: Normal | Preconnection) =>
case bad @ (_: Disconnected | _: Normal | Preconnection |
_: InitializedDisconnect) =>
Future.failed(
new RuntimeException(
s"Cannot handle version message while in state=${bad}"))
@ -158,7 +181,8 @@ class PeerMessageReceiver(
case VerAckMessage =>
state match {
case bad @ (_: Disconnected | _: Normal | Preconnection) =>
case bad @ (_: Disconnected | _: InitializedDisconnect | _: Normal |
Preconnection) =>
Future.failed(
new RuntimeException(
s"Cannot handle version message while in state=${bad}"))

View file

@ -163,6 +163,23 @@ object PeerMessageReceiverState {
override def toString: String = "Normal"
}
/** The state for when we initialized as disconnect from our peer */
case class InitializedDisconnect(
clientConnectP: Promise[P2PClient],
clientDisconnectP: Promise[Unit],
versionMsgP: Promise[VersionMessage],
verackMsgP: Promise[VerAckMessage.type])
extends PeerMessageReceiverState {
require(
isConnected,
s"We cannot have a PeerMessageReceiverState.Normal if the Peer is not connected")
require(
isInitialized,
s"We cannot have a PeerMessageReceiverState.Normal if the Peer is not initialized")
override def toString: String = "InitializedDisconnect"
}
case class Disconnected(
clientConnectP: Promise[P2PClient],
clientDisconnectP: Promise[Unit],

View file

@ -1,7 +1,6 @@
package org.bitcoins.node.networking.peer
import akka.actor.ActorRef
import akka.io.Tcp
import akka.util.Timeout
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.bloom.BloomFilter
@ -48,7 +47,7 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
isConnected().flatMap {
case true =>
logger.info(s"Disconnecting peer at socket=${socket}")
(client.actor ! Tcp.Close)
(client.actor ! P2PClient.CloseCommand)
Future.unit
case false =>
val err =