P2P reconnect logic (#3572)

* P2P reconnect logic

* some fixes

* more changes

* fix DuplicateFilters exception in case of disconnect in the middle of syncing filters

* more fixes

* Repurpose maxconnections test to be just a generic reconnection test

Co-authored-by: Chris Stewart <stewart.chris1234@gmail.com>
This commit is contained in:
rorp 2021-08-20 03:51:51 -07:00 committed by GitHub
parent 072c4164db
commit 54ce2ebeb2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 183 additions and 62 deletions

View file

@ -178,7 +178,8 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {
val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv =>
TestActorRef(P2PClient.props(peer, peerMsgRecv), probe.ref)
TestActorRef(P2PClient.props(peer, peerMsgRecv, { () => Future.unit }),
probe.ref)
}
val p2pClientF: Future[P2PClient] = clientActorF.map {
client: TestActorRef[P2PClientActor] =>

View file

@ -0,0 +1,60 @@
package org.bitcoins.node.networking
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.asyncutil.AsyncUtil.RpcRetryException
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerHandler
import org.bitcoins.testkit.node.{
CachedBitcoinSAppConfig,
NodeTestUtil,
NodeUnitTest
}
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.bitcoins.testkit.util.{AkkaUtil, BitcoindRpcTest}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class ReConnectionTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {

  /** The bitcoind instance our node connects (and re-connects) to. */
  lazy val bitcoindRpcF =
    BitcoindRpcTestUtil.startedBitcoindRpcClient(clientAccum = clientAccum)

  lazy val bitcoindPeerF: Future[Peer] =
    bitcoindRpcF.flatMap(b => NodeTestUtil.getBitcoindPeer(b))

  behavior of "ReConnectionTest"

  it must "attempt to reconnect if max connections are full" in {
    val handlerF: Future[PeerHandler] = for {
      _ <- cachedConfig.start()
      handler <- bitcoindPeerF.flatMap(NodeUnitTest.buildPeerHandler(_))
    } yield handler

    for {
      handler <- handlerF
      rpc <- bitcoindRpcF
      _ = handler.peerMsgSender.connect()
      //the initial connect may never succeed; swallow retry exhaustion
      //NOTE(review): comment inherited from the old maxconnections=0 test —
      //confirm whether the failure path is still expected here
      _ <- AsyncUtil
        .retryUntilSatisfiedF(() => handler.p2pClient.isConnected())
        .recover { case _: RpcRetryException =>
          ()
        }
      _ <- rpc.stop()
      //need to wait for mac to unlock the datadir
      //before we can restart the bitcoind binary
      _ <- AkkaUtil.nonBlockingSleep(3.seconds)
      _ <- rpc.start()
      //now we should eventually automatically reconnect
      _ <- AsyncUtil.retryUntilSatisfiedF(
        conditionF = () => handler.p2pClient.isConnected(),
        interval = 500.millis,
        maxTries = 60)
    } yield succeed
  }
}

View file

@ -81,7 +81,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
/** The current data message handler.
* It should be noted that the dataMessageHandler contains
* chainstate. When we update with a new chainstate, we need to
* maek sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]]
* make sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]]
* to make sure we don't corrupt our chainstate cache
*/
def getDataMessageHandler: DataMessageHandler
@ -114,7 +114,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
val p2p = zipped.map { case (peer, peerMsgRecv) =>
P2PClient(context = system,
peer = peer,
peerMessageReceiver = peerMsgRecv)
peerMessageReceiver = peerMsgRecv,
onReconnect = sync)
}
p2p
}
@ -163,12 +164,14 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
peerMsgSenders(idx).connect()
val isInitializedF = for {
_ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized(idx),
maxTries = 200,
maxTries = 1024,
interval = 250.millis)
} yield ()
isInitializedF.failed.foreach(err =>
isInitializedF.failed.foreach { err =>
logger.error(
s"Failed to connect with peer=${peers(idx)} with err=$err"))
s"Failed to connect with peer=${peers(idx)} with err=$err")
sys.exit(-1)
}
isInitializedF.map { _ =>
nodeAppConfig.nodeType match {
case NodeType.NeutrinoNode => {

View file

@ -55,24 +55,33 @@ import scala.util._
*/
case class P2PClientActor(
peer: Peer,
initPeerMsgHandlerReceiver: PeerMessageReceiver
initPeerMsgHandlerReceiver: PeerMessageReceiver,
onReconnect: () => Future[Unit]
)(implicit config: NodeAppConfig)
extends Actor
with P2PLogger {
private var currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
private var reconnectHandlerOpt: Option[() => Future[Unit]] = None
private val maxReconnectionTries = 16
private var reconnectionTry = 0
private val reconnectionDelay = 500.millis
/** The parameters for the network we are connected to
*/
private val network: NetworkParameters = config.network
private val timeout = 1000.seconds
/** The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
* and instantiates workers for specific tasks, such as listening to incoming connections.
*/
def manager: ActorRef = IO(Tcp)(context.system)
/** The parameters for the network we are connected to
*/
val network: NetworkParameters = config.network
private val timeout = 1000.seconds
/** TODO: this comment seems wrong?
*
* This actor signifies the node we are connected to on the p2p network
@ -94,34 +103,21 @@ case class P2PClientActor(
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsg(metaMsg)
case Terminated(actor) if actor == peerConnection =>
context stop self
reconnect()
}
def receive: Receive = LoggingReceive {
override def receive: Receive = LoggingReceive {
case P2PClient.ConnectCommand =>
val (peerOrProxyAddress, proxyParams) =
peer.socks5ProxyParams match {
case Some(proxyParams) =>
val host = peer.socket.getHostName
if (!host.contains("localhost") && !host.contains("127.0.0.1")) {
val proxyAddress = proxyParams.address
logger.info(s"connecting to SOCKS5 proxy $proxyAddress")
(proxyAddress, Some(proxyParams))
} else {
val remoteAddress = peer.socket
logger.info(s"connecting to $remoteAddress")
(peer.socket, None)
}
case None =>
val remoteAddress = peer.socket
logger.info(s"connecting to $remoteAddress")
(peer.socket, None)
}
manager ! Tcp.Connect(peerOrProxyAddress,
timeout = Some(20.seconds),
options = KeepAlive(true) :: Nil,
pullMode = true)
context become connecting(proxyParams)
connect()
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsgDisconnected(metaMsg)
}
/** Actor state while a scheduled reconnection attempt is pending.
  * [[P2PClient.ReconnectCommand]] is sent by the scheduler set up in
  * `reconnect()`; on receipt we stash the `onReconnect` callback (it is
  * invoked once the new connection finishes the handshake) and dial again.
  */
def reconnecting: Receive = LoggingReceive {
  case P2PClient.ReconnectCommand =>
    logger.info(s"reconnecting to ${peer.socket}")
    //remember the callback so it can fire after the peer is initialized
    reconnectHandlerOpt = Some(onReconnect)
    connect()
  case metaMsg: P2PClient.MetaMsg =>
    //still answer IsConnected/IsDisconnected queries while waiting
    sender() ! handleMetaMsgDisconnected(metaMsg)
}
@ -132,6 +128,7 @@ case class P2PClientActor(
val peerOrProxyAddress = c.remoteAddress
logger.error(
s"connection failed to ${peerOrProxyAddress} ${proxyParams}")
reconnect()
case event @ Tcp.Connected(peerOrProxyAddress, _) =>
val connection = sender()
@ -151,7 +148,6 @@ case class P2PClientActor(
context watch proxy
context become socks5Connecting(event,
proxy,
connection,
remoteAddress,
proxyAddress)
case None =>
@ -160,7 +156,6 @@ case class P2PClientActor(
context watch connection
val _ = handleEvent(event, connection, ByteVector.empty)
}
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsgDisconnected(metaMsg)
}
@ -168,21 +163,18 @@ case class P2PClientActor(
def socks5Connecting(
event: Tcp.Connected,
proxy: ActorRef,
connection: ActorRef,
remoteAddress: InetSocketAddress,
proxyAddress: InetSocketAddress): Receive = LoggingReceive {
case Tcp.CommandFailed(_: Socks5Connect) =>
logger.error(
s"connection failed to ${remoteAddress} via SOCKS5 ${proxyAddress}")
context stop self
reconnect()
case Socks5Connected(_) =>
logger.info(
s"connected to ${remoteAddress} via SOCKS5 proxy ${proxyAddress}")
context unwatch proxy
context watch connection
val _ = handleEvent(event, proxy, ByteVector.empty)
case Terminated(actor) if actor == proxy =>
context stop self
reconnect()
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsgDisconnected(metaMsg)
}
@ -195,6 +187,50 @@ case class P2PClientActor(
logger.warn(s"unhandled message=$message")
}
/** Opens a TCP connection to the peer, either directly or through the
  * configured SOCKS5 proxy. Local peers (hostname containing "localhost"
  * or "127.0.0.1") always bypass the proxy.
  */
private def connect() = {
  //only use the proxy for non-local peers; getHostName is evaluated
  //only when a proxy is actually configured, as before
  val proxyToUse = peer.socks5ProxyParams.filter { _ =>
    val host = peer.socket.getHostName
    !host.contains("localhost") && !host.contains("127.0.0.1")
  }

  val (peerOrProxyAddress, proxyParams) = proxyToUse match {
    case Some(params) =>
      logger.info(s"connecting to SOCKS5 proxy ${params.address}")
      (params.address, Some(params))
    case None =>
      val remoteAddress = peer.socket
      logger.info(s"connecting to $remoteAddress")
      (peer.socket, None)
  }

  manager ! Tcp.Connect(peerOrProxyAddress,
                        timeout = Some(20.seconds),
                        options = KeepAlive(true) :: Nil,
                        pullMode = true)

  context become connecting(proxyParams)
}
/** Schedules a reconnection attempt with exponential backoff
  * (reconnectionDelay * 2^try), giving up after `maxReconnectionTries`
  * attempts by stopping this actor.
  */
private def reconnect() = {
  //drop any partially-processed peer state before dialing again
  currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver

  if (reconnectionTry < maxReconnectionTries) {
    val backoff = reconnectionDelay * (1 << reconnectionTry)
    reconnectionTry = reconnectionTry + 1
    import context.dispatcher
    context.system.scheduler.scheduleOnce(backoff)(
      self ! P2PClient.ReconnectCommand)
    context.become(reconnecting)
  } else {
    logger.error("Exceeded maximum number of reconnection attempts")
    context.stop(self)
  }
}
/** Handles boiler plate [[Tcp.Message]] types.
*
* @return the unaligned bytes if we haven't received a full Bitcoin P2P message yet
@ -246,13 +282,9 @@ case class P2PClientActor(
unalignedBytes
case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted |
Tcp.PeerClosed) =>
Tcp.PeerClosed | Tcp.ErrorClosed(_)) =>
logger.info(s"We've been disconnected by $peer command=${closeCmd}")
//tell our peer message handler we are disconnecting
val newPeerMsgRecv = currentPeerMsgHandlerRecv.disconnect()
currentPeerMsgHandlerRecv = newPeerMsgRecv
context.stop(self)
currentPeerMsgHandlerRecv.disconnect()
unalignedBytes
case Tcp.Received(byteString: ByteString) =>
@ -294,7 +326,11 @@ case class P2PClientActor(
case (peerMsgRecv: PeerMessageReceiver, m: NetworkMessage) =>
logger.trace(s"Processing message=${m}")
val msg = NetworkMessageReceived(m, P2PClient(self, peer))
peerMsgRecv.handleNetworkMessageReceived(msg)
if (peerMsgRecv.isConnected) {
peerMsgRecv.handleNetworkMessageReceived(msg)
} else {
Future.successful(peerMsgRecv)
}
}
logger.trace(s"About to process ${messages.length} messages")
@ -304,6 +340,11 @@ case class P2PClientActor(
val newMsgReceiver = Await.result(newMsgReceiverF, timeout)
currentPeerMsgHandlerRecv = newMsgReceiver
if (currentPeerMsgHandlerRecv.isInitialized) {
reconnectionTry = 0
reconnectHandlerOpt.foreach(_())
reconnectHandlerOpt = None
}
peerConnection ! Tcp.ResumeReading
newUnalignedBytes
}
@ -393,6 +434,8 @@ object P2PClient extends P2PLogger {
object ConnectCommand
object ReconnectCommand
/** A message hierarchy that can be sent to [[P2PClientActor P2P Client Actor]]
* to query about meta information of a peer
*/
@ -413,18 +456,26 @@ object P2PClient extends P2PLogger {
*/
final case object IsDisconnected extends MetaMsg
def props(peer: Peer, peerMsgHandlerReceiver: PeerMessageReceiver)(implicit
/** Builds the [[Props]] used to spawn a [[P2PClientActor]].
  *
  * @param peer the remote peer to connect to
  * @param peerMsgHandlerReceiver receives/handles p2p messages from the peer
  * @param onReconnect callback invoked after an automatic reconnection
  *                    completes (e.g. to restart syncing)
  */
def props(
    peer: Peer,
    peerMsgHandlerReceiver: PeerMessageReceiver,
    onReconnect: () => Future[Unit])(implicit
    config: NodeAppConfig
): Props =
  //constructor args passed positionally; the implicit config must be
  //included explicitly when using the classOf-based Props factory
  Props(classOf[P2PClientActor],
        peer,
        peerMsgHandlerReceiver,
        onReconnect,
        config)
def apply(
context: ActorRefFactory,
peer: Peer,
peerMessageReceiver: PeerMessageReceiver)(implicit
peerMessageReceiver: PeerMessageReceiver,
onReconnect: () => Future[Unit])(implicit
config: NodeAppConfig): P2PClient = {
val actorRef = context.actorOf(
props = props(peer = peer, peerMsgHandlerReceiver = peerMessageReceiver),
props = props(peer, peerMessageReceiver, onReconnect),
name = BitcoinSNodeUtil.createActorName(getClass))
P2PClient(actorRef, peer)

View file

@ -38,6 +38,12 @@ case class DataMessageHandler(
private val txDAO = BroadcastAbleTransactionDAO()
/** Clears all sync-related state so syncing can restart cleanly after a
  * disconnect (e.g. avoids DuplicateFilters when a disconnect happens in
  * the middle of syncing filters — see the motivating commit message).
  */
def reset: DataMessageHandler = copy(initialSyncDone = None,
                                     currentFilterBatch = Vector.empty,
                                     filterHeaderHeightOpt = None,
                                     filterHeightOpt = None,
                                     syncing = false)
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender,

View file

@ -56,11 +56,11 @@ class PeerMessageReceiver(
protected[networking] def disconnect(): PeerMessageReceiver = {
logger.trace(s"Disconnecting with internalstate=${state}")
state match {
case bad @ (_: Initializing | _: Disconnected | Preconnection) =>
case bad @ (_: Disconnected | Preconnection) =>
throw new RuntimeException(
s"Cannot disconnect from peer=${peer} when in state=${bad}")
case good: Normal =>
case good @ (_: Initializing | _: Normal) =>
logger.debug(s"Disconnected bitcoin peer=${peer}")
val newState = Disconnected(
clientConnectP = good.clientConnectP,
@ -69,9 +69,9 @@ class PeerMessageReceiver(
verackMsgP = good.verackMsgP
)
val newRecv = toState(newState)
newRecv
val newNode =
node.updateDataMessageHandler(node.getDataMessageHandler.reset)
new PeerMessageReceiver(newNode, newState, peer)
}
}

View file

@ -25,7 +25,7 @@ abstract class NodeTestUtil extends P2PLogger {
def client(peer: Peer, peerMsgReceiver: PeerMessageReceiver)(implicit
ref: ActorRefFactory,
conf: NodeAppConfig): P2PClient = {
P2PClient.apply(ref, peer, peerMsgReceiver)
P2PClient.apply(ref, peer, peerMsgReceiver, { () => Future.unit })
}
/** Helper method to get the [[java.net.InetSocketAddress]]