From 1051e6365ab9729fb2d95b5226a7bff1e847bbc9 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Mon, 2 Aug 2021 18:15:56 -0500 Subject: [PATCH] DLC P2P Client (#3402) * DLC P2P WIP * P2PClient refactor (#10) * Add to CI * Remove unused * Attempt to create DLCNode and Tests * Full Tor support * Get DLCNegotiationTest passing * Config options, connect & send func * Test for DLCNode * Add createDLCNode to config * Fix formatting * Update DLC state after all other data is set * Remove unneeded line * Respond to some review * 2021 07 26 dlc node code review (#13) * WIP * WIP2 * Rewrite tests not use Await.result() * Skip Tor test on CI * Cleanup threadpool leaks in tests * Handle actor pattern matching better * Respond to review * Implement DLCNode.stop * sock5 -> socks5 * Use Tcp.Unbind * Respond to review * Implement postStop * Switch to unbind Co-authored-by: rorp Co-authored-by: Chris Stewart --- .../Linux_2.12_App_Chain_Node_Core_Tests.yml | 2 +- .../Linux_2.13_App_Chain_Node_Core_Tests.yml | 2 +- .../Mac_2.13_Wallet_Node_DLC_Tests.yml | 2 +- .../bitcoins/server/BitcoinSAppConfig.scala | 4 + build.sbt | 23 +++ db-commons/src/main/resources/reference.conf | 23 ++- .../dlc/node/DLCConnectionHandlerTest.scala | 52 ++++++ .../dlc/node/DLCNegotiationTest.scala | 78 +++++++++ .../org/bitcoins/dlc/node/DLCNodeTest.scala | 56 +++++++ .../org/bitcoins/dlc/node/DLCServerTest.scala | 97 +++++++++++ .../bitcoins/dlc/node/DLCServerTorTest.scala | 157 ++++++++++++++++++ dlc-node/dlc-node.sbt | 5 + .../org/bitcoins/dlc/node/DLCClient.scala | 133 +++++++++++++++ .../dlc/node/DLCConnectionHandler.scala | 154 +++++++++++++++++ .../bitcoins/dlc/node/DLCDataHandler.scala | 87 ++++++++++ .../scala/org/bitcoins/dlc/node/DLCNode.scala | 62 +++++++ .../org/bitcoins/dlc/node/DLCServer.scala | 110 ++++++++++++ .../dlc/node/config/DLCNodeAppConfig.scala | 94 +++++++++++ .../org/bitcoins/dlc/node/peer/Peer.scala | 30 ++++ .../org/bitcoins/dlc/wallet/DLCWallet.scala | 7 +- docs/config/configuration.md | 23 ++- .../bitcoins/node/config/NodeAppConfig.scala | 9 +- project/Deps.scala | 29 +++- .../testkitcore/gen/LnMessageGen.scala | 7 + .../testkit/dlc/BitcoinSDLCNodeTest.scala | 58 +++++++ .../testkit/util/BitcoinSActorTest.scala | 24 +++ .../bitcoins/testkit/util/NetworkUtil.scala | 15 ++ .../scala/org/bitcoins/tor/TorParams.scala | 12 ++ 28 files changed, 1332 insertions(+), 23 deletions(-) create mode 100644 dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCConnectionHandlerTest.scala create mode 100644 dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNegotiationTest.scala create mode 100644 dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNodeTest.scala create mode 100644 dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTest.scala create mode 100644 dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTorTest.scala create mode 100644 dlc-node/dlc-node.sbt create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCClient.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCConnectionHandler.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCDataHandler.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCNode.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCServer.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/config/DLCNodeAppConfig.scala create mode 100644 dlc-node/src/main/scala/org/bitcoins/dlc/node/peer/Peer.scala create mode 100644 testkit/src/main/scala/org/bitcoins/testkit/dlc/BitcoinSDLCNodeTest.scala create mode 100644 testkit/src/main/scala/org/bitcoins/testkit/util/BitcoinSActorTest.scala create mode 100644 testkit/src/main/scala/org/bitcoins/testkit/util/NetworkUtil.scala create mode 100644 tor/src/main/scala/org/bitcoins/tor/TorParams.scala diff --git a/.github/workflows/Linux_2.12_App_Chain_Node_Core_Tests.yml b/.github/workflows/Linux_2.12_App_Chain_Node_Core_Tests.yml index 2265dc8f37..a31b427cd3 100644 --- a/.github/workflows/Linux_2.12_App_Chain_Node_Core_Tests.yml +++ b/.github/workflows/Linux_2.12_App_Chain_Node_Core_Tests.yml @@ -28,4 +28,4 @@ jobs: ~/.bitcoin-s/binaries key: ${{ runner.os }}-cache - name: run tests - run: sbt ++2.12.14 downloadBitcoind coverage chainTest/test chain/coverageReport chain/coverageAggregate chain/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls cryptoJVM/test cryptoTestJVM/test cryptoJVM/coverageReport cryptoJVM/coverageAggregate cryptoJVM/coveralls coreTestJVM/test dlcTest/test coreJVM/coverageReport coreJVM/coverageAggregate coreJVM/coveralls secp256k1jni/test zmq/test zmq/coverageReport zmq/coverageAggregate zmq/coveralls appCommonsTest/test appServerTest/test oracleServerTest/test + run: sbt ++2.12.14 downloadBitcoind coverage chainTest/test chain/coverageReport chain/coverageAggregate chain/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls cryptoJVM/test cryptoTestJVM/test cryptoJVM/coverageReport cryptoJVM/coverageAggregate cryptoJVM/coveralls coreTestJVM/test dlcTest/test coreJVM/coverageReport coreJVM/coverageAggregate coreJVM/coveralls secp256k1jni/test zmq/test zmq/coverageReport zmq/coverageAggregate zmq/coveralls appCommonsTest/test appServerTest/test oracleServerTest/test dlcNodeTest/test diff --git a/.github/workflows/Linux_2.13_App_Chain_Node_Core_Tests.yml b/.github/workflows/Linux_2.13_App_Chain_Node_Core_Tests.yml index 695f92b907..e1ae0310ed 100644 --- a/.github/workflows/Linux_2.13_App_Chain_Node_Core_Tests.yml +++ b/.github/workflows/Linux_2.13_App_Chain_Node_Core_Tests.yml @@ -28,4 +28,4 @@ jobs: ~/.bitcoin-s/binaries key: ${{ runner.os }}-cache - name: run tests - run: sbt ++2.13.6 downloadBitcoind coverage chainTest/test chain/coverageReport chain/coverageAggregate chain/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls cryptoTestJVM/test cryptoJVM/test cryptoJVM/coverageReport cryptoJVM/coverageAggregate cryptoJVM/coveralls coreTestJVM/test dlcTest/test coreJVM/coverageReport coreJVM/coverageAggregate coreJVM/coveralls secp256k1jni/test zmq/test zmq/coverageReport zmq/coverageAggregate zmq/coveralls appCommonsTest/test appServerTest/test oracleServerTest/test + run: sbt ++2.13.6 downloadBitcoind coverage chainTest/test chain/coverageReport chain/coverageAggregate chain/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls cryptoTestJVM/test cryptoJVM/test cryptoJVM/coverageReport cryptoJVM/coverageAggregate cryptoJVM/coveralls coreTestJVM/test dlcTest/test coreJVM/coverageReport coreJVM/coverageAggregate coreJVM/coveralls secp256k1jni/test zmq/test zmq/coverageReport zmq/coverageAggregate zmq/coveralls appCommonsTest/test appServerTest/test oracleServerTest/test dlcNodeTest/test diff --git a/.github/workflows/Mac_2.13_Wallet_Node_DLC_Tests.yml b/.github/workflows/Mac_2.13_Wallet_Node_DLC_Tests.yml index 0fea277676..14aada1da0 100644 --- a/.github/workflows/Mac_2.13_Wallet_Node_DLC_Tests.yml +++ b/.github/workflows/Mac_2.13_Wallet_Node_DLC_Tests.yml @@ -28,4 +28,4 @@ jobs: ~/.bitcoin-s/binaries key: ${{ runner.os }}-cache - name: run tests - run: sbt ++2.13.6 downloadBitcoind coverage cryptoTestJVM/test coreTestJVM/test secp256k1jni/test dlcTest/test appCommonsTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls dlcOracleTest/test asyncUtilsTestJVM/test dlcOracle/coverageReport dlcOracle/coveralls + run: sbt ++2.13.6 downloadBitcoind coverage cryptoTestJVM/test coreTestJVM/test secp256k1jni/test dlcTest/test appCommonsTest/test walletTest/test dlcWalletTest/test wallet/coverageReport wallet/coverageAggregate wallet/coveralls nodeTest/test node/coverageReport node/coverageAggregate node/coveralls dlcOracleTest/test asyncUtilsTestJVM/test dlcOracle/coverageReport dlcOracle/coveralls dlcNodeTest/test diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSAppConfig.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSAppConfig.scala index d9cfe422e2..9cc0f05784 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSAppConfig.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSAppConfig.scala @@ -7,6 +7,7 @@ import org.bitcoins.commons.file.FileUtil import org.bitcoins.core.util.StartStopAsync import org.bitcoins.db.AppConfig import org.bitcoins.db.util.ServerArgParser +import org.bitcoins.dlc.node.config.DLCNodeAppConfig import org.bitcoins.dlc.wallet.DLCAppConfig import org.bitcoins.keymanager.config.KeyManagerAppConfig import org.bitcoins.node.config.NodeAppConfig @@ -33,6 +34,9 @@ case class BitcoinSAppConfig( lazy val chainConf: ChainAppConfig = ChainAppConfig(directory, confs: _*) lazy val dlcConf: DLCAppConfig = DLCAppConfig(directory, confs: _*) + lazy val dlcNodeConf: DLCNodeAppConfig = + DLCNodeAppConfig(directory, confs: _*) + def copyWithConfig(newConfs: Vector[Config]): BitcoinSAppConfig = { val configs = newConfs ++ confs BitcoinSAppConfig(directory, configs: _*) diff --git a/build.sbt b/build.sbt index 13381d7d2d..6f18cf468f 100644 --- a/build.sbt +++ b/build.sbt @@ -195,6 +195,8 @@ lazy val `bitcoin-s` = project walletTest, dlcWallet, dlcWalletTest, + dlcNode, + dlcNodeTest, appServer, appServerTest, appCommons, @@ -249,6 +251,8 @@ lazy val `bitcoin-s` = project walletTest, dlcWallet, dlcWalletTest, + dlcNode, + dlcNodeTest, appServer, appServerTest, appCommons, @@ -382,6 +386,7 @@ lazy val appServer = project chain, wallet, dlcWallet, + dlcNode, bitcoindRpc, feeProvider, zmq @@ -698,6 +703,24 @@ lazy val dlcWalletTest = project ) .dependsOn(coreJVM % testAndCompile, dlcWallet, testkit, dlcTest) +lazy val dlcNode = project + .in(file("dlc-node")) + .settings(CommonSettings.prodSettings: _*) + .settings( + name := "bitcoin-s-dlc-node", + libraryDependencies ++= Deps.dlcNode + ) + .dependsOn(coreJVM, tor, dbCommons) + +lazy val dlcNodeTest = project + .in(file("dlc-node-test")) + .settings(CommonSettings.testSettings: _*) + .settings( + name := "bitcoin-s-dlc-node-test", + libraryDependencies ++= Deps.dlcNodeTest + ) + .dependsOn(coreJVM % testAndCompile, dlcNode, testkit) + lazy val dlcOracle = project .in(file("dlc-oracle")) .settings(CommonSettings.prodSettings: _*) diff --git a/db-commons/src/main/resources/reference.conf b/db-commons/src/main/resources/reference.conf index 404c5ca58a..9f850ce0e9 100644 --- a/db-commons/src/main/resources/reference.conf +++ b/db-commons/src/main/resources/reference.conf @@ -53,8 +53,21 @@ bitcoin-s { # You can configure SOCKS5 proxy to use Tor for outgoing connections proxy { enabled = false - host = "127.0.0.1" - port = 9050 + socks5 = "127.0.0.1:9050" + } + + tor { + # You can enable Tor for incoming connections + enabled = false + control = "127.0.0.1:9051" + + # The password used to arrive at the HashedControlPassword for the control port. + # If provided, the HASHEDPASSWORD authentication method will be used instead of + # the SAFECOOKIE one. + # password = securePassword + + # The path to the private key of the onion service being created + # privateKeyPath = /path/to/priv/key } chain = ${bitcoin-s.dbDefault} @@ -140,6 +153,12 @@ bitcoin-s { # } } + dlcnode { + # The address we are listening on for incoming connections for DLCs + # Binding to 0.0.0.0 makes us listen to all incoming connections + listen = "0.0.0.0:2862" + } + oracle = ${bitcoin-s.dbDefault} oracle { # this config key is read by Slick diff --git a/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCConnectionHandlerTest.scala b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCConnectionHandlerTest.scala new file mode 100644 index 0000000000..c468f5ba61 --- /dev/null +++ b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCConnectionHandlerTest.scala @@ -0,0 +1,52 @@ +package org.bitcoins.dlc.node + +import org.bitcoins.core.protocol.tlv._ +import org.bitcoins.testkit.util.BitcoinSAsyncTest +import org.bitcoins.testkitcore.gen.LnMessageGen + +class DLCConnectionHandlerTest extends BitcoinSAsyncTest { + + behavior of "parseIndividualMessages" + + it must "DLC Accept message that is not aligned with a tcp frame" in { + forAllAsync(LnMessageGen.dlcAcceptMessage) { accept => + //split the msg at a random index to simulate a tcp frame not being aligned + val randomIndex = scala.util.Random.nextInt().abs % accept.bytes.size + val (firstHalf, secondHalf) = accept.bytes.splitAt(randomIndex) + val (firstHalfParseHeaders, remainingBytes) = + DLCConnectionHandler.parseIndividualMessages(firstHalf) + firstHalfParseHeaders must be(empty) + + val (secondHalfParsedHeaders, _) = + DLCConnectionHandler.parseIndividualMessages( + remainingBytes ++ secondHalf) + val parsedLnMessage = secondHalfParsedHeaders.head + val parsedLnAcceptMessage = + parsedLnMessage.asInstanceOf[LnMessage[DLCAcceptTLV]] + + parsedLnAcceptMessage.bytes must be(accept.bytes) + } + } + + it must "return the entire byte array if a message is not aligned to a byte frame" in { + forAllAsync(LnMessageGen.dlcAcceptMessage) { accept => + // remove last byte so the message is not aligned + val bytes = accept.bytes.dropRight(1) + val (parsedMessages, unAlignedBytes) = + DLCConnectionHandler.parseIndividualMessages(bytes) + + assert(parsedMessages.isEmpty) + assert(unAlignedBytes == bytes) + } + } + + // todo figure out how to properly handle unknown messages + it must "parse an unknown message" ignore { + forAllAsync(LnMessageGen.unknownMessage) { unknown => + val (messages, leftover) = + DLCConnectionHandler.parseIndividualMessages(unknown.bytes) + assert(messages == Vector(unknown)) + assert(leftover.isEmpty) + } + } +} diff --git a/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNegotiationTest.scala b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNegotiationTest.scala new file mode 100644 index 0000000000..a766d4ecd4 --- /dev/null +++ b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNegotiationTest.scala @@ -0,0 +1,78 @@ +package org.bitcoins.dlc.node + +import akka.actor.ActorRef +import org.bitcoins.core.number.UInt32 +import org.bitcoins.core.protocol.dlc.models.DLCState +import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte +import org.bitcoins.dlc.node.peer.Peer +import org.bitcoins.rpc.util.RpcUtil +import org.bitcoins.testkit.async.TestAsyncUtil +import org.bitcoins.testkit.wallet.BitcoinSDualWalletTest +import org.bitcoins.testkit.wallet.DLCWalletUtil._ +import org.bitcoins.testkit.wallet.FundWalletUtil.FundedDLCWallet +import org.scalatest.FutureOutcome + +import java.net.InetSocketAddress +import scala.concurrent.Promise +import scala.concurrent.duration.DurationInt + +class DLCNegotiationTest extends BitcoinSDualWalletTest { + type FixtureParam = (FundedDLCWallet, FundedDLCWallet) + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + withDualFundedDLCWallets(test) + } + + it must "setup a DLC" in { + fundedDLCWallets: (FundedDLCWallet, FundedDLCWallet) => + val walletA = fundedDLCWallets._1.wallet + val walletB = fundedDLCWallets._2.wallet + val port = RpcUtil.randomPort + val bindAddress = + new InetSocketAddress("0.0.0.0", port) + val connectAddress = + InetSocketAddress.createUnresolved("127.0.0.1", port) + + val serverF = DLCServer.bind(walletA, bindAddress, None) + + val handlerP = Promise[ActorRef]() + val clientF = + DLCClient.connect(Peer(connectAddress), walletB, Some(handlerP)) + + for { + _ <- serverF + _ <- clientF + + handler <- handlerP.future + + // verify we have no DLCs + preDLCsA <- walletA.listDLCs() + preDLCsB <- walletB.listDLCs() + _ = assert(preDLCsA.isEmpty) + _ = assert(preDLCsB.isEmpty) + + offer <- walletB.createDLCOffer(sampleContractInfo, + half, + Some(SatoshisPerVirtualByte.one), + UInt32.zero, + UInt32.one) + accept <- walletA.acceptDLCOffer(offer) + + // Send accept message to begin p2p + _ = handler ! accept.toMessage + + _ <- TestAsyncUtil.awaitConditionF( + () => + for { + dlcsA <- walletA.listDLCs() + dlcsB <- walletB.listDLCs() + } yield { + dlcsA.head.state == DLCState.Broadcasted && + dlcsB.head.state == DLCState.Signed + }, + interval = 1.second, + maxTries = 15 + ) + } yield succeed + } +} diff --git a/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNodeTest.scala b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNodeTest.scala new file mode 100644 index 0000000000..4516f0bbd3 --- /dev/null +++ b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCNodeTest.scala @@ -0,0 +1,56 @@ +package org.bitcoins.dlc.node + +import org.bitcoins.core.number.UInt32 +import org.bitcoins.core.protocol.dlc.models.DLCState +import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte +import org.bitcoins.testkit.async.TestAsyncUtil +import org.bitcoins.testkit.dlc.BitcoinSDLCNodeTest +import org.bitcoins.testkit.wallet.DLCWalletUtil._ +import org.scalatest.FutureOutcome + +import scala.concurrent.duration.DurationInt + +class DLCNodeTest extends BitcoinSDLCNodeTest { + type FixtureParam = (DLCNode, DLCNode) + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + witTwoFundedDLCNodes(test) + } + + it must "setup a DLC" in { nodes: (DLCNode, DLCNode) => + val nodeA = nodes._1 + val nodeB = nodes._2 + val walletA = nodeA.wallet + val walletB = nodeB.wallet + + for { + (addrA, _) <- nodeA.serverBindF + // verify we have no DLCs + preDLCsA <- walletA.listDLCs() + preDLCsB <- walletB.listDLCs() + _ = assert(preDLCsA.isEmpty) + _ = assert(preDLCsB.isEmpty) + + offer <- walletA.createDLCOffer(sampleContractInfo, + half, + Some(SatoshisPerVirtualByte.one), + UInt32.zero, + UInt32.one) + + _ <- nodeB.acceptDLCOffer(addrA, offer.toMessage) + + _ <- TestAsyncUtil.awaitConditionF( + () => + for { + dlcsA <- walletA.listDLCs() + dlcsB <- walletB.listDLCs() + } yield { + dlcsA.head.state == DLCState.Signed && + dlcsB.head.state == DLCState.Broadcasted + }, + interval = 1.second, + maxTries = 15 + ) + } yield succeed + } +} diff --git a/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTest.scala b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTest.scala new file mode 100644 index 0000000000..cb1e8d09a4 --- /dev/null +++ b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTest.scala @@ -0,0 +1,97 @@ +package org.bitcoins.dlc.node + +import akka.actor.ActorRef +import akka.testkit.{TestActorRef, TestProbe} +import org.bitcoins.asyncutil.AsyncUtil +import org.bitcoins.core.number.UInt16 +import org.bitcoins.core.protocol.tlv.{LnMessage, PingTLV, PongTLV} +import org.bitcoins.dlc.node.peer.Peer +import org.bitcoins.rpc.util.RpcUtil +import org.bitcoins.testkit.util.BitcoinSActorFixtureWithDLCWallet +import org.bitcoins.testkit.wallet.FundWalletUtil.FundedDLCWallet +import org.scalatest.{Assertion, FutureOutcome} +import scodec.bits.ByteVector + +import java.net.InetSocketAddress +import scala.concurrent.{Future, Promise} + +class DLCServerTest extends BitcoinSActorFixtureWithDLCWallet { + + override type FixtureParam = FundedDLCWallet + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + withFundedDLCWallet(test, getBIP39PasswordOpt())(getFreshConfig) + } + + it must "send/receive Ping and Pong TLVs over clearnet" in { dlcWalletApi => + val port = RpcUtil.randomPort + val bindAddress = + new InetSocketAddress("0.0.0.0", port) + val connectAddress = + InetSocketAddress.createUnresolved("localhost", port) + + var serverConnectionHandlerOpt = Option.empty[ActorRef] + val serverProbe = TestProbe() + + val boundAddressPromise = Promise[InetSocketAddress]() + + TestActorRef( + DLCServer.props( + dlcWalletApi.wallet, + bindAddress, + Some(boundAddressPromise), + { (_, _, connectionHandler) => + serverConnectionHandlerOpt = Some(connectionHandler) + serverProbe.ref + } + )) + + val resultF: Future[Future[Assertion]] = for { + _ <- boundAddressPromise.future + connectedAddressPromise = Promise[InetSocketAddress]() + } yield { + var clientConnectionHandlerOpt = Option.empty[ActorRef] + val clientProbe = TestProbe() + val client = TestActorRef( + DLCClient.props(dlcWalletApi.wallet, + Some(connectedAddressPromise), + None, + { (_, _, connectionHandler) => + clientConnectionHandlerOpt = Some(connectionHandler) + clientProbe.ref + })) + client ! DLCClient.Connect(Peer(connectAddress)) + + for { + _ <- connectedAddressPromise.future + _ <- AsyncUtil.retryUntilSatisfied(serverConnectionHandlerOpt.isDefined) + _ <- AsyncUtil.retryUntilSatisfied(clientConnectionHandlerOpt.isDefined) + pingTLV = + PingTLV(UInt16.one, + ByteVector.fromValidHex("00112233445566778899aabbccddeeff")) + clientConnectionHandler = clientConnectionHandlerOpt.get + _ = clientProbe.send(clientConnectionHandler, pingTLV) + _ = serverProbe.expectMsg(LnMessage(pingTLV)) + pongTLV = PongTLV.forIgnored( + ByteVector.fromValidHex("00112233445566778899aabbccddeeff")) + serverConnectionHandler = serverConnectionHandlerOpt.get + _ = serverProbe.send(serverConnectionHandler, pongTLV) + _ = clientProbe.expectMsg(LnMessage(pongTLV)) + // 131063 - is a magic size for OS X when this test case starts failing (131073 overall TLV size) + ignored = ByteVector.fill(65000)(0x55) + bigTLV = + PongTLV.forIgnored(ignored) + _ = clientProbe.send(clientConnectionHandler, bigTLV) + _ = serverProbe.expectMsg(LnMessage(bigTLV)) + _ = clientProbe.send(clientConnectionHandler, + DLCConnectionHandler.CloseConnection) + _ = clientProbe.send(clientConnectionHandler, pingTLV) + _ = serverProbe.expectNoMessage() + _ = serverProbe.send(serverConnectionHandler, pongTLV) + _ = clientProbe.expectNoMessage() + } yield succeed + } + + resultF.flatten + } +} diff --git a/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTorTest.scala b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTorTest.scala new file mode 100644 index 0000000000..2e9fbc491b --- /dev/null +++ b/dlc-node-test/src/test/scala/org/bitcoins/dlc/node/DLCServerTorTest.scala @@ -0,0 +1,157 @@ +package org.bitcoins.dlc.node + +import akka.actor.ActorRef +import akka.testkit.{TestActorRef, TestProbe} +import org.bitcoins.asyncutil.AsyncUtil +import org.bitcoins.core.number.UInt16 +import org.bitcoins.core.protocol.tlv.{LnMessage, PingTLV, PongTLV} +import org.bitcoins.core.util.EnvUtil +import org.bitcoins.dlc.node.peer.Peer +import org.bitcoins.rpc.util.RpcUtil +import org.bitcoins.testkit.util.BitcoinSActorFixtureWithDLCWallet +import org.bitcoins.testkit.util.NetworkUtil._ +import org.bitcoins.testkit.wallet.FundWalletUtil.FundedDLCWallet +import org.bitcoins.tor.{Socks5ProxyParams, TorController, TorProtocolHandler} +import org.scalatest.{Assertion, FutureOutcome} +import scodec.bits.ByteVector + +import java.io.File +import java.net.InetSocketAddress +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Future, Promise} + +class DLCServerTorTest extends BitcoinSActorFixtureWithDLCWallet { + + val torProxyAddress = new InetSocketAddress("localhost", 9050) + val torControlAddress = new InetSocketAddress("localhost", 9051) + val torProxyEnabled: Boolean = portIsBound(torProxyAddress) + val torControlEnabled: Boolean = portIsBound(torControlAddress) + + override type FixtureParam = FundedDLCWallet + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + // Skip on CI as we don't have access to tor control port + if (EnvUtil.isCI) { + FutureOutcome.succeeded + } else withFundedDLCWallet(test, getBIP39PasswordOpt())(getFreshConfig) + } + + it must "send/receive Ping and Pong TLVs over Tor" in { fundedDLCWallet => + assume(torProxyEnabled, "Tor daemon is not running or listening port 9050") + assume(torControlEnabled, + "Tor daemon is not running or listening port 9051") + + val timeout = 30.seconds + + val resultF: Future[Assertion] = withTempFile("onion_", "_private_key") { + pkFile => + val port = RpcUtil.randomPort + val bindAddress = + new InetSocketAddress("0.0.0.0", port) + + val connectAddressF = TorController.setUpHiddenService( + torControlAddress, + TorProtocolHandler.SafeCookie(), + pkFile.toPath, + port + ) + + var serverConnectionHandlerOpt = Option.empty[ActorRef] + val serverProbe = TestProbe() + + val boundAddressPromise = Promise[InetSocketAddress]() + + TestActorRef( + DLCServer.props( + fundedDLCWallet.wallet, + bindAddress, + Some(boundAddressPromise), + { (_, _, connectionHandler) => + serverConnectionHandlerOpt = Some(connectionHandler) + serverProbe.ref + } + )) + + val resultF: Future[Future[Assertion]] = for { + _ <- boundAddressPromise.future + connectedAddressPromise = Promise[InetSocketAddress]() + connectAddress <- connectAddressF + } yield { + var clientConnectionHandlerOpt = Option.empty[ActorRef] + val clientProbe = TestProbe() + val client = TestActorRef( + DLCClient.props( + fundedDLCWallet.wallet, + Some(connectedAddressPromise), + None, + { (_, _, connectionHandler) => + clientConnectionHandlerOpt = Some(connectionHandler) + clientProbe.ref + } + )) + + client ! DLCClient.Connect( + Peer(connectAddress, + socks5ProxyParams = Some( + Socks5ProxyParams( + address = torProxyAddress, + credentialsOpt = None, + randomizeCredentials = true + )))) + + for { + _ <- connectedAddressPromise.future + _ <- AsyncUtil.retryUntilSatisfied( + serverConnectionHandlerOpt.isDefined) + _ <- AsyncUtil.retryUntilSatisfied( + clientConnectionHandlerOpt.isDefined) + pingTLV = + PingTLV( + UInt16.one, + ByteVector.fromValidHex("00112233445566778899aabbccddeeff")) + clientConnectionHandler = clientConnectionHandlerOpt.get + _ = clientProbe.send(clientConnectionHandler, pingTLV) + _ = serverProbe.expectMsg(timeout, LnMessage(pingTLV)) + pongTLV = PongTLV.forIgnored( + ByteVector.fromValidHex("00112233445566778899aabbccddeeff")) + serverConnectionHandler = serverConnectionHandlerOpt.get + _ = serverProbe.send(serverConnectionHandler, pongTLV) + _ = clientProbe.expectMsg(timeout, LnMessage(pongTLV)) + // 131063 - is a magic size for OS X when this test case starts failing (131073 overall TLV size) + ignored = ByteVector.fill(65000)(0x55) + bigTLV = PongTLV.forIgnored(ignored) + _ = clientProbe.send(clientConnectionHandler, bigTLV) + _ = serverProbe.expectMsg(timeout, LnMessage(bigTLV)) + _ = clientProbe.send(clientConnectionHandler, + DLCConnectionHandler.CloseConnection) + _ = clientProbe.send(clientConnectionHandler, pingTLV) + _ = serverProbe.expectNoMessage() + _ = serverProbe.send(serverConnectionHandler, pongTLV) + _ = clientProbe.expectNoMessage() + } yield { + succeed + } + } + resultF.flatten + } + + resultF + } + + private def withTempFile( + prefix: String, + suffix: String, + create: Boolean = false)( + f: File => Future[Assertion]): Future[Assertion] = { + val tempFile = File.createTempFile(prefix, suffix) + if (!create) { + tempFile.delete() + } + try { + f(tempFile) + } finally { + val _ = tempFile.delete() + } + } + +} diff --git a/dlc-node/dlc-node.sbt b/dlc-node/dlc-node.sbt new file mode 100644 index 0000000000..16122e25be --- /dev/null +++ b/dlc-node/dlc-node.sbt @@ -0,0 +1,5 @@ +name := "bitcoin-s-dlc-node" + +libraryDependencies ++= Deps.dlcNode + +CommonSettings.prodSettings diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCClient.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCClient.scala new file mode 100644 index 0000000000..0cc362329d --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCClient.scala @@ -0,0 +1,133 @@ +package org.bitcoins.dlc.node + +import akka.actor._ +import akka.event.LoggingReceive +import akka.io.{IO, Tcp} +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.dlc.node.peer.Peer +import org.bitcoins.tor.Socks5Connection.{Socks5Connect, Socks5Connected} +import org.bitcoins.tor.{Socks5Connection, Socks5ProxyParams} + +import java.io.IOException +import java.net.InetSocketAddress +import scala.concurrent.{Future, Promise} + +class DLCClient( + dlcWalletApi: DLCWalletApi, + connectedAddress: Option[Promise[InetSocketAddress]], + handlerP: Option[Promise[ActorRef]], + dataHandlerFactory: DLCDataHandler.Factory) + extends Actor + with ActorLogging { + + import context.system + + override def receive: Receive = LoggingReceive { + case DLCClient.Connect(peer) => + val peerOrProxyAddress = + peer.socks5ProxyParams match { + case Some(proxyParams) => + val proxyAddress = proxyParams.address + log.info(s"connecting to SOCKS5 proxy $proxyAddress") + proxyAddress + case None => + val remoteAddress = peer.socket + log.info(s"connecting to $remoteAddress") + remoteAddress + } + context.become(connecting(peer)) + IO(Tcp) ! Tcp.Connect(peerOrProxyAddress) + } + + def connecting(peer: Peer): Receive = LoggingReceive { + case c @ Tcp.CommandFailed(cmd: Tcp.Connect) => + val ex = c.cause.getOrElse(new IOException("Unknown Error")) + log.error(s"Cannot connect to ${cmd.remoteAddress} ", ex) + throw ex + + case Tcp.Connected(peerOrProxyAddress, _) => + val connection = sender() + peer.socks5ProxyParams match { + case Some(proxyParams) => + val proxyAddress = peerOrProxyAddress + val remoteAddress = peer.socket + log.info(s"connected to SOCKS5 proxy $proxyAddress") + log.info(s"connecting to $remoteAddress via SOCKS5 $proxyAddress") + val proxy = + context.actorOf(Socks5Connection.props( + sender(), + Socks5ProxyParams.proxyCredentials(proxyParams), + Socks5Connect(remoteAddress)), + "Socks5Connection") + context watch proxy + context become socks5Connecting(proxy, remoteAddress, proxyAddress) + case None => + val peerAddress = peerOrProxyAddress + log.info(s"connected to $peerAddress") + val _ = context.actorOf( + Props( + new DLCConnectionHandler(dlcWalletApi, + connection, + handlerP, + dataHandlerFactory))) + connectedAddress.foreach(_.success(peerAddress)) + } + } + + def socks5Connecting( + proxy: ActorRef, + remoteAddress: InetSocketAddress, + proxyAddress: InetSocketAddress): Receive = LoggingReceive { + case c @ Tcp.CommandFailed(_: Socks5Connect) => + val ex = c.cause.getOrElse(new IOException("UnknownError")) + log.error(s"connection failed to $remoteAddress via SOCKS5 $proxyAddress", + ex) + throw ex + case Socks5Connected(_) => + log.info(s"connected to $remoteAddress via SOCKS5 proxy $proxyAddress") + val _ = context.actorOf( + Props( + new DLCConnectionHandler(dlcWalletApi, + proxy, + handlerP, + dataHandlerFactory))) + connectedAddress.foreach(_.success(remoteAddress)) + case Terminated(actor) if actor == proxy => + context stop self + } + + override def aroundReceive(receive: Receive, msg: Any): Unit = try { + super.aroundReceive(receive, msg) + } catch { + case t: Throwable => + connectedAddress.foreach(_.tryFailure(t)) + } + +} + +object DLCClient { + + case class Connect(peer: Peer) + + def props( + dlcWalletApi: DLCWalletApi, + connectedAddress: Option[Promise[InetSocketAddress]], + handlerP: Option[Promise[ActorRef]], + dataHandlerFactory: DLCDataHandler.Factory): Props = Props( + new DLCClient(dlcWalletApi, connectedAddress, handlerP, dataHandlerFactory)) + + def connect( + peer: Peer, + dlcWalletApi: DLCWalletApi, + handlerP: Option[Promise[ActorRef]], + dataHandlerFactory: DLCDataHandler.Factory = + DLCDataHandler.defaultFactory)(implicit + system: ActorSystem): Future[InetSocketAddress] = { + val promise = Promise[InetSocketAddress]() + val actor = + system.actorOf( + props(dlcWalletApi, Some(promise), handlerP, dataHandlerFactory)) + actor ! Connect(peer) + promise.future + } +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCConnectionHandler.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCConnectionHandler.scala new file mode 100644 index 0000000000..0651f7be6d --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCConnectionHandler.scala @@ -0,0 +1,154 @@ +package org.bitcoins.dlc.node + +import akka.actor._ +import akka.event.LoggingReceive +import akka.io.Tcp +import akka.util.ByteString +import grizzled.slf4j.Logging +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.core.protocol.tlv._ +import org.bitcoins.dlc.node.DLCConnectionHandler.parseIndividualMessages +import scodec.bits.ByteVector + +import scala.annotation.tailrec +import scala.concurrent.Promise +import scala.util.{Failure, Success, Try} + +class DLCConnectionHandler( + dlcWalletApi: DLCWalletApi, + connection: ActorRef, + handlerP: Option[Promise[ActorRef]], + dataHandlerFactory: DLCDataHandler.Factory) + extends Actor + with ActorLogging { + + private val handler = { + val h = dataHandlerFactory(dlcWalletApi, context, self) + handlerP.foreach(_.success(h)) + h + } + + override def preStart(): Unit = { + context.watch(connection) + connection ! Tcp.Register(self) + connection ! Tcp.ResumeReading + } + + override def receive: Receive = connected(ByteVector.empty) + + def connected(unalignedBytes: ByteVector): Receive = LoggingReceive { + case lnMessage: LnMessage[TLV] => + val byteMessage = ByteString(lnMessage.bytes.toArray) + connection ! Tcp.Write(byteMessage) + connection ! Tcp.ResumeReading + + case tlv: TLV => + Try(LnMessage[TLV](tlv)) match { + case Success(message) => self.forward(message) + case Failure(ex) => + ex.printStackTrace() + log.error(s"Cannot send message", ex) + } + + case Tcp.Received(data) => + val byteVec = ByteVector(data.toArray) + log.debug(s"Received ${byteVec.length} TCP bytes") + log.debug(s"Received TCP bytes: ${byteVec.toHex}") + log.debug { + val post = + if (unalignedBytes.isEmpty) "None" + else unalignedBytes.toHex + s"Unaligned bytes: $post" + } + + if (unalignedBytes.isEmpty) { + connection ! Tcp.ResumeReading + } + + //we need to aggregate our previous 'unalignedBytes' with the new message + //we just received from our peer to hopefully be able to parse full messages + val bytes: ByteVector = unalignedBytes ++ byteVec + log.debug(s"Bytes for message parsing: ${bytes.toHex}") + val (messages, newUnalignedBytes) = parseIndividualMessages(bytes) + + log.debug { + val length = messages.length + val suffix = if (length == 0) "" else s": ${messages.mkString(", ")}" + + s"Parsed $length message(s) from bytes$suffix" + } + log.debug(s"Unaligned bytes after this: ${newUnalignedBytes.length}") + if (newUnalignedBytes.nonEmpty) { + log.debug(s"Unaligned bytes: ${newUnalignedBytes.toHex}") + } + + messages.foreach(m => handler ! m) + + connection ! Tcp.ResumeReading + context.become(connected(newUnalignedBytes)) + + case Tcp.PeerClosed => context.stop(self) + + case c @ Tcp.CommandFailed(_: Tcp.Write) => + // O/S buffer was full + val errorMessage = "Cannot write bytes " + c.cause match { + case Some(ex) => log.error(errorMessage, ex) + case None => log.error(errorMessage) + } + + handler ! DLCConnectionHandler.WriteFailed(c.cause) + case DLCConnectionHandler.CloseConnection => + connection ! Tcp.Close + case _: Tcp.ConnectionClosed => + context.stop(self) + case Terminated(actor) if actor == connection => + context.stop(self) + } +} + +object DLCConnectionHandler extends Logging { + + case object CloseConnection + case class WriteFailed(cause: Option[Throwable]) + case object Ack extends Tcp.Event + + def props( + dlcWalletApi: DLCWalletApi, + connection: ActorRef, + handlerP: Option[Promise[ActorRef]], + dataHandlerFactory: DLCDataHandler.Factory): Props = { + Props( + new DLCConnectionHandler(dlcWalletApi, + connection, + handlerP, + dataHandlerFactory)) + } + + private[bitcoins] def parseIndividualMessages( + bytes: ByteVector): (Vector[LnMessage[TLV]], ByteVector) = { + @tailrec + def loop( + remainingBytes: ByteVector, + accum: Vector[LnMessage[TLV]]): (Vector[LnMessage[TLV]], ByteVector) = { + if (remainingBytes.length <= 0) { + (accum, remainingBytes) + } else { + // todo figure out how to properly handle unknown messages + Try(LnMessage.parseKnownMessage(remainingBytes)) match { + case Failure(_) => + // If we can't parse the entire message, continue on until we can + // so we properly skip it + (accum, remainingBytes) + case Success(message) => + val newRemainingBytes = remainingBytes.drop(message.byteSize) + logger.debug( + s"Parsed a message=${message.typeName} from bytes, continuing with remainingBytes=${newRemainingBytes.length}") + loop(newRemainingBytes, accum :+ message) + } + } + } + + loop(bytes, Vector.empty) + } +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCDataHandler.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCDataHandler.scala new file mode 100644 index 0000000000..24b961d1ae --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCDataHandler.scala @@ -0,0 +1,87 @@ +package org.bitcoins.dlc.node + +import akka.actor._ +import akka.event.LoggingReceive +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.core.protocol.tlv._ + +import scala.concurrent._ + +class DLCDataHandler(dlcWalletApi: DLCWalletApi, connectionHandler: ActorRef) + extends Actor + with ActorLogging { + implicit val ec: ExecutionContextExecutor = context.system.dispatcher + + override def preStart(): Unit = { + val _ = context.watch(connectionHandler) + } + + override def receive: Receive = LoggingReceive { + case lnMessage: LnMessage[TLV] => + log.info(s"Received LnMessage ${lnMessage.typeName}") + val f: Future[Unit] = handleTLVMessage(lnMessage) + f.failed.foreach(err => + log.error(s"Failed to process lnMessage=${lnMessage}", err)) + case DLCConnectionHandler.WriteFailed(_) => + log.error("Write failed") + case Terminated(actor) if actor == connectionHandler => + context.stop(self) + } + + private def handleTLVMessage(lnMessage: LnMessage[TLV]): Future[Unit] = { + lnMessage.tlv match { + case msg @ (_: UnknownTLV | _: DLCOracleTLV | _: DLCSetupPieceTLV) => + log.error(s"Received unhandled message $msg") + Future.unit + case _: InitTLV => + Future.unit // todo init logic + case error: ErrorTLV => + log.error(error.toString) + Future.unit //is this right? + case ping: PingTLV => + val pong = PongTLV.forIgnored(ping.ignored) + connectionHandler ! LnMessage(pong) + Future.unit + case pong: PongTLV => + log.debug(s"Received pong message $pong") + Future.unit + case dlcOffer: DLCOfferTLV => + val f = for { + accept <- dlcWalletApi.acceptDLCOffer(dlcOffer) + _ = connectionHandler ! accept.toMessage + } yield () + f + case dlcAccept: DLCAcceptTLV => + val f = for { + sign <- dlcWalletApi.signDLC(dlcAccept) + _ = connectionHandler ! sign.toMessage + } yield () + f + case dlcSign: DLCSignTLV => + val f = for { + _ <- dlcWalletApi.addDLCSigs(dlcSign) + _ <- dlcWalletApi.broadcastDLCFundingTx(dlcSign.contractId) + } yield () + f + } + } +} + +object DLCDataHandler { + + type Factory = (DLCWalletApi, ActorContext, ActorRef) => ActorRef + + sealed trait Command + case class Received(tlv: TLV) extends Command + case class Send(tlv: TLV) extends Command + + def defaultFactory( + dlcWalletApi: DLCWalletApi, + context: ActorContext, + connectionHandler: ActorRef): ActorRef = { + context.actorOf(props(dlcWalletApi, connectionHandler)) + } + + def props(dlcWalletApi: DLCWalletApi, connectionHandler: ActorRef): Props = + Props(new DLCDataHandler(dlcWalletApi, connectionHandler)) +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCNode.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCNode.scala new file mode 100644 index 0000000000..de5a6fe2f9 --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCNode.scala @@ -0,0 +1,62 @@ +package org.bitcoins.dlc.node + +import akka.actor.{ActorRef, ActorSystem} +import grizzled.slf4j.Logging +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.core.protocol.tlv._ +import org.bitcoins.core.util.StartStopAsync +import org.bitcoins.dlc.node.config._ +import org.bitcoins.dlc.node.peer.Peer + +import java.net.InetSocketAddress +import scala.concurrent._ + +case class DLCNode(wallet: DLCWalletApi)(implicit + system: ActorSystem, + val config: DLCNodeAppConfig) + extends StartStopAsync[Unit] + with Logging { + + implicit val ec: ExecutionContextExecutor = system.dispatcher + + private[node] lazy val serverBindF: Future[(InetSocketAddress, ActorRef)] = { + logger.info( + s"Binding server to ${config.listenAddress}, with tor hidden service: ${config.torParams.isDefined}") + + DLCServer.bind( + wallet, + config.listenAddress, + config.torParams + ) + } + + override def start(): Future[Unit] = { + serverBindF.map(_ => ()) + } + + override def stop(): Future[Unit] = { + serverBindF.map { case (_, actorRef) => + system.stop(actorRef) + } + } + + private[node] def connectAndSendToPeer( + peerAddress: InetSocketAddress, + message: LnMessage[TLV]): Future[Unit] = { + val peer = + Peer(socket = peerAddress, socks5ProxyParams = config.socks5ProxyParams) + + val handlerP = Promise[ActorRef]() + + for { + _ <- DLCClient.connect(peer, wallet, Some(handlerP)) + handler <- handlerP.future + } yield handler ! message + } + + def acceptDLCOffer( + peerAddress: InetSocketAddress, + dlcOffer: LnMessage[DLCOfferTLV]): Future[Unit] = { + connectAndSendToPeer(peerAddress, dlcOffer) + } +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCServer.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCServer.scala new file mode 100644 index 0000000000..35bb3a9975 --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/DLCServer.scala @@ -0,0 +1,110 @@ +package org.bitcoins.dlc.node + +import akka.actor._ +import akka.event.LoggingReceive +import akka.io.{IO, Tcp} +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.tor._ + +import java.io.IOException +import java.net.InetSocketAddress +import scala.concurrent.{Future, Promise} + +class DLCServer( + dlcWalletApi: DLCWalletApi, + bindAddress: InetSocketAddress, + boundAddress: Option[Promise[InetSocketAddress]], + dataHandlerFactory: DLCDataHandler.Factory = DLCDataHandler.defaultFactory) + extends Actor + with ActorLogging { + + import context.system + + IO(Tcp) ! Tcp.Bind(self, bindAddress) + + var socket: ActorRef = _ + + override def receive: Receive = LoggingReceive { + case Tcp.Bound(localAddress) => + log.info(s"Bound at $localAddress") + boundAddress.foreach(_.success(localAddress)) + socket = sender() + + case DLCServer.Disconnect => + socket ! Tcp.Unbind + + case c @ Tcp.CommandFailed(_: Tcp.Bind) => + val ex = c.cause.getOrElse(new IOException("Unknown Error")) + log.error(s"Cannot bind $boundAddress", ex) + throw ex + + case Tcp.Connected(remoteAddress, _) => + val connection = sender() + log.info(s"Received a connection from $remoteAddress") + val _ = context.actorOf( + Props( + new DLCConnectionHandler(dlcWalletApi, + connection, + None, + dataHandlerFactory))) + } + + override def postStop(): Unit = { + super.postStop() + socket ! Tcp.Unbind + } + + override def aroundReceive(receive: Receive, msg: Any): Unit = try { + super.aroundReceive(receive, msg) + } catch { + case t: Throwable => + boundAddress.foreach(_.tryFailure(t)) + } + +} + +object DLCServer { + + case object Disconnect + + def props( + dlcWalletApi: DLCWalletApi, + bindAddress: InetSocketAddress, + boundAddress: Option[Promise[InetSocketAddress]] = None, + dataHandlerFactory: DLCDataHandler.Factory): Props = Props( + new DLCServer(dlcWalletApi, bindAddress, boundAddress, dataHandlerFactory)) + + def bind( + dlcWalletApi: DLCWalletApi, + bindAddress: InetSocketAddress, + torParams: Option[TorParams], + dataHandlerFactory: DLCDataHandler.Factory = + DLCDataHandler.defaultFactory)(implicit + system: ActorSystem): Future[(InetSocketAddress, ActorRef)] = { + import system.dispatcher + + val promise = Promise[InetSocketAddress]() + + for { + onionAddress <- torParams match { + case Some(params) => + TorController + .setUpHiddenService( + params.controlAddress, + params.authentication, + params.privateKeyPath, + bindAddress.getPort + ) + .map(Some(_)) + case None => Future.successful(None) + } + actorRef = system.actorOf( + props(dlcWalletApi, bindAddress, Some(promise), dataHandlerFactory)) + boundAddress <- promise.future + } yield { + val addr = onionAddress.getOrElse(boundAddress) + + (addr, actorRef) + } + } +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/config/DLCNodeAppConfig.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/config/DLCNodeAppConfig.scala new file mode 100644 index 0000000000..ce8daeda7b --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/config/DLCNodeAppConfig.scala @@ -0,0 +1,94 @@ +package org.bitcoins.dlc.node.config + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import org.bitcoins.core.api.dlc.wallet.DLCWalletApi +import org.bitcoins.core.util.FutureUtil +import org.bitcoins.db._ +import org.bitcoins.dlc.node.DLCNode +import org.bitcoins.tor +import org.bitcoins.tor.{Socks5ProxyParams, TorParams} +import org.bitcoins.tor.TorProtocolHandler.{Password, SafeCookie} + +import java.io.File +import java.net.{InetSocketAddress, URI} +import java.nio.file.Path +import scala.concurrent._ + +/** Configuration for the Bitcoin-S wallet + * + * @param directory The data directory of the wallet + * @param conf Optional sequence of configuration overrides + */ +case class DLCNodeAppConfig( + private val directory: Path, + private val conf: Config*) + extends AppConfig { + override protected[bitcoins] def configOverrides: List[Config] = conf.toList + + override protected[bitcoins] def moduleName: String = "dlcnode" + + override protected[bitcoins] type ConfigType = DLCNodeAppConfig + + override protected[bitcoins] def newConfigOfType( + configs: Seq[Config]): DLCNodeAppConfig = + DLCNodeAppConfig(directory, configs: _*) + + protected[bitcoins] def baseDatadir: Path = directory + + override def start(): Future[Unit] = { + FutureUtil.unit + } + + override def stop(): Future[Unit] = Future.unit + + lazy val socks5ProxyParams: Option[Socks5ProxyParams] = { + if (config.getBoolean("bitcoin-s.proxy.enabled")) { + val uri = new URI("tcp://" + config.getString("bitcoin-s.proxy.socks5")) + val sock5 = InetSocketAddress.createUnresolved(uri.getHost, uri.getPort) + Some( + Socks5ProxyParams( + address = sock5, + credentialsOpt = None, + randomizeCredentials = true + ) + ) + } else { + None + } + } + + lazy val torParams: Option[TorParams] = { + if (config.getBoolean("bitcoin-s.tor.enabled")) { + val controlURI = new URI(config.getString("bitcoin-s.tor.control")) + val control = InetSocketAddress.createUnresolved(controlURI.getHost, + controlURI.getPort) + + val auth = config.getStringOrNone("bitcoin-s.tor.password") match { + case Some(pass) => Password(pass) + case None => SafeCookie() // todo allow configuring the cookie? + } + + val privKeyPath = + config.getStringOrNone("bitcoin-s.tor.privateKeyPath") match { + case Some(path) => new File(path).toPath + case None => datadir.resolve("tor_priv_key") + } + + Some(tor.TorParams(control, auth, privKeyPath)) + } else { + None + } + } + + lazy val listenAddress: InetSocketAddress = { + val str = config.getString(s"bitcoin-s.$moduleName.listen") + val uri = new URI("tcp://" + str) + new InetSocketAddress(uri.getHost, uri.getPort) + } + + def createDLCNode(dlcWallet: DLCWalletApi)(implicit + system: ActorSystem): DLCNode = { + DLCNode(dlcWallet)(system, this) + } +} diff --git a/dlc-node/src/main/scala/org/bitcoins/dlc/node/peer/Peer.scala b/dlc-node/src/main/scala/org/bitcoins/dlc/node/peer/Peer.scala new file mode 100644 index 0000000000..4e448fda46 --- /dev/null +++ b/dlc-node/src/main/scala/org/bitcoins/dlc/node/peer/Peer.scala @@ -0,0 +1,30 @@ +package org.bitcoins.dlc.node.peer + +import org.bitcoins.core.api.db.DbRowAutoInc +import org.bitcoins.tor.Socks5ProxyParams + +import java.net.InetSocketAddress + +case class Peer( + socket: InetSocketAddress, + id: Option[Long] = None, + socks5ProxyParams: Option[Socks5ProxyParams] = None) + extends DbRowAutoInc[Peer] { + + override def copyWithId(id: Long): Peer = { + this.copy(id = Some(id)) + } + + override def toString: String = + s"Peer(${socket.getHostString}:${socket.getPort})" + +} + +object Peer { + + def fromSocket( + socket: InetSocketAddress, + socks5ProxyParams: Option[Socks5ProxyParams] = None): Peer = { + Peer(socket, socks5ProxyParams = socks5ProxyParams) + } +} diff --git a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala index 08f26c82ed..a02e44b46b 100644 --- a/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala +++ b/dlc-wallet/src/main/scala/org/bitcoins/dlc/wallet/DLCWallet.scala @@ -816,7 +816,6 @@ abstract class DLCWallet _ <- dlcSigsDAO.createAll(sigsDbs) _ <- dlcRefundSigDAO.upsert(refundSigsDb) _ <- dlcAcceptDAO.upsert(dlcAcceptDb) - _ <- dlcDAO.update(dlc.updateState(DLCState.Accepted)) // .get is safe here because we must have an offer if we have a dlcDAO offerDb <- dlcOfferDAO.findByDLCId(dlc.dlcId).map(_.get) @@ -855,8 +854,10 @@ abstract class DLCWallet case None => scriptPubKeyDAO.create(spkDb) } - updatedDLCDb <- - updateFundingOutPoint(dlcDb.contractIdOpt.get, outPoint) + updatedDLCDb <- dlcDAO.update( + dlcDb + .updateState(DLCState.Accepted) + .updateFundingOutPoint(outPoint)) } yield (updatedDLCDb, sigsDbs) case (dlc, Some(_)) => logger.debug( diff --git a/docs/config/configuration.md b/docs/config/configuration.md index ac43ca3e39..ba7ec3ed95 100644 --- a/docs/config/configuration.md +++ b/docs/config/configuration.md @@ -171,8 +171,21 @@ bitcoin-s { proxy { # You can configure SOCKS5 proxy to use Tor for outgoing connections enabled = false - host = "127.0.0.1" - port = 9050 + socks5 = "127.0.0.1:9050" + } + + tor { + # You can enable Tor for incoming connections + enabled = false + control = 127.0.0.1:9051 + + # The password used to arrive at the HashedControlPassword for the control port. + # If provided, the HASHEDPASSWORD authentication method will be used instead of + # the SAFECOOKIE one. + # password = securePassword + + # The path to the private key of the onion service being created + # privateKeyPath = /path/to/priv/key } chain { @@ -258,6 +271,12 @@ bitcoin-s { # target = 1 # Will always use 1 sat/vbyte } + dlcnode { + # The address we are listening on for incoming connections for DLCs + # Binding to 0.0.0.0 makes us listen to all incoming connections + listen = "0.0.0.0:2862" + } + server { # The port we bind our rpc server on rpcport = 9999 diff --git a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala index 79e5c3604b..ae683f163b 100644 --- a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala +++ b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala @@ -17,7 +17,7 @@ import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.peer.DataMessageHandler import org.bitcoins.tor.Socks5ProxyParams -import java.net.InetSocketAddress +import java.net.{InetSocketAddress, URI} import java.nio.file.Path import scala.concurrent.{ExecutionContext, Future} @@ -91,12 +91,11 @@ case class NodeAppConfig( lazy val socks5ProxyParams: Option[Socks5ProxyParams] = { if (config.getBoolean("bitcoin-s.proxy.enabled")) { + val uri = new URI("tcp://" + config.getString("bitcoin-s.proxy.socks5")) + val sock5 = InetSocketAddress.createUnresolved(uri.getHost, uri.getPort) Some( Socks5ProxyParams( - address = InetSocketAddress.createUnresolved( - config.getString("bitcoin-s.proxy.host"), - config.getInt("bitcoin-s.proxy.port") - ), + address = sock5, credentialsOpt = None, randomizeCredentials = true ) diff --git a/project/Deps.scala b/project/Deps.scala index 742c2cca25..536509405c 100644 --- a/project/Deps.scala +++ b/project/Deps.scala @@ -100,6 +100,9 @@ object Deps { val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % V.akkaStreamv withSources () withJavadoc () + val akkaTestkit = + "com.typesafe.akka" %% "akka-testkit" % V.akkaActorV withSources () withJavadoc () + val scalaFx = "org.scalafx" %% "scalafx" % V.scalaFxV withSources () withJavadoc () @@ -258,9 +261,6 @@ object Deps { "com.typesafe.akka" %% "akka-stream-testkit" % V.akkaStreamv % "test" withSources () withJavadoc () val playJson = Compile.playJson % "test" - val akkaTestkit = - "com.typesafe.akka" %% "akka-testkit" % V.akkaActorV withSources () withJavadoc () - val scalameter = "com.storm-enroute" %% "scalameter" % V.scalameterV % "test" withSources () withJavadoc () @@ -269,6 +269,9 @@ object Deps { val pgEmbedded = "com.opentable.components" % "otj-pg-embedded" % V.pgEmbeddedV % "test" withSources () withJavadoc () + + val akkaTestkit = + "com.typesafe.akka" %% "akka-testkit" % V.akkaActorV withSources () withJavadoc () } def asyncUtils = Def.setting { @@ -315,7 +318,19 @@ object Deps { val dlcWallet = List( Compile.newMicroJson, - Compile.logback + Compile.grizzledSlf4j + ) + + val dlcNode = + List( + Compile.newMicroJson, + Compile.grizzledSlf4j, + Compile.akkaActor + ) + + val dlcNodeTest = + List( + Test.akkaTestkit ) val dlcWalletTest = @@ -540,7 +555,7 @@ object Deps { Compile.pgEmbedded, Compile.slf4j, Compile.grizzledSlf4j, - Test.akkaTestkit + Compile.akkaTestkit ) } @@ -563,7 +578,6 @@ object Deps { ) val walletTest = List( - Test.akkaTestkit, Test.pgEmbedded ) @@ -578,8 +592,7 @@ object Deps { val walletServerTest = List( Test.scalaMock, Test.akkaHttpTestkit, - Test.akkaStream, - Test.akkaTestkit + Test.akkaStream ) val dlcOracle = diff --git a/testkit-core/src/main/scala/org/bitcoins/testkitcore/gen/LnMessageGen.scala b/testkit-core/src/main/scala/org/bitcoins/testkitcore/gen/LnMessageGen.scala index 5b30729aa3..01b9c8d560 100644 --- a/testkit-core/src/main/scala/org/bitcoins/testkitcore/gen/LnMessageGen.scala +++ b/testkit-core/src/main/scala/org/bitcoins/testkitcore/gen/LnMessageGen.scala @@ -73,6 +73,13 @@ trait LnMessageGen extends TLVGen { def lnMessage: Gen[LnMessage[TLV]] = { Gen.oneOf( unknownMessage, + knownLnMessage + ) + } + + def knownLnMessage: Gen[LnMessage[TLV]] = { + Gen.oneOf( + initMessage, errorMessage, pingMessage, pongMessage, diff --git a/testkit/src/main/scala/org/bitcoins/testkit/dlc/BitcoinSDLCNodeTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/dlc/BitcoinSDLCNodeTest.scala new file mode 100644 index 0000000000..71a14247c8 --- /dev/null +++ b/testkit/src/main/scala/org/bitcoins/testkit/dlc/BitcoinSDLCNodeTest.scala @@ -0,0 +1,58 @@ +package org.bitcoins.testkit.dlc + +import com.typesafe.config.ConfigFactory +import org.bitcoins.dlc.node.DLCNode +import org.bitcoins.dlc.wallet.DLCWallet +import org.bitcoins.rpc.util.RpcUtil +import org.bitcoins.server.BitcoinSAppConfig +import org.bitcoins.testkit.wallet.BitcoinSWalletTest.destroyDLCWallet +import org.bitcoins.testkit.wallet._ +import org.scalatest.FutureOutcome + +trait BitcoinSDLCNodeTest extends BitcoinSWalletTest { + + /** Wallet config with data directory set to user temp directory */ + override protected def getFreshConfig: BitcoinSAppConfig = { + val dlcListen = ConfigFactory.parseString( + s"""bitcoin-s.dlcnode.listen = "0.0.0.0:${RpcUtil.randomPort}" """) + BaseWalletTest.getFreshConfig(pgUrl, Vector(dlcListen)) + } + + /** Creates two DLC nodes with wallets that are funded with some bitcoin, + * these wallets are NOT peered with a bitcoind so the funds in + * the wallets are not tied to an underlying blockchain. + */ + def witTwoFundedDLCNodes(test: OneArgAsyncTest): FutureOutcome = { + makeDependentFixture( + build = () => { + val configA = getFreshConfig + val configB = getFreshConfig + for { + walletA <- + FundWalletUtil.createFundedDLCWallet( + nodeApi, + chainQueryApi, + getBIP39PasswordOpt(), + Some(segwitWalletConf))(configA, system) + walletB <- FundWalletUtil.createFundedDLCWallet( + nodeApi, + chainQueryApi, + getBIP39PasswordOpt(), + Some(segwitWalletConf))(configB, system) + + nodeA = configA.dlcNodeConf.createDLCNode(walletA.wallet) + nodeB = configB.dlcNodeConf.createDLCNode(walletB.wallet) + + _ <- nodeA.start() + _ <- nodeB.start() + } yield (nodeA, nodeB) + }, + destroy = { nodes: (DLCNode, DLCNode) => + for { + _ <- destroyDLCWallet(nodes._1.wallet.asInstanceOf[DLCWallet]) + _ <- destroyDLCWallet(nodes._2.wallet.asInstanceOf[DLCWallet]) + } yield () + } + )(test) + } +} diff --git a/testkit/src/main/scala/org/bitcoins/testkit/util/BitcoinSActorTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/util/BitcoinSActorTest.scala new file mode 100644 index 0000000000..fc72888fa6 --- /dev/null +++ b/testkit/src/main/scala/org/bitcoins/testkit/util/BitcoinSActorTest.scala @@ -0,0 +1,24 @@ +package org.bitcoins.testkit.util + +import akka.testkit.{ImplicitSender, TestKitBase} +import org.bitcoins.testkit.wallet.BitcoinSWalletTest +import org.scalatest.flatspec.FixtureAsyncFlatSpec +import org.scalatest.matchers.must.Matchers +import org.scalatest.BeforeAndAfterAll + +trait BitcoinSActorTest + extends FixtureAsyncFlatSpec + with Matchers + with TestKitBase + with BeforeAndAfterAll + with ImplicitSender + +trait BitcoinSActorFixtureWithDLCWallet + extends BitcoinSActorTest + with BitcoinSWalletTest { + + override def afterAll(): Unit = { + super[BitcoinSWalletTest].afterAll() + super[BitcoinSActorTest].afterAll() + } +} diff --git a/testkit/src/main/scala/org/bitcoins/testkit/util/NetworkUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/util/NetworkUtil.scala new file mode 100644 index 0000000000..4d3d88850a --- /dev/null +++ b/testkit/src/main/scala/org/bitcoins/testkit/util/NetworkUtil.scala @@ -0,0 +1,15 @@ +package org.bitcoins.testkit.util + +import grizzled.slf4j.Logging + +import java.net.{InetSocketAddress, Socket} +import scala.util.Try + +object NetworkUtil extends Logging { + + def portIsBound(address: InetSocketAddress): Boolean = + Try { + val socket = new Socket(address.getHostString, address.getPort) + socket.close() + }.isSuccess +} diff --git a/tor/src/main/scala/org/bitcoins/tor/TorParams.scala b/tor/src/main/scala/org/bitcoins/tor/TorParams.scala new file mode 100644 index 0000000000..a9a9fa453e --- /dev/null +++ b/tor/src/main/scala/org/bitcoins/tor/TorParams.scala @@ -0,0 +1,12 @@ +package org.bitcoins.tor + +import org.bitcoins.tor.TorProtocolHandler.Authentication + +import java.net.InetSocketAddress +import java.nio.file.Path + +case class TorParams( + controlAddress: InetSocketAddress, + authentication: Authentication, + privateKeyPath: Path +)