Add functionality for broadcasting TXs to node (#577)

* Add functionality for broadcasting TXs to node

In this commit we add functionality and tests
for broadcasting a TX from our node. To accomplish
this we introduce a table over broadcastable TXs
that's added to when the externally facing method
broadcastTransaction(tx) withing SpvNode is called.
We send out a inv message for the TX we just added,
and upon receiving a getdata message we search in
the previously mentioned table for entries where
the hashes match up.

* Broadcast TX from server to SPV node

* Perform assertions on the balance of bitcoind after sending a TX

* Remove typeclass from broadcast TX

* Refactor withFundedWalletAndBitcoind

* Match on BitcoindExecption instead of throwable

* Clean up broadcast functionality after code review
This commit is contained in:
Torkel Rogstad 2019-07-17 14:32:05 +02:00 committed by Chris Stewart
parent 90dbb9d9f8
commit 7f0b11c019
13 changed files with 313 additions and 19 deletions

View file

@ -120,7 +120,7 @@ object Main
_ <- node.sync()
start <- {
val walletRoutes = WalletRoutes(wallet)
val walletRoutes = WalletRoutes(wallet, node)
val nodeRoutes = NodeRoutes(node)
val chainRoutes = ChainRoutes(node.chainApi)
val server = Server(Seq(walletRoutes, nodeRoutes, chainRoutes))

View file

@ -11,12 +11,14 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.core.currency._
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.node.SpvNode
import de.heikoseeberger.akkahttpupickle.UpickleSupport._
import scala.util.Failure
import scala.util.Success
case class WalletRoutes(wallet: UnlockedWalletApi)(implicit system: ActorSystem)
case class WalletRoutes(wallet: UnlockedWalletApi, node: SpvNode)(
implicit system: ActorSystem)
extends BitcoinSLogger
with ServerRoute {
import system.dispatcher
@ -48,10 +50,8 @@ case class WalletRoutes(wallet: UnlockedWalletApi)(implicit system: ActorSystem)
// TODO dynamic fees
val feeRate = SatoshisPerByte(100.sats)
wallet.sendToAddress(address, bitcoins, feeRate).map { tx =>
// TODO this TX isn't being broadcast anywhere
// would be better to dump the entire TX hex until that's implemented?
node.broadcastTransaction(tx)
Server.httpSuccess(tx.txIdBE)
}
}
}

View file

@ -58,13 +58,6 @@ abstract class DbCommonsColumnMappers {
}
/** Responsible for mapping a [[DoubleSha256Digest]] to a String, and vice versa */
implicit val doubleSha256DigestMapper: BaseColumnType[DoubleSha256Digest] =
MappedColumnType.base[DoubleSha256Digest, String](
_.hex,
DoubleSha256Digest.fromHex
)
implicit val doubleSha256DigestBEMapper: BaseColumnType[
DoubleSha256DigestBE] =
MappedColumnType.base[DoubleSha256DigestBE, String](

View file

@ -0,0 +1,90 @@
package org.bitcoins.node
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.testkit.node.NodeTestUtil
import org.bitcoins.core.currency._
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.rpc.BitcoindException
import org.bitcoins.core.protocol.transaction.Transaction
import org.scalactic.Bool
import scala.concurrent.Future
import scala.concurrent.duration._
import org.bitcoins.testkit.async.TestAsyncUtil
class BroadcastTransactionTest extends BitcoinSWalletTest {
override type FixtureParam = WalletWithBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome =
withFundedWalletAndBitcoind(test)
it must "broadcast a transaction" in { param =>
val WalletWithBitcoind(wallet, rpc) = param
/**
* This is not ideal, how do we get one implicit value (`config`)
* to resolve to multiple implicit parameters?
*/
implicit val nodeConfig: NodeAppConfig = config
implicit val chainConfig: ChainAppConfig = config
def hasSeenTx(transaction: Transaction): Future[Boolean] = {
rpc
.getRawTransaction(transaction.txIdBE)
.map { _ =>
true
}
.recover {
case BitcoindException.InvalidAddressOrKey(_) =>
false
case other =>
logger.error(
s"Received unexpected error on getrawtransaction: $other")
throw other
}
}
for {
_ <- config.initialize()
address <- rpc.getNewAddress
bloom <- wallet.getBloomFilter()
spv <- {
val peer = Peer.fromBitcoind(rpc.instance)
val chainHandler = {
val bhDao = BlockHeaderDAO()
ChainHandler(bhDao, config)
}
val spv =
SpvNode(peer, chainHandler, bloomFilter = bloom)
spv.start()
}
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
tx <- wallet
.sendToAddress(address, 1.bitcoin, SatoshisPerByte(10.sats))
bitcoindBalancePreBroadcast <- rpc.getBalance
_ = spv.broadcastTransaction(tx)
_ <- TestAsyncUtil.awaitConditionF(() => hasSeenTx(tx),
duration = 1.second)
fromBitcoind <- rpc.getRawTransaction(tx.txIdBE)
_ = assert(fromBitcoind.vout.exists(_.value == 1.bitcoin))
_ <- rpc.getNewAddress.flatMap(rpc.generateToAddress(1, _))
bitcoindBalancePostBroadcast <- rpc.getBalance
} yield assert(bitcoindBalancePreBroadcast < bitcoindBalancePostBroadcast)
}
}

View file

@ -0,0 +1,21 @@
package org.bitcoins.node.models
import org.bitcoins.testkit.node.NodeUnitTest
import org.bitcoins.testkit.fixtures.NodeDAOFixture
import org.bitcoins.testkit.Implicits._
import org.bitcoins.testkit.core.gen.TransactionGenerators
class BroadcastAbleTransactionDAOTest extends NodeDAOFixture {
behavior of "BroadcastAbleTransactionDAO"
it must "write a TX and read it back" in { daos =>
val txDAO = daos.txDAO
val tx = TransactionGenerators.transaction.sampleSome
for {
created <- txDAO.create(BroadcastAbleTransaction(tx))
read <- txDAO.read(created.id.get)
} yield assert(read.contains(created))
}
}

View file

@ -17,6 +17,12 @@ import scala.concurrent.Future
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.FilterLoadMessage
import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.node.models.BroadcastAbleTransaction
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import scala.util.Failure
import scala.util.Success
case class SpvNode(
peer: Peer,
@ -30,6 +36,8 @@ case class SpvNode(
extends BitcoinSLogger {
import system.dispatcher
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
private val peerMsgRecv =
PeerMessageReceiver.newReceiver(callbacks)
@ -85,6 +93,22 @@ case class SpvNode(
isStoppedF.map(_ => this)
}
/** Broadcasts the given transaction over the P2P network */
def broadcastTransaction(transaction: Transaction): Unit = {
val broadcastTx = BroadcastAbleTransaction(transaction)
txDAO.create(broadcastTx).onComplete {
case Failure(exception) =>
logger.error(s"Error when writing broadcastable TX to DB", exception)
case Success(written) =>
logger.debug(
s"Wrote tx=${written.transaction.txIdBE} to broadcastable table")
}
logger.info(s"Sending out inv for tx=${transaction.txIdBE}")
peerMsgSender.sendInventoryMessage(transaction)
}
/** Checks if we have a tcp connection with our peer */
def isConnected: Boolean = peerMsgRecv.isConnected

View file

@ -1,8 +1,12 @@
package org.bitcoins.node.db
import org.bitcoins.db.DbManagement
import slick.lifted.TableQuery
import org.bitcoins.node.models.BroadcastAbleTransactionTable
object NodeDbManagement extends DbManagement {
override val allTables = List.empty
private val txTable = TableQuery[BroadcastAbleTransactionTable]
override val allTables = List(txTable)
}

View file

@ -0,0 +1,29 @@
package org.bitcoins.node.models
import slick.jdbc.SQLiteProfile.api._
import slick.jdbc.JdbcProfile
import org.bitcoins.db.CRUDAutoInc
import org.bitcoins.node.config.NodeAppConfig
import scala.concurrent.ExecutionContext
import slick.lifted.TableQuery
import scala.concurrent.Future
import org.bitcoins.core.crypto.DoubleSha256Digest
final case class BroadcastAbleTransactionDAO(profile: JdbcProfile)(
implicit val appConfig: NodeAppConfig,
val ec: ExecutionContext)
extends CRUDAutoInc[BroadcastAbleTransaction] {
val table: TableQuery[BroadcastAbleTransactionTable] =
TableQuery[BroadcastAbleTransactionTable]
/** Searches for a TX by its TXID */
def findByHash(
hash: DoubleSha256Digest): Future[Option[BroadcastAbleTransaction]] = {
import org.bitcoins.db.DbCommonsColumnMappers._
val query = table.filter(_.txid === hash.flip)
database.run(query.result).map(_.headOption)
}
}

View file

@ -16,6 +16,11 @@ import org.bitcoins.core.p2p.TransactionMessage
import org.bitcoins.core.p2p.MerkleBlockMessage
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.core.p2p.GetDataMessage
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p.TypeIdentifier
import org.bitcoins.core.p2p.MsgUnassigned
/** This actor is meant to handle a [[org.bitcoins.node.messages.DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a
@ -23,25 +28,56 @@ import org.bitcoins.core.p2p.GetDataMessage
*/
class DataMessageHandler(callbacks: SpvNodeCallbacks)(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig)
chainConf: ChainAppConfig,
nodeConf: NodeAppConfig)
extends BitcoinSLogger {
val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
private val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
logger.debug(s"Given $callbackNum of callback(s)")
private val blockHeaderDAO: BlockHeaderDAO = BlockHeaderDAO()
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender): Future[Unit] = {
payload match {
case getData: GetDataMessage =>
logger.debug(
s"Received a getdata message for inventories=${getData.inventories}")
getData.inventories.foreach { inv =>
logger.debug(s"Looking for inv=$inv")
inv.typeIdentifier match {
case TypeIdentifier.MsgTx =>
txDAO.findByHash(inv.hash).map {
case Some(tx) =>
peerMsgSender.sendTransactionMessage(tx.transaction)
case None =>
logger.warn(
s"Got request to send data with hash=${inv.hash}, but found nothing")
}
case other @ (TypeIdentifier.MsgBlock |
TypeIdentifier.MsgFilteredBlock |
TypeIdentifier.MsgCompactBlock |
TypeIdentifier.MsgFilteredWitnessBlock |
TypeIdentifier.MsgWitnessBlock | TypeIdentifier.MsgWitnessTx) =>
logger.warn(
s"Got request to send data type=$other, this is not implemented yet")
case unassigned: MsgUnassigned =>
logger.warn(
s"Received unassigned message we do not understand, msg=${unassigned}")
}
}
FutureUtil.unit
case headersMsg: HeadersMessage =>
logger.trace(
s"Received headers message with ${headersMsg.count.toInt} headers")
val headers = headersMsg.headers
val chainApi: ChainApi =
ChainHandler(blockHeaderDAO, chainConfig = appConfig)
ChainHandler(blockHeaderDAO, chainConfig = chainConf)
val chainApiF = chainApi.processHeaders(headers)
chainApiF.map { newApi =>

View file

@ -8,6 +8,7 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.core.p2p._
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.protocol.transaction.Transaction
case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
extends BitcoinSLogger {
@ -55,6 +56,23 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(sendHeadersMsg)
}
/**
* Sends a inventory message with the given transactions
*/
def sendInventoryMessage(transactions: Transaction*): Unit = {
val inventories =
transactions.map(tx => Inventory(TypeIdentifier.MsgTx, tx.txId))
val message = InventoryMessage(inventories)
logger.trace(s"Sending inv=$message to peer=${client.peer}")
sendMsg(message)
}
def sendTransactionMessage(transaction: Transaction): Unit = {
val message = TransactionMessage(transaction)
logger.trace(s"Sending txmessage=$message to peer=${client.peer}")
sendMsg(message)
}
private[node] def sendMsg(msg: NetworkPayload): Unit = {
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)

View file

@ -111,6 +111,25 @@ trait BitcoinSFixture extends fixture.AsyncFlatSpec {
}
}
/**
*
* Given two fixture building methods (one dependent on the other) and
* a function that processes the result of the builders returning a Future,
* returns a single fixture building method where the fixture is wrapper.
*
* This method is identical to `composeBuildersAndWrap`, except that
* the wrapping function returns a `Future[C]` instead of a `C`
*/
def composeBuildersAndWrapFuture[T, U, C](
builder: () => Future[T],
dependentBuilder: T => Future[U],
processResult: (T, U) => Future[C]
): () => Future[C] = () => {
composeBuilders(builder, dependentBuilder)().flatMap {
case (first, second) => processResult(first, second)
}
}
def createBitcoindWithFunds()(
implicit system: ActorSystem): Future[BitcoindRpcClient] = {
for {

View file

@ -0,0 +1,26 @@
package org.bitcoins.testkit.fixtures
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.scalatest._
import org.bitcoins.testkit.node.NodeUnitTest
import slick.jdbc.SQLiteProfile
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.config.NodeAppConfig
case class NodeDAOs(txDAO: BroadcastAbleTransactionDAO)
/** Provides a fixture where all DAOs used by the node projects are provided */
trait NodeDAOFixture extends fixture.AsyncFlatSpec with NodeUnitTest {
private lazy val daos = {
val tx = BroadcastAbleTransactionDAO(SQLiteProfile)
NodeDAOs(tx)
}
final override type FixtureParam = NodeDAOs
implicit private val nodeConfig: NodeAppConfig = config
def withFixture(test: OneArgAsyncTest): FutureOutcome =
makeFixture(build = () => NodeDbManagement.createAll().map(_ => daos),
destroy = () => NodeDbManagement.dropAll())(test)
}

View file

@ -18,6 +18,7 @@ import org.scalatest._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.core.currency._
import org.bitcoins.db.AppConfig
import org.bitcoins.server.BitcoinSAppConfig
import com.typesafe.config.Config
@ -140,13 +141,46 @@ trait BitcoinSWalletTest
def withNewWalletAndBitcoind(test: OneArgAsyncTest): FutureOutcome = {
val builder: () => Future[WalletWithBitcoind] = composeBuildersAndWrap(
createDefaultWallet _,
createWalletWithBitcoind,
(_: UnlockedWalletApi, walletWithBitcoind: WalletWithBitcoind) =>
builder = createDefaultWallet _,
dependentBuilder = createWalletWithBitcoind,
wrap = (_: UnlockedWalletApi, walletWithBitcoind: WalletWithBitcoind) =>
walletWithBitcoind
)
makeDependentFixture(builder, destroy = destroyWalletWithBitcoind)(test)
}
/** Funds the given wallet with money from the given bitcoind */
def fundWalletWithBitcoind(
pair: WalletWithBitcoind): Future[WalletWithBitcoind] = {
val WalletWithBitcoind(wallet, bitcoind) = pair
for {
addr <- wallet.getNewAddress()
tx <- bitcoind
.sendToAddress(addr, 25.bitcoins)
.flatMap(bitcoind.getRawTransaction(_))
_ <- bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(6, _))
_ <- wallet.processTransaction(tx.hex, 6)
balance <- wallet.getBalance()
} yield {
assert(balance >= 25.bitcoins)
pair
}
}
def withFundedWalletAndBitcoind(test: OneArgAsyncTest): FutureOutcome = {
val builder: () => Future[WalletWithBitcoind] =
composeBuildersAndWrapFuture(
builder = createDefaultWallet _,
dependentBuilder = createWalletWithBitcoind,
processResult = (_: UnlockedWalletApi, pair: WalletWithBitcoind) =>
fundWalletWithBitcoind(pair)
)
makeDependentFixture(builder, destroy = destroyWalletWithBitcoind)(test)
}
}