2023 12 07 mat socks5handler (#5322)

* Move socks5 greeting into Socks5Connection.socks5Handler() so each individual uage doesn't have to roll its own greeting

* Implement socks5Handler() to return a materialized stream

Get something working

Cleanup

* Remove connection parameter

* Add scaladoc
This commit is contained in:
Chris Stewart 2023-12-08 12:36:02 -06:00 committed by GitHub
parent 16c41c3b2e
commit bd3ad1df21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 128 additions and 98 deletions

View File

@ -59,45 +59,16 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
private[this] val reconnectionDelay = 500.millis
private[this] var reconnectionCancellableOpt: Option[Cancellable] = None
private def sendVersionMsg(): Future[Unit] = {
for {
versionMsg <- versionMsgF
_ <- sendMsg(versionMsg)
} yield ()
}
private lazy val connection: Flow[
ByteString,
ByteString,
Future[Tcp.OutgoingConnection]] = {
val base = Tcp(system).outgoingConnection(remoteAddress = socket,
halfClose = false,
options = options)
nodeAppConfig.socks5ProxyParams match {
case Some(s) =>
val socks5Flow = Socks5Connection.socks5Handler(peer = peer,
sink = mergeHubSink,
credentialsOpt =
s.credentialsOpt)
val handleConnectionFlow = socks5Flow.mapAsync(1) {
case Left(bytes) => Future.successful(bytes)
case Right(state) =>
state match {
case Socks5ConnectionState.Connected =>
//need to send version message when we are first
//connected to initiate bitcoin protocol handshake
sendVersionMsg().map(_ => ByteString.empty)
case Socks5ConnectionState.Disconnected |
Socks5ConnectionState.Authenticating |
Socks5ConnectionState.Greeted =>
Future.successful(ByteString.empty)
}
}
base.viaMat(handleConnectionFlow)(Keep.left)
case None =>
base
}
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)] = {
val base = Tcp(system)
.outgoingConnection(remoteAddress = socket,
halfClose = false,
options = options)
.viaMat(KillSwitches.single)(Keep.both)
base
}
private val chainApi = ChainHandler.fromDatabase()
@ -116,6 +87,10 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
}
}
private def sendVersionMsg(): Future[Unit] = {
versionMsgF.flatMap(v => sendMsg(v.bytes, mergeHubSink))
}
private def parseHelper(
unalignedBytes: ByteString,
byteVec: ByteString): (ByteString, Vector[NetworkMessage]) = {
@ -168,7 +143,6 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
Vector[NetworkMessage],
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)] =
connection
.viaMat(KillSwitches.single)(Keep.both)
.joinMat(bidiFlow)(Keep.left)
private def connectionGraph(
@ -183,8 +157,8 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
result
}
private def buildConnectionGraph(): RunnableGraph[
((Future[Tcp.OutgoingConnection], UniqueKillSwitch), Future[Done])] = {
private def buildConnectionGraph(): Future[
((Tcp.OutgoingConnection, UniqueKillSwitch), Future[Done])] = {
val handleNetworkMsgSink: Sink[Vector[NetworkMessage], Future[Done]] = {
Flow[Vector[NetworkMessage]]
@ -201,7 +175,49 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
.toMat(Sink.ignore)(Keep.right)
}
connectionGraph(handleNetworkMsgSink)
val runningStream: Future[
((Tcp.OutgoingConnection, UniqueKillSwitch), Future[Done])] = {
nodeAppConfig.socks5ProxyParams match {
case Some(s) =>
val connectionSink =
Flow[Either[ByteString, Socks5ConnectionState]]
.mapAsync(1) {
case Left(bytes) => Future.successful(bytes)
case Right(state) =>
state match {
case Socks5ConnectionState.Connected =>
//need to send version message when we are first
//connected to initiate bitcoin protocol handshake
sendVersionMsg().map(_ => ByteString.empty)
case Socks5ConnectionState.Disconnected |
Socks5ConnectionState.Authenticating |
Socks5ConnectionState.Greeted =>
Future.successful(ByteString.empty)
}
}
.viaMat(parseToNetworkMsgFlow)(Keep.left)
.toMat(handleNetworkMsgSink)(Keep.right)
val source: Source[
ByteString,
(Future[Tcp.OutgoingConnection], UniqueKillSwitch)] =
mergeHubSource.viaMat(connection)(Keep.right)
Socks5Connection
.socks5Handler(
socket = peer.socket,
source = source,
sink = connectionSink,
mergeHubSink = mergeHubSink,
credentialsOpt = s.credentialsOpt
)
case None =>
val result = connectionGraph(handleNetworkMsgSink).run()
result._1._1.map(conn => ((conn, result._1._2), result._2))
}
}
runningStream
}
@volatile private[this] var connectionGraphOpt: Option[ConnectionGraph] = None
@ -215,10 +231,8 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
case None =>
logger.info(s"Attempting to connect to peer=${peer}")
val ((outgoingConnectionF: Future[Tcp.OutgoingConnection],
killswitch: UniqueKillSwitch),
streamDoneF) = {
buildConnectionGraph().run()
val outgoingConnectionF = {
buildConnectionGraph()
}
val initializationCancellable =
@ -232,54 +246,49 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
outgoingConnectionF.onComplete {
case scala.util.Success(o) =>
val tcp = o._1._1
logger.info(
s"Connected to remote=${o.remoteAddress} local=${o.localAddress}")
s"Connected to remote=${tcp.remoteAddress} local=${tcp.localAddress}")
case scala.util.Failure(err) =>
logger.info(
s"Failed to connect to peer=$peer with errMsg=${err.getMessage}")
}
val graph = ConnectionGraph(
mergeHubSink = mergeHubSink,
connectionF = outgoingConnectionF,
streamDoneF = streamDoneF,
killswitch = killswitch,
initializationCancellable = initializationCancellable
)
connectionGraphOpt = Some(graph)
val resultF: Future[Unit] = {
for {
_ <- outgoingConnectionF
outgoingConnection <- outgoingConnectionF
graph = ConnectionGraph(
mergeHubSink = mergeHubSink,
connectionF = outgoingConnectionF.map(_._1._1),
streamDoneF = outgoingConnection._2,
killswitch = outgoingConnection._1._2,
initializationCancellable = initializationCancellable
)
_ = {
connectionGraphOpt = Some(graph)
val _ = graph.streamDoneF
.onComplete {
case scala.util.Success(_) =>
val disconnectedPeer = DisconnectedPeer(peer, false)
queue.offer(disconnectedPeer)
case scala.util.Failure(err) =>
logger.info(
s"Connection with peer=$peer failed with err=${err.getMessage}")
val disconnectedPeer = DisconnectedPeer(peer, false)
queue.offer(disconnectedPeer)
}
}
_ = resetReconnect()
_ = initializationCancellable.cancel()
versionMsg <- versionMsgF
_ = {
_ <- {
nodeAppConfig.socks5ProxyParams match {
case Some(p) =>
val greetingBytes =
Socks5Connection.socks5Greeting(p.credentialsOpt.isDefined)
logger.debug(s"Writing socks5 greeting")
sendMsg(greetingBytes, graph.mergeHubSink)
case None => sendMsg(versionMsg)
case Some(_) => Future.unit
case None => sendVersionMsg()
}
}
} yield ()
}
val _ = graph.streamDoneF
.onComplete {
case scala.util.Success(_) =>
val disconnectedPeer = DisconnectedPeer(peer, false)
queue.offer(disconnectedPeer)
case scala.util.Failure(err) =>
logger.info(
s"Connection with peer=$peer failed with err=${err.getMessage}")
val disconnectedPeer = DisconnectedPeer(peer, false)
queue.offer(disconnectedPeer)
}
resultF.map(_ => ())
}
}

