Add functionality for broadcasting TXs to node (#577)
* Add functionality for broadcasting TXs to node In this commit we add functionality and tests for broadcasting a TX from our node. To accomplish this we introduce a table over broadcastable TXs that's added to when the externally facing method broadcastTransaction(tx) withing SpvNode is called. We send out a inv message for the TX we just added, and upon receiving a getdata message we search in the previously mentioned table for entries where the hashes match up. * Broadcast TX from server to SPV node * Perform assertions on the balance of bitcoind after sending a TX * Remove typeclass from broadcast TX * Refactor withFundedWalletAndBitcoind * Match on BitcoindExecption instead of throwable * Clean up broadcast functionality after code review
@ -120,7 +120,7 @@ object Main
_ <- node.sync()
start <- {
val walletRoutes = WalletRoutes(wallet)
val walletRoutes = WalletRoutes(wallet, node)
val nodeRoutes = NodeRoutes(node)
val chainRoutes = ChainRoutes(node.chainApi)
val server = Server(Seq(walletRoutes, nodeRoutes, chainRoutes))
@ -11,12 +11,14 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.core.currency._
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.node.SpvNode
import de.heikoseeberger.akkahttpupickle.UpickleSupport._
import scala.util.Failure
import scala.util.Success
case class WalletRoutes(wallet: UnlockedWalletApi)(implicit system: ActorSystem)
case class WalletRoutes(wallet: UnlockedWalletApi, node: SpvNode)(
implicit system: ActorSystem)
extends BitcoinSLogger
with ServerRoute {
import system.dispatcher
@ -48,10 +50,8 @@ case class WalletRoutes(wallet: UnlockedWalletApi)(implicit system: ActorSystem)
// TODO dynamic fees
val feeRate = SatoshisPerByte(100.sats)
wallet.sendToAddress(address, bitcoins, feeRate).map { tx =>
// TODO this TX isn't being broadcast anywhere
// would be better to dump the entire TX hex until that's implemented?
@ -58,13 +58,6 @@ abstract class DbCommonsColumnMappers {
/** Responsible for mapping a [[DoubleSha256Digest]] to a String, and vice versa */
implicit val doubleSha256DigestMapper: BaseColumnType[DoubleSha256Digest] =
MappedColumnType.base[DoubleSha256Digest, String](
implicit val doubleSha256DigestBEMapper: BaseColumnType[
DoubleSha256DigestBE] =
MappedColumnType.base[DoubleSha256DigestBE, String](
@ -0,0 +1,90 @@
package org.bitcoins.node
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.testkit.node.NodeTestUtil
import org.bitcoins.core.currency._
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.rpc.BitcoindException
import org.bitcoins.core.protocol.transaction.Transaction
import org.scalactic.Bool
import scala.concurrent.Future
import scala.concurrent.duration._
import org.bitcoins.testkit.async.TestAsyncUtil
class BroadcastTransactionTest extends BitcoinSWalletTest {
override type FixtureParam = WalletWithBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome =
it must "broadcast a transaction" in { param =>
val WalletWithBitcoind(wallet, rpc) = param
* This is not ideal, how do we get one implicit value (`config`)
* to resolve to multiple implicit parameters?
implicit val nodeConfig: NodeAppConfig = config
implicit val chainConfig: ChainAppConfig = config
def hasSeenTx(transaction: Transaction): Future[Boolean] = {
.map { _ =>
.recover {
case BitcoindException.InvalidAddressOrKey(_) =>
case other =>
s"Received unexpected error on getrawtransaction: $other")
throw other
for {
_ <- config.initialize()
address <- rpc.getNewAddress
bloom <- wallet.getBloomFilter()
spv <- {
val peer = Peer.fromBitcoind(rpc.instance)
val chainHandler = {
val bhDao = BlockHeaderDAO()
ChainHandler(bhDao, config)
val spv =
SpvNode(peer, chainHandler, bloomFilter = bloom)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
tx <- wallet
.sendToAddress(address, 1.bitcoin, SatoshisPerByte(10.sats))
bitcoindBalancePreBroadcast <- rpc.getBalance
_ = spv.broadcastTransaction(tx)
_ <- TestAsyncUtil.awaitConditionF(() => hasSeenTx(tx),
duration = 1.second)
fromBitcoind <- rpc.getRawTransaction(tx.txIdBE)
_ = assert(fromBitcoind.vout.exists(_.value == 1.bitcoin))
_ <- rpc.getNewAddress.flatMap(rpc.generateToAddress(1, _))
bitcoindBalancePostBroadcast <- rpc.getBalance
} yield assert(bitcoindBalancePreBroadcast < bitcoindBalancePostBroadcast)
@ -0,0 +1,21 @@
package org.bitcoins.node.models
import org.bitcoins.testkit.node.NodeUnitTest
import org.bitcoins.testkit.fixtures.NodeDAOFixture
import org.bitcoins.testkit.Implicits._
import org.bitcoins.testkit.core.gen.TransactionGenerators
class BroadcastAbleTransactionDAOTest extends NodeDAOFixture {
behavior of "BroadcastAbleTransactionDAO"
it must "write a TX and read it back" in { daos =>
val txDAO = daos.txDAO
val tx = TransactionGenerators.transaction.sampleSome
for {
created <- txDAO.create(BroadcastAbleTransaction(tx))
read <- txDAO.read(created.id.get)
} yield assert(read.contains(created))
@ -17,6 +17,12 @@ import scala.concurrent.Future
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.FilterLoadMessage
import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.node.models.BroadcastAbleTransaction
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import scala.util.Failure
import scala.util.Success
case class SpvNode(
peer: Peer,
@ -30,6 +36,8 @@ case class SpvNode(
extends BitcoinSLogger {
import system.dispatcher
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
private val peerMsgRecv =
@ -85,6 +93,22 @@ case class SpvNode(
isStoppedF.map(_ => this)
/** Broadcasts the given transaction over the P2P network */
def broadcastTransaction(transaction: Transaction): Unit = {
val broadcastTx = BroadcastAbleTransaction(transaction)
txDAO.create(broadcastTx).onComplete {
case Failure(exception) =>
logger.error(s"Error when writing broadcastable TX to DB", exception)
case Success(written) =>
s"Wrote tx=${written.transaction.txIdBE} to broadcastable table")
logger.info(s"Sending out inv for tx=${transaction.txIdBE}")
/** Checks if we have a tcp connection with our peer */
def isConnected: Boolean = peerMsgRecv.isConnected
@ -1,8 +1,12 @@
package org.bitcoins.node.db
import org.bitcoins.db.DbManagement
import slick.lifted.TableQuery
import org.bitcoins.node.models.BroadcastAbleTransactionTable
object NodeDbManagement extends DbManagement {
override val allTables = List.empty
private val txTable = TableQuery[BroadcastAbleTransactionTable]
override val allTables = List(txTable)
@ -0,0 +1,29 @@
package org.bitcoins.node.models
import slick.jdbc.SQLiteProfile.api._
import slick.jdbc.JdbcProfile
import org.bitcoins.db.CRUDAutoInc
import org.bitcoins.node.config.NodeAppConfig
import scala.concurrent.ExecutionContext
import slick.lifted.TableQuery
import scala.concurrent.Future
import org.bitcoins.core.crypto.DoubleSha256Digest
final case class BroadcastAbleTransactionDAO(profile: JdbcProfile)(
implicit val appConfig: NodeAppConfig,
val ec: ExecutionContext)
extends CRUDAutoInc[BroadcastAbleTransaction] {
val table: TableQuery[BroadcastAbleTransactionTable] =
/** Searches for a TX by its TXID */
def findByHash(
hash: DoubleSha256Digest): Future[Option[BroadcastAbleTransaction]] = {
import org.bitcoins.db.DbCommonsColumnMappers._
val query = table.filter(_.txid === hash.flip)
@ -0,0 +1,43 @@
package org.bitcoins.node.models
import slick.jdbc.SQLiteProfile.api._
import slick.lifted.ProvenShape
import org.bitcoins.db.DbRowAutoInc
import org.bitcoins.db.TableAutoInc
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import scodec.bits.ByteVector
import org.bitcoins.core.protocol.transaction.Transaction
/** TXs we can broadcast over the P2P network */
final case class BroadcastAbleTransaction(
transaction: Transaction,
id: Option[Long] = None)
extends DbRowAutoInc[BroadcastAbleTransaction] {
def copyWithId(id: Long): BroadcastAbleTransaction = copy(id = Some(id))
/** Table over TXs we can broadcast over the P2P network */
final case class BroadcastAbleTransactionTable(tag: Tag)
extends TableAutoInc[BroadcastAbleTransaction](tag, "broadcast_elements") {
private type Tuple = (DoubleSha256DigestBE, ByteVector, Option[Long])
private val fromTuple: (Tuple => BroadcastAbleTransaction) = {
case (txid, bytes, id) =>
val tx = Transaction.fromBytes(bytes)
require(tx.txId == txid.flip)
BroadcastAbleTransaction(tx, id)
private val toTuple: BroadcastAbleTransaction => Option[Tuple] = tx =>
Some(tx.transaction.txId.flip, tx.transaction.bytes, tx.id)
import org.bitcoins.db.DbCommonsColumnMappers._
def txid: Rep[DoubleSha256DigestBE] = column("txid", O.Unique)
def bytes: Rep[ByteVector] = column("tx_bytes")
def * : ProvenShape[BroadcastAbleTransaction] =
(txid, bytes, id.?) <>
(fromTuple, toTuple)
@ -16,6 +16,11 @@ import org.bitcoins.core.p2p.TransactionMessage
import org.bitcoins.core.p2p.MerkleBlockMessage
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.core.p2p.GetDataMessage
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import slick.jdbc.SQLiteProfile
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p.TypeIdentifier
import org.bitcoins.core.p2p.MsgUnassigned
/** This actor is meant to handle a [[org.bitcoins.node.messages.DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a
@ -23,25 +28,56 @@ import org.bitcoins.core.p2p.GetDataMessage
class DataMessageHandler(callbacks: SpvNodeCallbacks)(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig)
chainConf: ChainAppConfig,
nodeConf: NodeAppConfig)
extends BitcoinSLogger {
val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
private val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
logger.debug(s"Given $callbackNum of callback(s)")
private val blockHeaderDAO: BlockHeaderDAO = BlockHeaderDAO()
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender): Future[Unit] = {
payload match {
case getData: GetDataMessage =>
s"Received a getdata message for inventories=${getData.inventories}")
getData.inventories.foreach { inv =>
logger.debug(s"Looking for inv=$inv")
inv.typeIdentifier match {
case TypeIdentifier.MsgTx =>
txDAO.findByHash(inv.hash).map {
case Some(tx) =>
case None =>
s"Got request to send data with hash=${inv.hash}, but found nothing")
case other @ (TypeIdentifier.MsgBlock |
TypeIdentifier.MsgFilteredBlock |
TypeIdentifier.MsgCompactBlock |
TypeIdentifier.MsgFilteredWitnessBlock |
TypeIdentifier.MsgWitnessBlock | TypeIdentifier.MsgWitnessTx) =>
s"Got request to send data type=$other, this is not implemented yet")
case unassigned: MsgUnassigned =>
s"Received unassigned message we do not understand, msg=${unassigned}")
case headersMsg: HeadersMessage =>
s"Received headers message with ${headersMsg.count.toInt} headers")
val headers = headersMsg.headers
val chainApi: ChainApi =
ChainHandler(blockHeaderDAO, chainConfig = appConfig)
ChainHandler(blockHeaderDAO, chainConfig = chainConf)
val chainApiF = chainApi.processHeaders(headers)
chainApiF.map { newApi =>
@ -8,6 +8,7 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.core.p2p._
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.protocol.transaction.Transaction
case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
extends BitcoinSLogger {
@ -55,6 +56,23 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
* Sends a inventory message with the given transactions
def sendInventoryMessage(transactions: Transaction*): Unit = {
val inventories =
transactions.map(tx => Inventory(TypeIdentifier.MsgTx, tx.txId))
val message = InventoryMessage(inventories)
logger.trace(s"Sending inv=$message to peer=${client.peer}")
def sendTransactionMessage(transaction: Transaction): Unit = {
val message = TransactionMessage(transaction)
logger.trace(s"Sending txmessage=$message to peer=${client.peer}")
private[node] def sendMsg(msg: NetworkPayload): Unit = {
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
@ -111,6 +111,25 @@ trait BitcoinSFixture extends fixture.AsyncFlatSpec {
* Given two fixture building methods (one dependent on the other) and
* a function that processes the result of the builders returning a Future,
* returns a single fixture building method where the fixture is wrapper.
* This method is identical to `composeBuildersAndWrap`, except that
* the wrapping function returns a `Future[C]` instead of a `C`
def composeBuildersAndWrapFuture[T, U, C](
builder: () => Future[T],
dependentBuilder: T => Future[U],
processResult: (T, U) => Future[C]
): () => Future[C] = () => {
composeBuilders(builder, dependentBuilder)().flatMap {
case (first, second) => processResult(first, second)
def createBitcoindWithFunds()(
implicit system: ActorSystem): Future[BitcoindRpcClient] = {
for {
@ -0,0 +1,26 @@
package org.bitcoins.testkit.fixtures
import org.bitcoins.node.models.BroadcastAbleTransactionDAO
import org.scalatest._
import org.bitcoins.testkit.node.NodeUnitTest
import slick.jdbc.SQLiteProfile
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.config.NodeAppConfig
case class NodeDAOs(txDAO: BroadcastAbleTransactionDAO)
/** Provides a fixture where all DAOs used by the node projects are provided */
trait NodeDAOFixture extends fixture.AsyncFlatSpec with NodeUnitTest {
private lazy val daos = {
val tx = BroadcastAbleTransactionDAO(SQLiteProfile)
final override type FixtureParam = NodeDAOs
implicit private val nodeConfig: NodeAppConfig = config
def withFixture(test: OneArgAsyncTest): FutureOutcome =
makeFixture(build = () => NodeDbManagement.createAll().map(_ => daos),
destroy = () => NodeDbManagement.dropAll())(test)
@ -18,6 +18,7 @@ import org.scalatest._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.core.currency._
import org.bitcoins.db.AppConfig
import org.bitcoins.server.BitcoinSAppConfig
import com.typesafe.config.Config
@ -140,13 +141,46 @@ trait BitcoinSWalletTest
def withNewWalletAndBitcoind(test: OneArgAsyncTest): FutureOutcome = {
val builder: () => Future[WalletWithBitcoind] = composeBuildersAndWrap(
createDefaultWallet _,
(_: UnlockedWalletApi, walletWithBitcoind: WalletWithBitcoind) =>
builder = createDefaultWallet _,
dependentBuilder = createWalletWithBitcoind,
wrap = (_: UnlockedWalletApi, walletWithBitcoind: WalletWithBitcoind) =>
makeDependentFixture(builder, destroy = destroyWalletWithBitcoind)(test)
/** Funds the given wallet with money from the given bitcoind */
def fundWalletWithBitcoind(
pair: WalletWithBitcoind): Future[WalletWithBitcoind] = {
val WalletWithBitcoind(wallet, bitcoind) = pair
for {
addr <- wallet.getNewAddress()
tx <- bitcoind
.sendToAddress(addr, 25.bitcoins)
_ <- bitcoind.getNewAddress.flatMap(bitcoind.generateToAddress(6, _))
_ <- wallet.processTransaction(tx.hex, 6)
balance <- wallet.getBalance()
} yield {
assert(balance >= 25.bitcoins)
def withFundedWalletAndBitcoind(test: OneArgAsyncTest): FutureOutcome = {
val builder: () => Future[WalletWithBitcoind] =
builder = createDefaultWallet _,
dependentBuilder = createWalletWithBitcoind,
processResult = (_: UnlockedWalletApi, pair: WalletWithBitcoind) =>
makeDependentFixture(builder, destroy = destroyWalletWithBitcoind)(test)
