Network notifications (#4774)

* Network notifications

* add more tests

* don't drop futures on the floor

* Improve logging, don't drop futures on floor

* scalafmt

* add error messages to the failure notifications

* update unit tests

Co-authored-by: Chris Stewart <stewart.chris1234@gmail.com>
This commit is contained in:
rorp 2022-10-09 05:46:13 -07:00 committed by GitHub
parent 718053668d
commit de43dadf52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 691 additions and 111 deletions

View file

@ -111,10 +111,24 @@ object DLCNodeWsType extends StringFactory[DLCNodeWsType] {
case object DLCConnectionInitiated extends DLCNodeWsType
case object DLCConnectionEstablished extends DLCNodeWsType
case object DLCConnectionFailed extends DLCNodeWsType
case object DLCOfferSendSucceed extends DLCNodeWsType
case object DLCOfferSendFailed extends DLCNodeWsType
case object DLCAcceptSucceed extends DLCNodeWsType
case object DLCAcceptFailed extends DLCNodeWsType
case object DLCSignSucceed extends DLCNodeWsType
case object DLCSignFailed extends DLCNodeWsType
private val all = Vector(DLCConnectionInitiated,
DLCConnectionEstablished,
DLCConnectionFailed)
private val all = Vector(
DLCConnectionInitiated,
DLCConnectionEstablished,
DLCConnectionFailed,
DLCOfferSendSucceed,
DLCOfferSendFailed,
DLCAcceptSucceed,
DLCAcceptFailed,
DLCSignSucceed,
DLCSignFailed
)
override def fromStringOpt(string: String): Option[DLCNodeWsType] = {
all.find(_.toString.toLowerCase() == string.toLowerCase)
@ -315,4 +329,52 @@ object DLCNodeNotification {
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcNodeConnectionFailedPickler)
}
case class DLCAcceptFailed(payload: (Sha256Digest, String))
extends DLCNodeNotification[(Sha256Digest, String)] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCAcceptFailed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcAcceptFailedPickler)
}
case class DLCAcceptSucceed(payload: Sha256Digest)
extends DLCNodeNotification[Sha256Digest] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCAcceptSucceed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcAcceptSucceedPickler)
}
case class DLCOfferSendFailed(payload: (Sha256Digest, String))
extends DLCNodeNotification[(Sha256Digest, String)] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCOfferSendFailed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcOfferSendFailedPickler)
}
case class DLCOfferSendSucceed(payload: Sha256Digest)
extends DLCNodeNotification[Sha256Digest] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCOfferSendSucceed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcOfferSendSucceedPickler)
}
case class DLCSignFailed(payload: (Sha256Digest, String))
extends DLCNodeNotification[(Sha256Digest, String)] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCSignFailed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcSignFailedPickler)
}
case class DLCSignSucceed(payload: Sha256Digest)
extends DLCNodeNotification[Sha256Digest] {
override def `type`: DLCNodeWsType = DLCNodeWsType.DLCSignSucceed
override def json: Value =
upickle.default.writeJs(this)(WsPicklers.dlcSignSucceedPickler)
}
}

View file

@ -7,9 +7,15 @@ import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
SyncFlagChangedNotification
}
import org.bitcoins.commons.jsonmodels.ws.DLCNodeNotification.{
DLCAcceptFailed,
DLCAcceptSucceed,
DLCNodeConnectionEstablished,
DLCNodeConnectionFailed,
DLCNodeConnectionInitiated
DLCNodeConnectionInitiated,
DLCOfferSendFailed,
DLCOfferSendSucceed,
DLCSignFailed,
DLCSignSucceed
}
import org.bitcoins.commons.jsonmodels.ws.WalletNotification.{
DLCOfferAddNotification,
@ -35,6 +41,7 @@ import org.bitcoins.commons.jsonmodels.ws.{
import org.bitcoins.core.config.DLC
import org.bitcoins.core.serializers.PicklerKeys
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.crypto.Sha256Digest
import upickle.default._
import java.net.InetSocketAddress
@ -202,6 +209,10 @@ object WsPicklers {
notification: DLCNodeNotification[_]): ujson.Obj = {
def addr2str(address: InetSocketAddress) =
address.getHostName + ":" + address.getPort
def failure2obj(payload: (Sha256Digest, String)): ujson.Obj = {
ujson.Obj(PicklerKeys.idKey -> writeJs(payload._1.hex),
PicklerKeys.errorKey -> writeJs(payload._2))
}
val payloadJson: ujson.Value = notification match {
case DLCNodeConnectionInitiated(address) =>
upickle.default.writeJs(addr2str(address))
@ -209,6 +220,15 @@ object WsPicklers {
upickle.default.writeJs(addr2str(address))
case DLCNodeConnectionFailed(address) =>
upickle.default.writeJs(addr2str(address))
case DLCAcceptFailed(payload) => failure2obj(payload)
case DLCAcceptSucceed(id) =>
upickle.default.writeJs(id.hex)
case DLCOfferSendFailed(payload) => failure2obj(payload)
case DLCOfferSendSucceed(id) =>
upickle.default.writeJs(id.hex)
case DLCSignFailed(payload) => failure2obj(payload)
case DLCSignSucceed(id) =>
upickle.default.writeJs(id.hex)
}
val notificationObj = ujson.Obj(
PicklerKeys.typeKey -> writeJs(notification.`type`),
@ -222,6 +242,11 @@ object WsPicklers {
val typeObj = read[DLCNodeWsType](obj(PicklerKeys.typeKey))
val payloadObj = obj(PicklerKeys.payloadKey)
def obj2failure(payload: ujson.Value): (Sha256Digest, String) = {
(Sha256Digest.fromHex(payload.obj(PicklerKeys.idKey).str),
payload.obj(PicklerKeys.errorKey).str)
}
typeObj match {
case DLCNodeWsType.DLCConnectionInitiated =>
val address: InetSocketAddress =
@ -235,6 +260,18 @@ object WsPicklers {
val address: InetSocketAddress =
NetworkUtil.parseInetSocketAddress(payloadObj.str, DLC.DefaultPort)
DLCNodeConnectionFailed(address)
case DLCNodeWsType.DLCAcceptFailed =>
DLCAcceptFailed(obj2failure(payloadObj))
case DLCNodeWsType.DLCAcceptSucceed =>
DLCAcceptSucceed(Sha256Digest.fromHex(payloadObj.str))
case DLCNodeWsType.DLCOfferSendFailed =>
DLCOfferSendFailed(obj2failure(payloadObj))
case DLCNodeWsType.DLCOfferSendSucceed =>
DLCOfferSendSucceed(Sha256Digest.fromHex(payloadObj.str))
case DLCNodeWsType.DLCSignFailed =>
DLCSignFailed(obj2failure(payloadObj))
case DLCNodeWsType.DLCSignSucceed =>
DLCSignSucceed(Sha256Digest.fromHex(payloadObj.str))
}
}
@ -276,6 +313,12 @@ object WsPicklers {
)
}
implicit val dlcNodeNotificationPickler: ReadWriter[
DLCNodeNotification[_]] = {
readwriter[ujson.Obj]
.bimap(writeDLCNodeNotification, readDLCNodeNotification)
}
implicit val walletNotificationPickler: ReadWriter[WalletNotification[_]] = {
readwriter[ujson.Obj].bimap(writeWalletNotification, readWalletNotification)
}
@ -363,4 +406,39 @@ object WsPicklers {
readDLCNodeNotification(_).asInstanceOf[DLCNodeConnectionEstablished])
}
implicit val dlcAcceptSucceedPickler: ReadWriter[DLCAcceptSucceed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCAcceptSucceed])
}
implicit val dlcAcceptFailedPickler: ReadWriter[DLCAcceptFailed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCAcceptFailed])
}
implicit val dlcSignSucceedPickler: ReadWriter[DLCSignSucceed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCSignSucceed])
}
implicit val dlcSignFailedPickler: ReadWriter[DLCSignFailed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCSignFailed])
}
implicit val dlcOfferSendSucceedPickler: ReadWriter[DLCOfferSendSucceed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCOfferSendSucceed])
}
implicit val dlcOfferSendFailedPickler: ReadWriter[DLCOfferSendFailed] = {
readwriter[ujson.Obj].bimap(
writeDLCNodeNotification(_),
readDLCNodeNotification(_).asInstanceOf[DLCOfferSendFailed])
}
}