View File

@ -4,14 +4,14 @@ import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}
import akka.io.Tcp
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.api.tor.Credentials
import org.bitcoins.tor.Socks5Connection.Socks5Connect
import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
/** Simple socks 5 client. It should be given a new connection, and will
@ -243,19 +243,25 @@ object Socks5Connection extends Logging {
def tryParseAuth(data: ByteString): Try[Boolean] = Try(parseAuth(data))
/** A flow to handle socks5 connections
* We emit Left(bytes) downstream when we have finished the socks5 handshake
* We emit Right(Socks5ConnectionState) downstream when we are still doing the socks5 handshake
* Emitting the Socks5ConnectionState downstream allows you to build your stream logic based on
* the state of the socks5 connection
/** @param socket the peer we are connecting to
* @param source the source that produces ByteStrings we need to send to our peer
* @param sink the sink that receives messages from our peer and performs application specific logic
* @param mergeHubSink a way for socks5Handler to send messages to the socks5 proxy to complete the handshake
* @param credentialsOpt the credentials to authenticate the socks5 proxy.
* @param mat
* @tparam MatSource the materialized value of the source given to us
* @tparam MatSink the materialized value of the sink given to us
* @return a running tcp connection along with the results of the materialize source and sink
*/
def socks5Handler(
peer: Peer,
sink: Sink[ByteString, NotUsed],
credentialsOpt: Option[Credentials])(implicit mat: Materializer): Flow[
ByteString,
Either[ByteString, Socks5ConnectionState],
NotUsed] = {
def socks5Handler[MatSource, MatSink](
socket: InetSocketAddress,
source: Source[
ByteString,
(Future[akka.stream.scaladsl.Tcp.OutgoingConnection], MatSource)],
sink: Sink[Either[ByteString, Socks5ConnectionState], MatSink],
mergeHubSink: Sink[ByteString, NotUsed],
credentialsOpt: Option[Credentials])(implicit mat: Materializer): Future[
((akka.stream.scaladsl.Tcp.OutgoingConnection, MatSource), MatSink)] = {
val flowState: Flow[
ByteString,
@ -279,7 +285,7 @@ object Socks5Connection extends Logging {
val authBytes =
socks5PasswordAuthenticationRequest(c.username,
c.password)
Source.single(authBytes).runWith(sink)
Source.single(authBytes).runWith(mergeHubSink)
val state = Socks5ConnectionState.Authenticating
(state, Right(state))
case None =>
@ -290,9 +296,9 @@ object Socks5Connection extends Logging {
} else {
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
Socks5Connection.socks5ConnectionRequest(socket)
logger.debug(s"Writing socks5 connection request")
Source.single(connRequestBytes).runWith(sink)
Source.single(connRequestBytes).runWith(mergeHubSink)
val state = Socks5ConnectionState.Greeted
(state, Right(state))
}
@ -300,10 +306,10 @@ object Socks5Connection extends Logging {
tryParseAuth(bytes) match {
case Success(true) =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
Socks5Connection.socks5ConnectionRequest(socket)
logger.debug(
s"Writing socks5 connection request after auth")
Source.single(connRequestBytes).runWith(sink)
Source.single(connRequestBytes).runWith(mergeHubSink)
val state = Socks5ConnectionState.Greeted
(state, Right(state))
case Success(false) =>
@ -316,12 +322,12 @@ object Socks5Connection extends Logging {
connectedAddressT match {
case scala.util.Success(connectedAddress) =>
logger.info(
s"Tor connection request succeeded. target=${peer.socket} connectedAddress=$connectedAddress")
s"Tor connection request succeeded. target=${socket} connectedAddress=$connectedAddress")
val state = Socks5ConnectionState.Connected
(state, Right(state))
case scala.util.Failure(err) =>
sys.error(
s"Tor connection request failed to target=${peer.socket} errMsg=${err.toString}")
s"Tor connection request failed to target=${socket} errMsg=${err.toString}")
}
case Socks5ConnectionState.Connected =>
(Socks5ConnectionState.Connected, Left(bytes))
@ -332,7 +338,22 @@ object Socks5Connection extends Logging {
)
}
flowState
val ((tcpConnectionF, matSource), matSink) = source
.viaMat(flowState)(Keep.left)
.toMat(sink)(Keep.both)
.run()
//send greeting to kick off stream
tcpConnectionF.map { conn =>
val passwordAuth = credentialsOpt.isDefined
val greetingSource: Source[ByteString, NotUsed] = {
Source.single(socks5Greeting(passwordAuth))
}
greetingSource.to(mergeHubSink).run()
((conn, matSource), matSink)
}(mat.executionContext)
}
}