Rework Socks5Connection.socks5Handler() to emit Socks5ConnectionState downstream (#5315)

This commit is contained in:
Chris Stewart 2023-12-02 13:02:48 -06:00 committed by GitHub
parent 39b127be8c
commit ef20d5ec83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 86 deletions

View File

@ -29,7 +29,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.constant.NodeConstants
import org.bitcoins.node.networking.peer.PeerConnection.ConnectionGraph
import org.bitcoins.node.{NodeStreamMessage, P2PLogger}
import org.bitcoins.tor.Socks5Connection
import org.bitcoins.tor.{Socks5Connection, Socks5ConnectionState}
import scodec.bits.ByteVector
import java.net.InetSocketAddress
@ -75,12 +75,26 @@ case class PeerConnection(peer: Peer, queue: SourceQueue[NodeStreamMessage])(
options = options)
nodeAppConfig.socks5ProxyParams match {
case Some(s) =>
base.viaMat(
Socks5Connection.socks5Handler(peer = peer,
sink = mergeHubSink,
onHandshakeComplete = sendVersionMsg,
credentialsOpt = s.credentialsOpt))(
Keep.left)
val socks5Flow = Socks5Connection.socks5Handler(peer = peer,
sink = mergeHubSink,
credentialsOpt =
s.credentialsOpt)
val handleConnectionFlow = socks5Flow.mapAsync(1) {
case Left(bytes) => Future.successful(bytes)
case Right(state) =>
state match {
case Socks5ConnectionState.Connected =>
//need to send version message when we are first
//connected to initiate bitcoin protocol handshake
sendVersionMsg().map(_ => ByteString.empty)
case Socks5ConnectionState.Disconnected |
Socks5ConnectionState.Authenticating |
Socks5ConnectionState.Greeted =>
Future.successful(ByteString.empty)
}
}
base.viaMat(handleConnectionFlow)(Keep.left)
case None =>
base
}

View File

@ -12,7 +12,6 @@ import org.bitcoins.core.api.tor.Credentials
import org.bitcoins.tor.Socks5Connection.Socks5Connect
import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
/** Simple socks 5 client. It should be given a new connection, and will
@ -244,88 +243,96 @@ object Socks5Connection extends Logging {
def tryParseAuth(data: ByteString): Try[Boolean] = Try(parseAuth(data))
/** A flow to handle socks5 connections
* We emit Left(bytes) downstream when we have finished the socks5 handshake
* We emit Right(Socks5ConnectionState) downstream when we are still doing the socks5 handshake
* Emitting the Socks5ConnectionState downstream allows you to build your stream logic based on
* the state of the socks5 connection
*/
def socks5Handler(
peer: Peer,
sink: Sink[ByteString, NotUsed],
onHandshakeComplete: () => Future[Unit],
credentialsOpt: Option[Credentials])(implicit
mat: Materializer,
ec: ExecutionContext): Flow[ByteString, ByteString, NotUsed] = {
Flow[ByteString]
.statefulMap[Socks5ConnectionState,
Either[ByteString, Socks5MessageResponse]](() =>
Socks5ConnectionState.Disconnected)(
{ case (state, bytes) =>
state match {
case Socks5ConnectionState.Disconnected =>
if (
parseGreetings(bytes, credentialsOpt.isDefined) == PasswordAuth
) {
(Socks5ConnectionState.Authenticating,
Right(Socks5MessageResponse.Socks5GreetingResponse(bytes)))
} else {
(Socks5ConnectionState.Greeted,
Right(Socks5MessageResponse.Socks5GreetingResponse(bytes)))
}
case Socks5ConnectionState.Authenticating =>
(Socks5ConnectionState.Greeted,
Right(Socks5MessageResponse.Socks5AuthResponse(bytes)))
case Socks5ConnectionState.Greeted =>
(Socks5ConnectionState.Connected,
Right(
Socks5MessageResponse.Socks5ConnectionRequestResponse(bytes)))
case Socks5ConnectionState.Connected =>
(Socks5ConnectionState.Connected, Left(bytes))
}
},
_ => None // don't care about the end state, we don't emit it downstream
)
.mapAsync(1) {
case Right(_: Socks5MessageResponse.Socks5GreetingResponse) =>
credentialsOpt match {
case Some(c) =>
logger.debug(s"Authenticating socks5 proxy...")
val authBytes =
socks5PasswordAuthenticationRequest(c.username, c.password)
Source.single(authBytes).runWith(sink)
Future.successful(ByteString.empty)
case None =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(s"Writing socks5 connection request")
Source.single(connRequestBytes).runWith(sink)
Future.successful(ByteString.empty)
}
case Right(authResponse: Socks5MessageResponse.Socks5AuthResponse) =>
tryParseAuth(authResponse.byteString) match {
case Success(true) =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(s"Writing socks5 connection request after auth")
Source.single(connRequestBytes).runWith(sink)
Future.successful(ByteString.empty)
case Success(false) =>
sys.error(s"Failed to authenticate with socks5 proxy")
case Failure(err) => throw err
}
credentialsOpt: Option[Credentials])(implicit mat: Materializer): Flow[
ByteString,
Either[ByteString, Socks5ConnectionState],
NotUsed] = {
case Right(
connReq: Socks5MessageResponse.Socks5ConnectionRequestResponse) =>
val connectedAddressT =
Socks5Connection.tryParseConnectedAddress(connReq.byteString)
connectedAddressT match {
case scala.util.Success(connectedAddress) =>
logger.info(
s"Tor connection request succeeded. target=${peer.socket} connectedAddress=$connectedAddress")
onHandshakeComplete().map(_ => ByteString.empty)
case scala.util.Failure(err) =>
sys.error(
s"Tor connection request failed to target=${peer.socket} errMsg=${err.toString}")
}
case Left(bytes) =>
//after socks5 handshake done, pass bytes downstream to be parsed
Future.successful(bytes)
}
val flowState: Flow[
ByteString,
Either[ByteString, Socks5ConnectionState],
NotUsed] = {
Flow[ByteString]
.statefulMap[Socks5ConnectionState,
Either[ByteString, Socks5ConnectionState]](() =>
Socks5ConnectionState.Disconnected)(
{ case (state, bytes) =>
state match {
case Socks5ConnectionState.Disconnected =>
if (
parseGreetings(bytes,
credentialsOpt.isDefined) == PasswordAuth
) {
logger.debug(s"Authenticating socks5 proxy...")
credentialsOpt match {
case Some(c) =>
val authBytes =
socks5PasswordAuthenticationRequest(c.username,
c.password)
Source.single(authBytes).runWith(sink)
val state = Socks5ConnectionState.Authenticating
(state, Right(state))
case None =>
sys.error(
s"Authentication required by socks5Proxy but we have no credentials")
}
} else {
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(s"Writing socks5 connection request")
Source.single(connRequestBytes).runWith(sink)
val state = Socks5ConnectionState.Greeted
(state, Right(state))
}
case Socks5ConnectionState.Authenticating =>
tryParseAuth(bytes) match {
case Success(true) =>
val connRequestBytes =
Socks5Connection.socks5ConnectionRequest(peer.socket)
logger.debug(
s"Writing socks5 connection request after auth")
Source.single(connRequestBytes).runWith(sink)
val state = Socks5ConnectionState.Greeted
(state, Right(state))
case Success(false) =>
sys.error(s"Failed to authenticate with socks5 proxy")
case Failure(err) => throw err
}
case Socks5ConnectionState.Greeted =>
val connectedAddressT =
Socks5Connection.tryParseConnectedAddress(bytes)
connectedAddressT match {
case scala.util.Success(connectedAddress) =>
logger.info(
s"Tor connection request succeeded. target=${peer.socket} connectedAddress=$connectedAddress")
val state = Socks5ConnectionState.Connected
(state, Right(state))
case scala.util.Failure(err) =>
sys.error(
s"Tor connection request failed to target=${peer.socket} errMsg=${err.toString}")
}
case Socks5ConnectionState.Connected =>
(Socks5ConnectionState.Connected, Left(bytes))
}
},
_ =>
None // don't care about the end state, we don't emit it downstream
)
}
flowState
}
}