View file

@ -16,19 +16,20 @@ import org.bitcoins.commons.jsonmodels.ws.ChainNotification.{
BlockProcessedNotification,
SyncFlagChangedNotification
}
import org.bitcoins.commons.jsonmodels.ws.WalletNotification._
import org.bitcoins.commons.jsonmodels.ws.{
ChainNotification,
WalletNotification,
WalletWsType,
WsNotification
import org.bitcoins.commons.jsonmodels.ws.DLCNodeNotification.{
DLCAcceptFailed,
DLCNodeConnectionFailed,
DLCNodeConnectionInitiated
}
import org.bitcoins.commons.jsonmodels.ws.WalletNotification._
import org.bitcoins.commons.jsonmodels.ws._
import org.bitcoins.commons.rpc._
import org.bitcoins.commons.serializers.{Picklers, WsPicklers}
import org.bitcoins.core.currency.Bitcoins
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.tlv.{DLCOfferTLV, LnMessage, LnMessageFactory}
import org.bitcoins.core.protocol.tlv.{DLCOfferTLV, LnMessage}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte
import org.bitcoins.crypto.{CryptoUtil, DoubleSha256DigestBE}
import org.bitcoins.testkit.server.{
@ -37,6 +38,7 @@ import org.bitcoins.testkit.server.{
}
import org.bitcoins.testkit.util.AkkaUtil
import java.net.InetSocketAddress
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Future, Promise}
import scala.util.Try
@ -53,13 +55,17 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
case message: TextMessage.Strict =>
//we should be able to parse the address message
val text = message.text
val dlcNodeNotificationOpt: Option[DLCNodeNotification[_]] = Try(
upickle.default.read[DLCNodeNotification[_]](text)(
WsPicklers.dlcNodeNotificationPickler)).toOption
val walletNotificationOpt: Option[WalletNotification[_]] = Try(
upickle.default.read[WalletNotification[_]](text)(
WsPicklers.walletNotificationPickler)).toOption
val chainNotificationOpt: Option[ChainNotification[_]] = Try(
upickle.default.read[ChainNotification[_]](text)(
WsPicklers.chainNotificationPickler)).toOption
walletNotificationOpt.getOrElse(chainNotificationOpt.get)
walletNotificationOpt.getOrElse(
chainNotificationOpt.getOrElse(dlcNodeNotificationOpt.get))
case msg =>
fail(s"Unexpected msg type received in the sink, msg=$msg")
}
@ -84,6 +90,36 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
.fromSinkAndSourceCoupledMat(sink, Source.maybe[Message])(Keep.both)
}
val str =
"fda71afd055b0006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910ffdd82efd041f000000000bebc200" +
"fda7103a02142d31363937383430313332313833343638303037000000000bebc20013343138353935353634383832353636383630" +
"380000000000000000fda722fd03d300030005fdd824bfaf6c09039d5f8cc720c387e5c303d0205c27da418e690e62577b5e9c896a" +
"ed76d91fe5df45b1082ee2c6367439f8120f1294d28658f1f4511319ba3da57e5f64cf8beaab25b9f3b9d15c4344cf6600dee69565" +
"30d05923f219f3bab3b1960b61fdd8225b000195c368ccd9b6b45755bbd11e58f1376b78657f4d159a683350a9ab2cf7da10f40000" +
"0000fdd8062b0002142d3136393738343031333231383334363830303713343138353935353634383832353636383630380564756d" +
"6d79fdd824bf4ee2503496cb46026da419c5cef4d2647531f7b3356b0736821e2f73162f6b01d0879369762fa0eedb112a1a02e36f" +
"86e035c3137b7ef5ba253f810464bde41165219eae4eec1d880e00d04c6d48cb6912b644c7a6b1094e7d82b36cfdce0f71fdd8225b" +
"00012808ee563361556fb9a4949b278c27c27056191102db5e9a977c5c82d623871d00000000fdd8062b0002142d31363937383430" +
"31333231383334363830303713343138353935353634383832353636383630380564756d6d79fdd824bf576589cd0d8e7fe996b049" +
"d180d002a0c75b510960847478225eba4e39f13284edb45c5f035998dfc8166f64b8b0c159dc3d909a742d0422148f85cb3fe8f016" +
"03433d626590b30351ceb2f0eae0b150b16d044f886c980f78b6c00eaa048979fdd8225b0001891f1c695bb10f4769608b36e703e6" +
"78428f36cdb2611e4e2fa70ad56f37532900000000fdd8062b0002142d313639373834303133323138333436383030371334313835" +
"3935353634383832353636383630380564756d6d79fdd824bfcee4dfa15d24db06fc145218d33df368198e426e876a98d351d83dc0" +
"04b8b428aec0674493b0e765f2ead36dd575cb24e37d791e3e18c69ed28a0865b169444ad0861afa1b22c38c465f0acbd140ce3de4" +
"85c22247164fad7daf7162b44d4901fdd8225b00013f340ca103eca4a6a1c49ccf8319ba9fbc6d9f31c7592586e87e3e7f0cb74f30" +
"00000000fdd8062b0002142d3136393738343031333231383334363830303713343138353935353634383832353636383630380564" +
"756d6d79fdd824bfdec152276ec26f0cbfb49e55f40e3a11ff1e305dde9481cc94832141a8102d9c861481796bf65b2205d75a1bd1" +
"62be4ef2e4f1ae01e22ce6318743b4b4cf628c4c19449df7997504313270546943decf3b71ffc3068e8a5c80320c8b91e62159fdd8" +
"225b0001ef88acc46190bb105a15827333e212b4f1bf5e437c8ae1f638a65a9d8ff0126000000000fdd8062b0002142d3136393738" +
"343031333231383334363830303713343138353935353634383832353636383630380564756d6d79036dc06b5ba0d4957f38e9cc28" +
"859e6ea88676c4453441e866605eed63181c164b001600149b1589d0bf8635c1f5662036200a4e1454966449000000000000000000" +
"00000005f5e1000002fda7143f0000000000000000002902000000000100e1f505000000001600141c50c646c3818f6f4d0229715e" +
"ae262ae4be69340000000000000000fffffffd006b0000fda7144b0000000000000001003502000000000100e1f505000000002200" +
"20ddcc4b14b4fe91e90e0b9afb090feb892f4f4e6268c5a342e6f48715bc2303e10000000000000000fffffffd0095000000160014" +
"2fc55cee805d4112a653e18d84cb405cb188b83000000000000000010000000000000000000000000000000a6320d4106320d411"
val offer: LnMessage[DLCOfferTLV] = LnMessage(DLCOfferTLV.fromHex(str))
it must "fail if RPC password is incorrect" in { serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind
val req = buildReq(server.conf, Some("wrong password"))
@ -304,44 +340,6 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
val promise: Promise[Option[Message]] = notificationsF._2._2
val str =
"a71a006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000fdd82efd033900000000000186a0fda7204e" +
"0012fda72642000400000000fda728020000fd88b8000000fda728020000fdafc8fdc3500000fda728020000fdd6d8fe000186a000" +
"00fda728020000fe0003fffffe000186a00000fda724020000fda712fd02d9fdd824fd02d391177fd623a72d56e7bc12e3903f8d6b" +
"ce7f07a25226d54009cd7e670f5e7a7320b0704286580d8b6a7f31ab7bf71356a13c28aa609a021111b2e3d2b2db26bc120bd29248" +
"895b81f76b07d85a21b86021f22352d6376d19bbf5c93f918828f1fdd822fd026d0012fad0cde50a2258efa25cbba60ef0b6677cd2" +
"9654802937c112097edb64bd205beea02263d6461e60a9ca8e08209c8bd5552863156014d5418cad91ac590bbf13a847f105db9899" +
"d560e5040f9565c043c9e7fdf3782ad2708c5b99646c722b411854747248fb52e6486cce3eca5ddf9d64ecbe0864501a446efd3788" +
"63f9a4055fab50d2112320ff14d8747a72467589822103f197063b49e77b90d82d3a8d49b63c3ceb9bd3328398a53989d4237216a2" +
"4a1d12364efa2d2aec59cdc87909b115dca5b07106b70032ff78072f82ceeaf2e20db55086e9a2e5e5cac864992d747fd40f4b26bc" +
"3d7de958ee02460d1199ff81438c9b76b3934cbc4566d10f242563b95e7df79c28d52c9c46b617676a4ee84a549ee1f0f53865c9ef" +
"4d0ff825e2f438384c5f6238d0734beb570a1a49d035d9f86ee31c23f1e97bd34fba3f586c0fdf29997530e528b3200a0d7e34f865" +
"dc4ca7bfd722bf77c0478ddd25bfa2dc6a4ab973d0c1b7a9ff38283b7c19bbe02677a9b628f3ee3d5198d66c1623c9608293093c12" +
"6d4124a445bb6412f720493b6ffa411db53923895fd50c9563d50a97a86188084fe4c0f15ce50438ff0b00e1a9470185fd7c96296a" +
"e2be12056f61ceaeee7ced48314a3b6855bc9aa3b446b9dfad68553f5302c60670a95fb9bdc5e72db7f1e9d42e4f4baca1bcbb2261" +
"2db6417b45cc3d78b1ef33fc362a68db56df00ab1ee0700bd900200f6a24882101e71de7e18a7fb0d7da27b340de52f97d96239f35" +
"9cfe31afcaf69cc9ddfcbfbdb2267e673ad728a29dd22d31d1a1187162037480fdd80a100002000642544355534400000000001212" +
"446572696269742d4254432d394645423232036585c2349d728229d82c82b4d4e28d9889ccd430bbca1c949c042b93950f211f0016" +
"00147e54f6d4148c0c0b4571c51cf624bf010e87418145a4e91696acf94e000000000000c3500002fda714fd0163876e1274389fa6" +
"61014d02000000000101f798dcf8a5c9b0dca5d771ca5a0f9d882f0c7d2776925b3819e00f46daf699690000000000feffffff0220" +
"02010000000000160014c799807edca63a977eeddd66d0fe07d147fefe4b808400000000000016001490222a528db0f1d8a1056286" +
"b3b20c35c27c3b8704004730440220753db76fe9abafb01b141a36314abf530d7afd6365378c5bc036e0a735fe287402202f8f18cf" +
"c1675d918d03a6de2855275aa3e4305e8f6e1b4cad1ae5dd31b9d5be014730440220221d4e91113ed01c3d4519c84efd11f51c543f" +
"1efb4f658cd4e6ad69950dc44902206f3d9bfeae593c84975e27ba87d91ff0ed36bd15e0bc2d002d1370a51a61f89e01475221022b" +
"8d44f97a4ecd80b33db307fc4874654c27e9812e0079d3f5c806a054ca756321039b921e070bc3ae42e73d8fb091ddf18c8f59b923" +
"bdfa870347e83bc263ee4ea652ae003afa6100000000fffffffd006b0000fda714fd01635f8026cb666a3490014d02000000000101" +
"9c66c927a1736790803e65167f9cfd618e4383cff635fd0af30d6a9af6897a3e0200000000feffffff02b7a7000000000000160014" +
"c2d981b59e0374eeb1d9fca524e62e69170bd002e9de0000000000001600145f990a2987d844b3d5ff391c41b079f3935866b60400" +
"4730440220735f325169ddd1a8e8828d3dd75386503055d5802156c07733a5d07d18a7219502204c7bab4e8b957fa95cc048205628" +
"e9bd7f15f46df52306a39f596ae8df9d7e9c0147304402206cbbcea5def1ad4c937c2c6ac63346aef7e292974aa234890bd4c26eea" +
"302dbe022019099e153c4000f46d75bd65ac769d3a1058b8d6c34953ec6804aa71e7f2132b0147522102b51a93f2196916782166e5" +
"40260cb889b89e787256fd4d282ea25026abedb14a2103a8e77c9778e3ac62d2764668491f1febe0e92e5b270995d64e5aa39f64af" +
"bd8252ae8074036200000000fffffffd006b0000001600145333b7c568cba36b5f53c24d05d36c076e741e9022edb0610ecaac5010" +
"30b89bbde232b1000000000000000362037480620caf00"
val offer: LnMessage[DLCOfferTLV] =
LnMessageFactory(DLCOfferTLV).fromHex(str)
val expectedHash = CryptoUtil.sha256(offer.tlv.bytes)
ConsoleCli
@ -449,6 +447,50 @@ class WebsocketTests extends BitcoinSServerMainBitcoindFixture {
}
}
it must "receive dlc node updates" in { serverWithBitcoind =>
val ServerWithBitcoind(_, server) = serverWithBitcoind
val cliConfig = Config(rpcPortOpt = Some(server.conf.rpcPort),
rpcPassword = server.conf.rpcPassword)
val req = buildReq(server.conf)
val notificationsF: (
Future[WebSocketUpgradeResponse],
(Future[Seq[WsNotification[_]]], Promise[Option[Message]])) = {
Http()
.singleWebSocketRequest(req, websocketFlow)
}
val walletNotificationsF: Future[Seq[WsNotification[_]]] =
notificationsF._2._1
val promise: Promise[Option[Message]] = notificationsF._2._2
val peerAddr =
InetSocketAddress.createUnresolved("127.0.0.1", NetworkUtil.randomPort())
exec(AcceptDLC(offer = offer,
externalPayoutAddressOpt = None,
externalChangeAddressOpt = None,
peerAddr = peerAddr),
cliConfig)
for {
_ <- AkkaUtil.nonBlockingSleep(500.millis)
_ = promise.success(None)
notifications <- walletNotificationsF
} yield {
assert(notifications.contains(DLCNodeConnectionInitiated(peerAddr)))
assert(notifications.contains(DLCNodeConnectionFailed(peerAddr)))
assert(notifications.exists(n =>
n match {
case DLCAcceptFailed((id, error)) =>
id == offer.tlv.tempContractId && error.startsWith(
"Connection refused")
case _ => false
}))
}
}
/* TODO implement a real test for this case
it must "not queue things on the websocket while there is no one connected" in {
serverWithBitcoind =>

View file

@ -33,9 +33,15 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.{DoubleSha256DigestBE, Sha256Digest}
import org.bitcoins.dlc.node.{
DLCNodeCallbacks,
OnAcceptFailed,
OnAcceptSucceed,
OnOfferSendFailed,
OnOfferSendSucceed,
OnPeerConnectionEstablished,
OnPeerConnectionFailed,
OnPeerConnectionInitiated
OnPeerConnectionInitiated,
OnSignFailed,
OnSignSucceed
}
import org.bitcoins.dlc.wallet.{
DLCWalletCallbacks,
@ -285,10 +291,52 @@ object WebsocketUtil extends Logging {
offerF.map(_ => ())
}
import DLCNodeCallbacks._
val onAcceptSucceed: OnAcceptSucceed = { payload =>
val notification = DLCNodeNotification.DLCAcceptSucceed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
onPeerConnectionInitiated(
onConnectionInitiated) + onPeerConnectionEstablished(
onConnectionEstablished) + onPeerConnectionFailed(onConnectionFailed)
val onAcceptFailed: OnAcceptFailed = { payload =>
val notification = DLCNodeNotification.DLCAcceptFailed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onOfferSendSucceed: OnOfferSendSucceed = { payload =>
val notification = DLCNodeNotification.DLCOfferSendSucceed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onOfferSendFailed: OnOfferSendFailed = { payload =>
val notification = DLCNodeNotification.DLCOfferSendFailed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onSignSucceed: OnSignSucceed = { payload =>
val notification = DLCNodeNotification.DLCSignSucceed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
val onSignFailed: OnSignFailed = { payload =>
val notification = DLCNodeNotification.DLCSignFailed(payload)
val offerF = walletQueue.offer(notification)
offerF.map(_ => ())
}
DLCNodeCallbacks(
onPeerConnectionInitiated = Vector(onConnectionInitiated),
onPeerConnectionEstablished = Vector(onConnectionEstablished),
onPeerConnectionFailed = Vector(onConnectionFailed),
onOfferSendSucceed = Vector(onOfferSendSucceed),
onOfferSendFailed = Vector(onOfferSendFailed),
onAcceptSucceed = Vector(onAcceptSucceed),
onAcceptFailed = Vector(onAcceptFailed),
onSignSucceed = Vector(onSignSucceed),
onSignFailed = Vector(onSignFailed)
)
}
}

View file

@ -169,4 +169,6 @@ object PicklerKeys {
final val typeKey: String = "type"
final val payloadKey: String = "payload"
val errorKey: String = "error"
}

View file

@ -2,6 +2,7 @@ package org.bitcoins.dlc.node
import akka.actor.ActorRef
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.core.protocol.dlc.models.DLCState
import org.bitcoins.core.protocol.tlv.{LnMessage, SendOfferTLV}
import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte
@ -12,9 +13,10 @@ import org.bitcoins.testkit.wallet.BitcoinSDualWalletTest
import org.bitcoins.testkit.wallet.DLCWalletUtil._
import org.bitcoins.testkit.wallet.FundWalletUtil.FundedDLCWallet
import org.scalatest.FutureOutcome
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import scala.concurrent.Promise
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.DurationInt
class DLCNegotiationTest extends BitcoinSDualWalletTest {
@ -24,6 +26,19 @@ class DLCNegotiationTest extends BitcoinSDualWalletTest {
withDualFundedDLCWallets(test)
}
private val handleWriteFn: (BigSizeUInt, ByteVector) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector) =>
Future.unit
}
private val handleWriteErrorFn: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector, _: Throwable) =>
Future.unit
}
it must "setup a DLC" in {
fundedDLCWallets: (FundedDLCWallet, FundedDLCWallet) =>
val walletA = fundedDLCWallets._1.wallet
@ -37,10 +52,17 @@ class DLCNegotiationTest extends BitcoinSDualWalletTest {
val handlerP = Promise[ActorRef]()
for {
_ <- DLCServer.bind(walletA, bindAddress, Vector(), None)
_ <- DLCServer.bind(dlcWalletApi = walletA,
bindAddress = bindAddress,
targets = Vector(),
torParams = None,
handleWrite = handleWriteFn,
handleWriteError = handleWriteErrorFn)
_ <- DLCClient.connect(Peer(connectAddress, socks5ProxyParams = None),
walletB,
Some(handlerP))
Some(handlerP),
handleWrite = handleWriteFn,
handleWriteError = handleWriteErrorFn)
handler <- handlerP.future
@ -89,12 +111,29 @@ class DLCNegotiationTest extends BitcoinSDualWalletTest {
InetSocketAddress.createUnresolved("127.0.0.1", port)
val handlerP = Promise[ActorRef]()
val okP = Promise[ByteVector]()
val errorP = Promise[ByteVector]()
for {
_ <- DLCServer.bind(walletA, bindAddress, Vector(), None)
_ <- DLCClient.connect(Peer(connectAddress, socks5ProxyParams = None),
walletB,
Some(handlerP))
_ <- DLCServer.bind(walletA,
bindAddress,
Vector(),
None,
handleWrite = handleWriteFn,
handleWriteError = handleWriteErrorFn)
_ <- DLCClient.connect(
Peer(connectAddress, socks5ProxyParams = None),
walletB,
Some(handlerP),
handleWrite = { (_, tlvId) =>
okP.success(tlvId)
Future.unit
},
handleWriteError = { (_, tlvId, ex) =>
errorP.success(tlvId)
Future.failed(ex)
}
)
handler <- handlerP.future
@ -113,14 +152,18 @@ class DLCNegotiationTest extends BitcoinSDualWalletTest {
None)
tlv = SendOfferTLV(peer = "peer", message = "msg", offer = offer.toTLV)
_ = assert(!okP.isCompleted)
_ = assert(!errorP.isCompleted)
_ = handler ! DLCDataHandler.Send(LnMessage(tlv))
ok <- okP.future
_ = assert(ok == tlv.offer.tempContractId.bytes)
_ = assert(!errorP.isCompleted)
_ <- TestAsyncUtil.awaitConditionF { () =>
walletA.listIncomingDLCOffers().map(_.nonEmpty)
}
postA <- walletA.listIncomingDLCOffers()
postB <- walletB.listIncomingDLCOffers()
} yield {
assert(postA.nonEmpty)
assert(postB.isEmpty)

View file

@ -70,7 +70,7 @@ class DLCNodeTest extends BitcoinSDLCNodeTest {
_ = assert(!errorP.isCompleted)
invalidAddr = InetSocketAddress.createUnresolved(addrB.getHostString,
NetworkUtil.randomPort())
_ <- recoverToSucceededIf[java.net.ConnectException](
_ <- recoverToSucceededIf[Exception](
nodeA.checkPeerConnection(invalidAddr))
error <- errorP.future
} yield {

View file

@ -4,6 +4,7 @@ import akka.actor.ActorRef
import akka.testkit.{TestActorRef, TestProbe}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.number.UInt16
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.core.protocol.tlv.{LnMessage, PingTLV, PongTLV}
import org.bitcoins.dlc.node.peer.Peer
import org.bitcoins.rpc.util.RpcUtil
@ -23,6 +24,19 @@ class DLCServerTest extends BitcoinSActorFixtureWithDLCWallet {
withFundedDLCWallet(test)(getFreshConfig)
}
private val handleWriteFn: (BigSizeUInt, ByteVector) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector) =>
Future.unit
}
private val handleWriteErrorFn: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector, _: Throwable) =>
Future.unit
}
it must "send/receive Ping and Pong TLVs over clearnet" in { dlcWalletApi =>
val port = RpcUtil.randomPort
val bindAddress =
@ -43,7 +57,9 @@ class DLCServerTest extends BitcoinSActorFixtureWithDLCWallet {
{ (_, _, connectionHandler) =>
serverConnectionHandlerOpt = Some(connectionHandler)
serverProbe.ref
}
},
handleWriteFn,
handleWriteErrorFn
))
val resultF: Future[Future[Assertion]] = for {
@ -53,13 +69,17 @@ class DLCServerTest extends BitcoinSActorFixtureWithDLCWallet {
var clientConnectionHandlerOpt = Option.empty[ActorRef]
val clientProbe = TestProbe()
val client = TestActorRef(
DLCClient.props(dlcWalletApi.wallet,
Some(connectedAddressPromise),
None,
{ (_, _, connectionHandler) =>
clientConnectionHandlerOpt = Some(connectionHandler)
clientProbe.ref
}))
DLCClient.props(
dlcWalletApi.wallet,
Some(connectedAddressPromise),
None,
{ (_, _, connectionHandler) =>
clientConnectionHandlerOpt = Some(connectionHandler)
clientProbe.ref
},
handleWriteFn,
handleWriteErrorFn
))
client ! DLCClient.Connect(Peer(connectAddress, socks5ProxyParams = None))
for {

View file

@ -4,6 +4,7 @@ import akka.actor.ActorRef
import akka.testkit.{TestActorRef, TestProbe}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.number.UInt16
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.core.protocol.tlv.{LnMessage, PingTLV, PongTLV}
import org.bitcoins.dlc.node.DLCDataHandler.Received
import org.bitcoins.dlc.node.peer.Peer
@ -36,6 +37,19 @@ class DLCServerTorTest
} else FutureOutcome.succeeded
}
private val handleWriteFn: (BigSizeUInt, ByteVector) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector) =>
Future.unit
}
private val handleWriteErrorFn: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit] = {
case (_: BigSizeUInt, _: ByteVector, _: Throwable) =>
Future.unit
}
it must "send/receive Ping and Pong TLVs over Tor" in { fundedDLCWallet =>
val timeout = 30.seconds
@ -65,7 +79,9 @@ class DLCServerTorTest
{ (_, _, connectionHandler) =>
serverConnectionHandlerOpt = Some(connectionHandler)
serverProbe.ref
}
},
handleWriteFn,
handleWriteErrorFn
))
val resultF: Future[Future[Assertion]] = for {
@ -83,7 +99,9 @@ class DLCServerTorTest
{ (_, _, connectionHandler) =>
clientConnectionHandlerOpt = Some(connectionHandler)
clientProbe.ref
}
},
handleWriteFn,
handleWriteErrorFn
))
client ! DLCClient.Connect(

View file

@ -4,9 +4,11 @@ import akka.actor._
import akka.event.LoggingReceive
import akka.io.{IO, Tcp}
import org.bitcoins.core.api.dlc.wallet.DLCWalletApi
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.dlc.node.peer.Peer
import org.bitcoins.tor.Socks5Connection.{Socks5Connect, Socks5Connected}
import org.bitcoins.tor.{Socks5Connection, Socks5ProxyParams}
import scodec.bits.ByteVector
import java.io.IOException
import java.net.InetSocketAddress
@ -17,7 +19,9 @@ class DLCClient(
dlcWalletApi: DLCWalletApi,
connectedAddress: Option[Promise[InetSocketAddress]],
handlerP: Option[Promise[ActorRef]],
dataHandlerFactory: DLCDataHandler.Factory)
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (BigSizeUInt, ByteVector, Throwable) => Future[Unit])
extends Actor
with ActorLogging {
@ -77,7 +81,9 @@ class DLCClient(
new DLCConnectionHandler(dlcWalletApi,
connection,
handlerP,
dataHandlerFactory)))
dataHandlerFactory,
handleWrite,
handleWriteError)))
connectedAddress.foreach(_.success(peerAddress))
}
}
@ -99,7 +105,9 @@ class DLCClient(
new DLCConnectionHandler(dlcWalletApi,
proxy,
handlerP,
dataHandlerFactory)))
dataHandlerFactory,
handleWrite,
handleWriteError)))
connectedAddress.foreach(_.success(remoteAddress))
case Terminated(actor) if actor == proxy =>
context stop self
@ -122,20 +130,38 @@ object DLCClient {
dlcWalletApi: DLCWalletApi,
connectedAddress: Option[Promise[InetSocketAddress]],
handlerP: Option[Promise[ActorRef]],
dataHandlerFactory: DLCDataHandler.Factory): Props = Props(
new DLCClient(dlcWalletApi, connectedAddress, handlerP, dataHandlerFactory))
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit]): Props =
Props(
new DLCClient(dlcWalletApi,
connectedAddress,
handlerP,
dataHandlerFactory,
handleWrite,
handleWriteError))
def connect(
peer: Peer,
dlcWalletApi: DLCWalletApi,
handlerP: Option[Promise[ActorRef]],
dataHandlerFactory: DLCDataHandler.Factory =
DLCDataHandler.defaultFactory)(implicit
system: ActorSystem): Future[InetSocketAddress] = {
DLCDataHandler.defaultFactory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (BigSizeUInt, ByteVector, Throwable) => Future[Unit])(
implicit system: ActorSystem): Future[InetSocketAddress] = {
val promise = Promise[InetSocketAddress]()
val actor =
system.actorOf(
props(dlcWalletApi, Some(promise), handlerP, dataHandlerFactory))
props(dlcWalletApi,
Some(promise),
handlerP,
dataHandlerFactory,
handleWrite,
handleWriteError))
actor ! Connect(peer)
promise.future
}

View file

@ -6,19 +6,23 @@ import akka.io.Tcp
import akka.util.ByteString
import grizzled.slf4j.Logging
import org.bitcoins.core.api.dlc.wallet.DLCWalletApi
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.core.protocol.tlv._
import org.bitcoins.dlc.node.DLCConnectionHandler.parseIndividualMessages
import scodec.bits.ByteVector
import java.io.IOException
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
class DLCConnectionHandler(
dlcWalletApi: DLCWalletApi,
connection: ActorRef,
handlerP: Option[Promise[ActorRef]],
dataHandlerFactory: DLCDataHandler.Factory)
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (BigSizeUInt, ByteVector, Throwable) => Future[Unit])
extends Actor
with ActorLogging {
@ -38,8 +42,10 @@ class DLCConnectionHandler(
def connected(unalignedBytes: ByteVector): Receive = LoggingReceive {
case lnMessage: LnMessage[TLV] =>
val id = tlvId(lnMessage)
val byteMessage = ByteString(lnMessage.bytes.toArray)
connection ! Tcp.Write(byteMessage)
connection ! Tcp.Write(byteMessage,
DLCConnectionHandler.Ack(lnMessage.tlv.tpe, id))
connection ! Tcp.ResumeReading
case tlv: TLV =>
@ -89,13 +95,21 @@ class DLCConnectionHandler(
case Tcp.PeerClosed => context.stop(self)
case c @ Tcp.CommandFailed(_: Tcp.Write) =>
// O/S buffer was full
val errorMessage = "Cannot write bytes "
c.cause match {
case Some(ex) => log.error(errorMessage, ex)
case None => log.error(errorMessage)
case DLCConnectionHandler.Ack(tlvType, tlvId) =>
//is this right? do i need to block here (or not block?)
val _ = handleWrite(tlvType, tlvId)
()
case c @ Tcp.CommandFailed(write: Tcp.Write) =>
val ex = c.cause match {
case Some(ex) => ex
case None => new IOException("Tcp.Write failed")
}
log.error("Cannot write bytes ", ex)
val (tlvType, tlvId) = write.ack match {
case DLCConnectionHandler.Ack(t, id) => (t, id)
case _ => (BigSizeUInt(0), ByteVector.empty)
}
handleWriteError(tlvType, tlvId, ex)
handler ! DLCConnectionHandler.WriteFailed(c.cause)
case DLCConnectionHandler.CloseConnection =>
@ -105,24 +119,42 @@ class DLCConnectionHandler(
case Terminated(actor) if actor == connection =>
context.stop(self)
}
private def tlvId(lnMessage: LnMessage[TLV]): ByteVector = {
lnMessage.tlv match {
case acceptTLV: DLCAcceptTLV => acceptTLV.tempContractId.bytes
case offerTLV: DLCOfferTLV => offerTLV.tempContractId.bytes
case sendOfferTLV: SendOfferTLV =>
sendOfferTLV.offer.tempContractId.bytes
case dlcSign: DLCSignTLV => dlcSign.contractId
case tlv: TLV => tlv.sha256.bytes
}
}
}
object DLCConnectionHandler extends Logging {
case object CloseConnection
case class WriteFailed(cause: Option[Throwable])
case object Ack extends Tcp.Event
case class Ack(tlvType: BigSizeUInt, id: ByteVector) extends Tcp.Event
def props(
dlcWalletApi: DLCWalletApi,
connection: ActorRef,
handlerP: Option[Promise[ActorRef]],
dataHandlerFactory: DLCDataHandler.Factory): Props = {
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit]): Props = {
Props(
new DLCConnectionHandler(dlcWalletApi,
connection,
handlerP,
dataHandlerFactory))
dataHandlerFactory,
handleWrite,
handleWriteError))
}
private[bitcoins] def parseIndividualMessages(

View file

@ -4,12 +4,13 @@ import akka.actor.{ActorRef, ActorSystem}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.dlc.node.DLCNodeApi
import org.bitcoins.core.api.dlc.wallet.DLCWalletApi
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.dlc.models.DLCMessage
import org.bitcoins.core.protocol.tlv._
import org.bitcoins.core.protocol.{BigSizeUInt, BitcoinAddress}
import org.bitcoins.crypto.Sha256Digest
import org.bitcoins.dlc.node.config._
import org.bitcoins.dlc.node.peer.Peer
import scodec.bits.ByteVector
import java.net.InetSocketAddress
import scala.concurrent._
@ -31,7 +32,9 @@ case class DLCNode(wallet: DLCWalletApi)(implicit
wallet,
config.listenAddress,
config.torConf.targets,
config.torParams
config.torParams,
handleWrite = handleTLVSendSucceed,
handleWriteError = handleTLVSendFailed
)
.map { case (addr, actor) =>
hostAddressP.success(addr)
@ -65,7 +68,7 @@ case class DLCNode(wallet: DLCWalletApi)(implicit
externalPayoutAddress: Option[BitcoinAddress],
externalChangeAddress: Option[BitcoinAddress]): Future[
DLCMessage.DLCAccept] = {
for {
val f = for {
handler <- connectToPeer(peerAddress)
accept <- wallet.acceptDLCOffer(dlcOffer.tlv,
Some(peerAddress),
@ -75,13 +78,19 @@ case class DLCNode(wallet: DLCWalletApi)(implicit
handler ! DLCDataHandler.Send(accept.toMessage)
accept
}
f.failed.foreach(err =>
config.callBacks.executeOnAcceptFailed(dlcOffer.tlv.tempContractId,
err.getMessage))
f
}
override def sendDLCOffer(
peerAddress: InetSocketAddress,
message: String,
offerTLV: DLCOfferTLV): Future[Sha256Digest] = {
for {
val f = for {
handler <- connectToPeer(peerAddress)
localAddress <- getHostAddress
} yield {
@ -93,6 +102,16 @@ case class DLCNode(wallet: DLCWalletApi)(implicit
handler ! DLCDataHandler.Send(lnMessage)
offerTLV.tempContractId
}
f.failed.foreach { err =>
logger.error(
s"Failed to send offer.tempContractId=${offerTLV.tempContractId}",
err)
config.callBacks.executeOnOfferSendFailed(offerTLV.tempContractId,
err.getMessage)
}
f
}
override def sendDLCOffer(
@ -121,23 +140,69 @@ case class DLCNode(wallet: DLCWalletApi)(implicit
}
}
private def handleTLVSendFailed(
tlvType: BigSizeUInt,
tlvId: ByteVector,
error: Throwable): Future[Unit] = {
logger.info("TLV send error ", error)
tlvType match {
case SendOfferTLV.tpe | DLCOfferTLV.tpe =>
config.callBacks.executeOnOfferSendFailed(Sha256Digest.fromBytes(tlvId),
error.getMessage)
case DLCAcceptTLV.tpe =>
config.callBacks.executeOnAcceptFailed(Sha256Digest.fromBytes(tlvId),
error.getMessage)
case DLCSignTLV.tpe =>
config.callBacks.executeOnSignFailed(Sha256Digest.fromBytes(tlvId),
error.getMessage)
case unknown =>
val exn = new RuntimeException(
s"Unknown tpe=$unknown inside of handleTLVSendFailed")
Future.failed(exn)
}
}
private def handleTLVSendSucceed(
tlvType: BigSizeUInt,
tlvId: ByteVector): Future[Unit] = {
tlvType match {
case SendOfferTLV.tpe | DLCOfferTLV.tpe =>
config.callBacks.executeOnOfferSendSucceed(
Sha256Digest.fromBytes(tlvId))
case DLCAcceptTLV.tpe =>
config.callBacks.executeOnAcceptSucceed(Sha256Digest.fromBytes(tlvId))
case DLCSignTLV.tpe =>
config.callBacks.executeOnSignSucceed(Sha256Digest.fromBytes(tlvId))
case unknown =>
val exn = new RuntimeException(
s"Unknown tpe=$unknown inside of handleTLVSendSucceed")
Future.failed(exn)
}
}
private def connectToPeer(
peerAddress: InetSocketAddress): Future[ActorRef] = {
config.callBacks.executeOnPeerConnectionInitiated(peerAddress)
val peer =
Peer(socket = peerAddress, socks5ProxyParams = config.socks5ProxyParams)
val handlerP = Promise[ActorRef]()
val f = for {
_ <- DLCClient.connect(peer, wallet, Some(handlerP))
_ <- config.callBacks.executeOnPeerConnectionInitiated(peerAddress)
_ <- DLCClient.connect(peer,
wallet,
Some(handlerP),
handleWrite = handleTLVSendSucceed,
handleWriteError = handleTLVSendFailed)
handler <- handlerP.future
} yield handler
f.onComplete {
case Success(_) =>
config.callBacks.executeOnPeerConnectionEstablished(peerAddress)
case Failure(_) =>
case Failure(err) =>
logger.error(s"Failed to establish connect to peer=$peerAddress", err)
config.callBacks.executeOnPeerConnectionFailed(peerAddress)
}

View file

@ -3,6 +3,7 @@ package org.bitcoins.dlc.node
import grizzled.slf4j.Logging
import org.bitcoins.core.api.callback.{CallbackFactory, ModuleCallbacks}
import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.crypto.Sha256Digest
import java.net.InetSocketAddress
import scala.concurrent.{ExecutionContext, Future}
@ -22,6 +23,20 @@ trait DLCNodeCallbacks extends ModuleCallbacks[DLCNodeCallbacks] with Logging {
InetSocketAddress,
OnPeerConnectionFailed]
def onOfferSendSucceed: CallbackHandler[Sha256Digest, OnOfferSendSucceed]
def onOfferSendFailed: CallbackHandler[
(Sha256Digest, String),
OnOfferSendFailed]
def onAcceptSucceed: CallbackHandler[Sha256Digest, OnAcceptSucceed]
def onAcceptFailed: CallbackHandler[(Sha256Digest, String), OnAcceptFailed]
def onSignSucceed: CallbackHandler[Sha256Digest, OnSignSucceed]
def onSignFailed: CallbackHandler[(Sha256Digest, String), OnSignFailed]
override def +(other: DLCNodeCallbacks): DLCNodeCallbacks
def executeOnPeerConnectionInitiated(peerAddress: InetSocketAddress)(implicit
@ -53,6 +68,60 @@ trait DLCNodeCallbacks extends ModuleCallbacks[DLCNodeCallbacks] with Logging {
s"${onPeerConnectionFailed.name} Callback failed with error: ",
err))
}
def executeOnOfferSendSucceed(tempContractId: Sha256Digest)(implicit
ec: ExecutionContext): Future[Unit] = {
onOfferSendSucceed.execute(
tempContractId,
(err: Throwable) =>
logger.error(s"${onOfferSendSucceed.name} Callback failed with error: ",
err))
}
def executeOnOfferSendFailed(
tempContractId: Sha256Digest,
errorMessage: String)(implicit ec: ExecutionContext): Future[Unit] = {
onOfferSendFailed.execute(
(tempContractId, errorMessage),
(err: Throwable) =>
logger.error(s"${onOfferSendFailed.name} Callback failed with error: ",
err))
}
def executeOnAcceptSucceed(tempContractId: Sha256Digest)(implicit
ec: ExecutionContext): Future[Unit] = {
onAcceptSucceed.execute(
tempContractId,
(err: Throwable) =>
logger.error(s"${onAcceptSucceed.name} Callback failed with error: ",
err))
}
def executeOnAcceptFailed(tempContractId: Sha256Digest, errorMessage: String)(
implicit ec: ExecutionContext): Future[Unit] = {
onAcceptFailed.execute(
(tempContractId, errorMessage),
(err: Throwable) =>
logger.error(s"${onAcceptFailed.name} Callback failed with error: ",
err))
}
def executeOnSignSucceed(tempContractId: Sha256Digest)(implicit
ec: ExecutionContext): Future[Unit] = {
onSignSucceed.execute(
tempContractId,
(err: Throwable) =>
logger.error(s"${onSignSucceed.name} Callback failed with error: ",
err))
}
def executeOnSignFailed(tempContractId: Sha256Digest, errorMessage: String)(
implicit ec: ExecutionContext): Future[Unit] = {
onSignFailed.execute(
(tempContractId, errorMessage),
(err: Throwable) =>
logger.error(s"${onSignFailed.name} Callback failed with error: ", err))
}
}
trait OnPeerConnectionInitiated extends Callback[InetSocketAddress]
@ -61,6 +130,18 @@ trait OnPeerConnectionEstablished extends Callback[InetSocketAddress]
trait OnPeerConnectionFailed extends Callback[InetSocketAddress]
trait OnOfferSendSucceed extends Callback[Sha256Digest]
trait OnOfferSendFailed extends Callback[(Sha256Digest, String)]
trait OnAcceptSucceed extends Callback[Sha256Digest]
trait OnAcceptFailed extends Callback[(Sha256Digest, String)]
trait OnSignSucceed extends Callback[Sha256Digest]
trait OnSignFailed extends Callback[(Sha256Digest, String)]
object DLCNodeCallbacks extends CallbackFactory[DLCNodeCallbacks] {
// Use Impl pattern here to enforce the correct names on the CallbackHandlers
@ -73,7 +154,15 @@ object DLCNodeCallbacks extends CallbackFactory[DLCNodeCallbacks] {
OnPeerConnectionEstablished],
onPeerConnectionFailed: CallbackHandler[
InetSocketAddress,
OnPeerConnectionFailed])
OnPeerConnectionFailed],
onOfferSendSucceed: CallbackHandler[Sha256Digest, OnOfferSendSucceed],
onOfferSendFailed: CallbackHandler[
(Sha256Digest, String),
OnOfferSendFailed],
onAcceptSucceed: CallbackHandler[Sha256Digest, OnAcceptSucceed],
onAcceptFailed: CallbackHandler[(Sha256Digest, String), OnAcceptFailed],
onSignSucceed: CallbackHandler[Sha256Digest, OnSignSucceed],
onSignFailed: CallbackHandler[(Sha256Digest, String), OnSignFailed])
extends DLCNodeCallbacks {
override def +(other: DLCNodeCallbacks): DLCNodeCallbacks =
@ -83,7 +172,13 @@ object DLCNodeCallbacks extends CallbackFactory[DLCNodeCallbacks] {
onPeerConnectionEstablished =
onPeerConnectionEstablished ++ other.onPeerConnectionEstablished,
onPeerConnectionFailed =
onPeerConnectionFailed ++ other.onPeerConnectionFailed
onPeerConnectionFailed ++ other.onPeerConnectionFailed,
onOfferSendSucceed = onOfferSendSucceed ++ other.onOfferSendSucceed,
onOfferSendFailed = onOfferSendFailed ++ other.onOfferSendFailed,
onAcceptSucceed = onAcceptSucceed ++ other.onAcceptSucceed,
onAcceptFailed = onAcceptFailed ++ other.onAcceptFailed,
onSignSucceed = onSignSucceed ++ other.onSignSucceed,
onSignFailed = onSignFailed ++ other.onSignFailed
)
}
@ -107,8 +202,14 @@ object DLCNodeCallbacks extends CallbackFactory[DLCNodeCallbacks] {
Vector.empty,
onPeerConnectionEstablished: Vector[OnPeerConnectionEstablished] =
Vector.empty,
onPeerConnectionFailed: Vector[OnPeerConnectionFailed] =
Vector.empty): DLCNodeCallbacks = {
onPeerConnectionFailed: Vector[OnPeerConnectionFailed] = Vector.empty,
onOfferSendSucceed: Vector[OnOfferSendSucceed] = Vector.empty,
onOfferSendFailed: Vector[OnOfferSendFailed] = Vector.empty,
onAcceptSucceed: Vector[OnAcceptSucceed] = Vector.empty,
onAcceptFailed: Vector[OnAcceptFailed] = Vector.empty,
onSignSucceed: Vector[OnSignSucceed] = Vector.empty,
onSignFailed: Vector[OnSignFailed] = Vector.empty
): DLCNodeCallbacks = {
DLCNodeCallbacksImpl(
onPeerConnectionInitiated =
CallbackHandler[InetSocketAddress, OnPeerConnectionInitiated](
@ -121,7 +222,26 @@ object DLCNodeCallbacks extends CallbackFactory[DLCNodeCallbacks] {
onPeerConnectionFailed =
CallbackHandler[InetSocketAddress, OnPeerConnectionFailed](
"onPeerConnectionFailed",
onPeerConnectionFailed)
onPeerConnectionFailed),
onOfferSendSucceed =
CallbackHandler[Sha256Digest, OnOfferSendSucceed]("onOfferSendSucceed",
onOfferSendSucceed),
onOfferSendFailed =
CallbackHandler[(Sha256Digest, String), OnOfferSendFailed](
"onOfferSendFailed",
onOfferSendFailed),
onAcceptSucceed =
CallbackHandler[Sha256Digest, OnAcceptSucceed]("onAcceptSucceed",
onAcceptSucceed),
onAcceptFailed = CallbackHandler[(Sha256Digest, String), OnAcceptFailed](
"onAcceptFailed",
onAcceptFailed),
onSignSucceed =
CallbackHandler[Sha256Digest, OnSignSucceed]("onSignSucceed",
onSignSucceed),
onSignFailed =
CallbackHandler[(Sha256Digest, String), OnSignFailed]("onSignFailed",
onSignFailed)
)
}
}

View file

@ -5,7 +5,9 @@ import akka.event.LoggingReceive
import akka.io.{IO, Tcp}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.dlc.wallet.DLCWalletApi
import org.bitcoins.core.protocol.BigSizeUInt
import org.bitcoins.tor._
import scodec.bits.ByteVector
import java.io.IOException
import java.net.InetSocketAddress
@ -15,7 +17,9 @@ class DLCServer(
dlcWalletApi: DLCWalletApi,
bindAddress: InetSocketAddress,
boundAddress: Option[Promise[InetSocketAddress]],
dataHandlerFactory: DLCDataHandler.Factory = DLCDataHandler.defaultFactory)
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (BigSizeUInt, ByteVector, Throwable) => Future[Unit])
extends Actor
with ActorLogging {
@ -47,7 +51,9 @@ class DLCServer(
new DLCConnectionHandler(dlcWalletApi,
connection,
None,
dataHandlerFactory)))
dataHandlerFactory,
handleWrite,
handleWriteError)))
}
override def postStop(): Unit = {
@ -72,14 +78,27 @@ object DLCServer extends Logging {
dlcWalletApi: DLCWalletApi,
bindAddress: InetSocketAddress,
boundAddress: Option[Promise[InetSocketAddress]] = None,
dataHandlerFactory: DLCDataHandler.Factory): Props = Props(
new DLCServer(dlcWalletApi, bindAddress, boundAddress, dataHandlerFactory))
dataHandlerFactory: DLCDataHandler.Factory,
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (
BigSizeUInt,
ByteVector,
Throwable) => Future[Unit]): Props =
Props(
new DLCServer(dlcWalletApi,
bindAddress,
boundAddress,
dataHandlerFactory,
handleWrite,
handleWriteError))
def bind(
dlcWalletApi: DLCWalletApi,
bindAddress: InetSocketAddress,
targets: Vector[InetSocketAddress],
torParams: Option[TorParams],
handleWrite: (BigSizeUInt, ByteVector) => Future[Unit],
handleWriteError: (BigSizeUInt, ByteVector, Throwable) => Future[Unit],
dataHandlerFactory: DLCDataHandler.Factory =
DLCDataHandler.defaultFactory)(implicit
system: ActorSystem): Future[(InetSocketAddress, ActorRef)] = {
@ -105,7 +124,12 @@ object DLCServer extends Logging {
Future.successful(None)
}
actorRef = system.actorOf(
props(dlcWalletApi, bindAddress, Some(promise), dataHandlerFactory))
props(dlcWalletApi,
bindAddress,
Some(promise),
dataHandlerFactory,
handleWrite,
handleWriteError))
boundAddress <- promise.future
} yield {
val addr = onionAddress.getOrElse(boundAddress)