Implement socks5 proxy in PeerMessageSender (#5136)

* WIP: Implement socks5 proxy in PeerMessageSender

* Get something working

* Refactor to use Either when passing a socks5 message or non socks5 ByteString downstream

* Socks5Message -> Socks5MessageResponse

* Revert things

* More cleanup

* Fix rebase

* Move socks5Handler() Flow into Socks5Connection

* Revert NeutrinoNode

* Implement auth for socks5 proxy in stream

* Cleanups
This commit is contained in:
Chris Stewart 2023-07-10 06:56:15 -05:00 committed by GitHub
parent 2b117d349a
commit ebe79287af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 238 additions and 101 deletions

View file

@ -3,14 +3,11 @@ package org.bitcoins.node.networking.peer
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.node.{NodeType, Peer}
import org.bitcoins.core.p2p._
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.{NodeStreamMessage, P2PLogger}
import org.bitcoins.node.util.PeerMessageSenderApi
import scala.concurrent.Future
@ -194,12 +191,9 @@ case class PeerMessageReceiver(
* but have NOT started the handshake
* This method will initiate the handshake
*/
protected[networking] def connect(
peer: Peer,
peerMessageSenderApi: PeerMessageSenderApi)(implicit
protected[networking] def connect(peer: Peer)(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig): PeerMessageReceiver = {
nodeAppConfig: NodeAppConfig): PeerMessageReceiver = {
import system.dispatcher
state match {
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
@ -221,9 +215,6 @@ case class PeerMessageReceiver(
val newState =
Preconnection.toInitializing(initializationTimeoutCancellable)
val chainApi = ChainHandler.fromDatabase()
peerMessageSenderApi.sendVersionMessage(chainApi, peer)
copy(state = newState)
}
}

View file

@ -3,6 +3,7 @@ package org.bitcoins.node.networking.peer
import akka.NotUsed
import akka.actor.{ActorSystem, Cancellable}
import akka.event.Logging
import akka.io.Inet.SocketOption
import akka.io.Tcp.SO.KeepAlive
import akka.stream.scaladsl.{
BidiFlow,
@ -19,19 +20,19 @@ import akka.util.ByteString
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.number.Int32
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{FutureUtil, NetworkUtil}
import org.bitcoins.crypto.{DoubleSha256Digest, HashDigest}
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.constant.NodeConstants
import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived
import org.bitcoins.node.networking.peer.PeerMessageSender.ConnectionGraph
import org.bitcoins.node.util.PeerMessageSenderApi
import org.bitcoins.tor.Socks5Connection
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
@ -43,32 +44,48 @@ case class PeerMessageSender(
chainAppConfig: ChainAppConfig,
system: ActorSystem)
extends P2PLogger {
import system.dispatcher
private val socket = peer.socket
private val socket: InetSocketAddress = {
nodeAppConfig.socks5ProxyParams match {
case Some(proxy) => proxy.address
case None => peer.socket
}
}
private val options = Vector(KeepAlive(true))
private val options: Vector[SocketOption] = Vector(KeepAlive(true))
private[this] var reconnectionTry = 0
private[this] var curReconnectionTry = 0
private[this] val reconnectionDelay = 500.millis
private[this] var reconnectionCancellableOpt: Option[Cancellable] = None
private def sendVersionMsg(): Future[Unit] = {
for {
versionMsg <- versionMsgF
_ <- sendMsg(versionMsg)
} yield ()
}
private lazy val connection: Flow[
ByteString,
ByteString,
Future[Tcp.OutgoingConnection]] = {
Tcp(system).outgoingConnection(socket, halfClose = false, options = options)
}
private def parseHelper(
unalignedBytes: ByteString,
byteVec: ByteString): (ByteString, Vector[NetworkMessage]) = {
val bytes: ByteVector = ByteVector(unalignedBytes ++ byteVec)
logger.trace(s"Bytes for message parsing: ${bytes.toHex}")
val (messages, newUnalignedBytes) =
NetworkUtil.parseIndividualMessages(bytes)
(ByteString.fromArray(newUnalignedBytes.toArray), messages)
val base = Tcp(system).outgoingConnection(remoteAddress = socket,
halfClose = false,
options = options)
nodeAppConfig.socks5ProxyParams match {
case Some(s) =>
base.viaMat(
Socks5Connection.socks5Handler(peer = peer,
sink = mergeHubSink,
onHandshakeComplete = sendVersionMsg,
credentialsOpt = s.credentialsOpt))(
Keep.left)
case None =>
base
}
}
private val chainApi = ChainHandler.fromDatabase()
@ -87,6 +104,16 @@ case class PeerMessageSender(
}
}
private def parseHelper(
unalignedBytes: ByteString,
byteVec: ByteString): (ByteString, Vector[NetworkMessage]) = {
val bytes: ByteVector = ByteVector(unalignedBytes ++ byteVec)
logger.trace(s"Bytes for message parsing: ${bytes.toHex}")
val (messages, newUnalignedBytes) =
NetworkUtil.parseIndividualMessages(bytes)
(ByteString.fromArray(newUnalignedBytes.toArray), messages)
}
private val parseToNetworkMsgFlow: Flow[
ByteString,
Vector[NetworkMessage],
@ -94,38 +121,36 @@ case class PeerMessageSender(
Flow[ByteString]
.statefulMap(() => ByteString.empty)(parseHelper,
{ _: ByteString => None })
.log("parseToNetworkMsgFlow",
{ case msgs: Vector[NetworkMessage] =>
s"received msgs=${msgs.map(_.payload.commandName)} from peer=$peer"
})
.log(
"parseToNetworkMsgFlow",
{ case msgs: Vector[NetworkMessage] =>
s"received msgs=${msgs.map(_.payload.commandName)} from peer=$peer socket=$socket"
})
.withAttributes(Attributes.logLevels(onFailure = Logging.DebugLevel))
}
private val writeNetworkMsgFlow: Flow[NetworkMessage, ByteString, NotUsed] = {
Flow[NetworkMessage].map(msg => ByteString(msg.bytes.toArray))
private val writeNetworkMsgFlow: Flow[ByteString, ByteString, NotUsed] = {
Flow.apply
}
private val bidiFlow: BidiFlow[
ByteString,
Vector[NetworkMessage],
NetworkMessage,
ByteString,
ByteString,
NotUsed] = {
BidiFlow.fromFlows(parseToNetworkMsgFlow, writeNetworkMsgFlow)
}
private val mergeHubSource: Source[
NetworkMessage,
Sink[NetworkMessage, NotUsed]] =
private val (mergeHubSink: Sink[ByteString, NotUsed],
mergeHubSource: Source[ByteString, NotUsed]) = {
MergeHub
.source[NetworkMessage](16)
.log("mergehub",
{ case msg: NetworkMessage =>
s"sending msg=${msg.payload.commandName} to peer=$peer"
})
.source[ByteString](1024)
.preMaterialize()
}
private val connectionFlow: Flow[
NetworkMessage,
ByteString,
Vector[NetworkMessage],
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)] =
connection
@ -135,38 +160,45 @@ case class PeerMessageSender(
private def connectionGraph(
handleNetworkMsgSink: Sink[
Vector[NetworkMessage],
Future[PeerMessageReceiver]]): RunnableGraph[
(
(
Sink[NetworkMessage, NotUsed],
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)),
Future[PeerMessageReceiver])] = {
Future[PeerMessageReceiver]]): RunnableGraph[(
(Future[Tcp.OutgoingConnection], UniqueKillSwitch),
Future[PeerMessageReceiver])] = {
val result = mergeHubSource
.viaMat(connectionFlow)(Keep.both)
.viaMat(connectionFlow)(Keep.right)
.toMat(handleNetworkMsgSink)(Keep.both)
result
}
private def buildConnectionGraph(): RunnableGraph[
(
(
Sink[NetworkMessage, NotUsed],
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)),
Future[PeerMessageReceiver])] = {
val initializing =
initPeerMessageRecv.connect(peer, peerMessageSenderApi)
private def buildConnectionGraph(): RunnableGraph[(
(Future[Tcp.OutgoingConnection], UniqueKillSwitch),
Future[PeerMessageReceiver])] = {
val handleNetworkMsgSink: Sink[
Vector[NetworkMessage],
Future[PeerMessageReceiver]] = {
Flow[Vector[NetworkMessage]]
.foldAsync(initializing) { case (peerMsgRecv, msgs) =>
FutureUtil.foldLeftAsync(peerMsgRecv, msgs) { case (p, msg) =>
p.handleNetworkMessageReceived(networkMsgRecv =
NetworkMessageReceived(msg, peer))
.foldAsync(initPeerMessageRecv) { case (peerMsgRecv, msgs) =>
peerMsgRecv.state match {
case PeerMessageReceiverState.Preconnection =>
val c = initPeerMessageRecv.connect(peer)
FutureUtil.foldLeftAsync(c, msgs) { case (p, msg) =>
p.handleNetworkMessageReceived(networkMsgRecv =
NetworkMessageReceived(msg, peer))
}
case _: PeerMessageReceiverState.Initializing |
_: PeerMessageReceiverState.Disconnected |
_: PeerMessageReceiverState.InitializedDisconnect |
_: PeerMessageReceiverState.InitializedDisconnectDone |
_: PeerMessageReceiverState.Normal |
_: PeerMessageReceiverState.StoppedReconnect |
_: PeerMessageReceiverState.Waiting =>
FutureUtil.foldLeftAsync(peerMsgRecv, msgs) { case (p, msg) =>
p.handleNetworkMessageReceived(networkMsgRecv =
NetworkMessageReceived(msg, peer))
}
}
}
.toMat(Sink.last)(Keep.right)
}
@ -185,9 +217,8 @@ case class PeerMessageSender(
case None =>
logger.info(s"Attempting to connect to peer=${peer}")
val ((mergeHubSink: Sink[NetworkMessage, NotUsed],
(outgoingConnectionF: Future[Tcp.OutgoingConnection],
killswitch: UniqueKillSwitch)),
val ((outgoingConnectionF: Future[Tcp.OutgoingConnection],
killswitch: UniqueKillSwitch),
streamDoneF) = {
buildConnectionGraph().run()
}
@ -208,11 +239,23 @@ case class PeerMessageSender(
connectionGraphOpt = Some(graph)
val resultF: Future[NotUsed] = for {
_ <- outgoingConnectionF
_ = resetReconnect()
versionMsg <- versionMsgF
} yield Source.single(versionMsg).runWith(mergeHubSink)
val resultF: Future[Unit] = {
for {
_ <- outgoingConnectionF
_ = resetReconnect()
versionMsg <- versionMsgF
_ = {
nodeAppConfig.socks5ProxyParams match {
case Some(p) =>
val greetingBytes =
Socks5Connection.socks5Greeting(p.credentialsOpt.isDefined)
logger.debug(s"Writing socks5 greeting")
sendMsg(greetingBytes, graph.mergeHubSink)
case None => sendMsg(versionMsg)
}
}
} yield ()
}
val _ = graph.streamDoneF
.flatMap { p =>
@ -278,29 +321,6 @@ case class PeerMessageSender(
}
}
def sendFilterClearMessage(): Future[Unit] = {
sendMsg(FilterClearMessage)
}
def sendFilterAddMessage(hash: HashDigest): Future[Unit] = {
val message = FilterAddMessage.fromHash(hash)
logger.trace(s"Sending filteradd=$message to peer=${peer}")
sendMsg(message)
}
def sendFilterLoadMessage(bloom: BloomFilter): Future[Unit] = {
val message = FilterLoadMessage(bloom)
logger.trace(s"Sending filterload=$message to peer=${peer}")
sendMsg(message)
}
def sendGetCompactFilterCheckPointMessage(
stopHash: DoubleSha256Digest): Future[Unit] = {
val message = GetCompactFilterCheckPointMessage(stopHash)
logger.debug(s"Sending getcfcheckpt=$message to peer ${peer}")
sendMsg(message)
}
private[node] def sendMsg(msg: NetworkPayload): Future[Unit] = {
//version or verack messages are the only messages that
//can be sent before we are fully initialized
@ -310,26 +330,38 @@ case class PeerMessageSender(
}
private[node] def sendMsg(msg: NetworkMessage): Future[Unit] = {
logger.debug(s"Sending msg=${msg.header.commandName} to peer=${socket}")
logger.debug(
s"Sending msg=${msg.header.commandName} to peer=${peer} socket=$socket")
connectionGraphOpt match {
case Some(g) =>
val sendMsgF = Future {
Source.single(msg).to(g.mergeHubSink).run()
}.map(_ => ())
sendMsgF
sendMsg(msg.bytes, g.mergeHubSink)
case None =>
val exn = new RuntimeException(
s"Could not send msg=${msg.payload.commandName} because we do not have an active connection to peer=${peer}")
s"Could not send msg=${msg.payload.commandName} because we do not have an active connection to peer=${peer} socket=$socket")
Future.failed(exn)
}
}
private def sendMsg(
bytes: ByteVector,
mergeHubSink: Sink[ByteString, NotUsed]): Future[Unit] = {
sendMsg(ByteString.fromArray(bytes.toArray), mergeHubSink)
}
private def sendMsg(
bytes: ByteString,
mergeHubSink: Sink[ByteString, NotUsed]): Future[Unit] = {
val sendMsgF = Future {
Source.single(bytes).to(mergeHubSink).run()
}.map(_ => ())
sendMsgF
}
}
object PeerMessageSender {
case class ConnectionGraph(
mergeHubSink: Sink[NetworkMessage, NotUsed],
mergeHubSink: Sink[ByteString, NotUsed],
connectionF: Future[Tcp.OutgoingConnection],
streamDoneF: Future[PeerMessageReceiver],
killswitch: UniqueKillSwitch)

View file

@ -1,13 +1,18 @@
package org.bitcoins.tor
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
import akka.io.Tcp
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.api.tor.Credentials
import org.bitcoins.tor.Socks5Connection.Socks5Connect
import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
/** Simple socks 5 client. It should be given a new connection, and will
@ -239,4 +244,113 @@ object Socks5Connection extends Logging {
def tryParseAuth(data: ByteString): Try[Boolean] = Try(parseAuth(data))
def socks5Handler(
peer: Peer,
sink: Sink[ByteString, NotUsed],
onHandshakeComplete: () => Future[Unit],
credentialsOpt: Option[Credentials])(implicit
mat: Materializer,
ec: ExecutionContext): Flow[ByteString, ByteString, NotUsed] = {
Flow[ByteString]
.statefulMap[Socks5ConnectionState,
Either[ByteString, Socks5MessageResponse]](() =>
Socks5ConnectionState.Disconnected)(
{ case (state, bytes) =>
state match {
case Socks5ConnectionState.Disconnected =>
if (
parseGreetings(bytes, credentialsOpt.isDefined) == PasswordAuth
) {
(Socks5ConnectionState.Authenticating,
Right(Socks5MessageResponse.Socks5GreetingResponse(bytes)))
} else {
(Socks5ConnectionState.Greeted,
Right(Socks5MessageResponse.Socks5GreetingResponse(bytes)))
}
case Socks5ConnectionState.Authenticating =>
(Socks5ConnectionState.Greeted,
Right(Socks5MessageResponse.Socks5AuthResponse(bytes)))
case Socks5ConnectionState.Greeted =>
(Socks5ConnectionState.Connected,
Right(
Socks5MessageResponse.Socks5ConnectionRequestResponse(bytes)))
case Socks5ConnectionState.Connected =>
(Socks5ConnectionState.Connected, Left(bytes))
}
},
_ => None // don't care about the end state, we don't emit it downstream
)
.mapAsync(1) {
case Right(_: Socks5MessageResponse.Socks5GreetingResponse) =>
credentialsOpt match {
case Some(c) =>
logger.debug(s"Authenticating socks5 proxy...")
val authBytes =
socks5PasswordAuthenticationRequest(c.username, c.password)
Source.single(authBytes).runWith(sink)
Future.successful(ByteString.empty)
case None =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(s"Writing socks5 connection request")
Source.single(connRequestBytes).runWith(sink)
Future.successful(ByteString.empty)
}
case Right(authResponse: Socks5MessageResponse.Socks5AuthResponse) =>
tryParseAuth(authResponse.byteString) match {
case Success(true) =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(s"Writing socks5 connection request after auth")
Source.single(connRequestBytes).runWith(sink)
Future.successful(ByteString.empty)
case Success(false) =>
sys.error(s"Failed to authenticate with socks5 proxy")
case Failure(err) => throw err
}
case Right(
connReq: Socks5MessageResponse.Socks5ConnectionRequestResponse) =>
val connectedAddressT =
Socks5Connection.tryParseConnectedAddress(connReq.byteString)
connectedAddressT match {
case scala.util.Success(connectedAddress) =>
logger.info(
s"Tor connection request succeeded. target=${peer.socket} connectedAddress=$connectedAddress")
onHandshakeComplete().map(_ => ByteString.empty)
case scala.util.Failure(err) =>
sys.error(
s"Tor connection request failed to target=${peer.socket} errMsg=${err.toString}")
}
case Left(bytes) =>
//after socks5 handshake done, pass bytes downstream to be parsed
Future.successful(bytes)
}
}
}
sealed abstract class Socks5ConnectionState
object Socks5ConnectionState {
case object Disconnected extends Socks5ConnectionState
case object Greeted extends Socks5ConnectionState
case object Authenticating extends Socks5ConnectionState
case object Connected extends Socks5ConnectionState
}
sealed abstract class Socks5MessageResponse {
def byteString: ByteString
}
object Socks5MessageResponse {
case class Socks5GreetingResponse(byteString: ByteString)
extends Socks5MessageResponse
case class Socks5AuthResponse(byteString: ByteString)
extends Socks5MessageResponse
case class Socks5ConnectionRequestResponse(byteString: ByteString)
extends Socks5MessageResponse
}