mirror of
https://github.com/ACINQ/eclair.git
synced 2025-02-23 14:40:34 +01:00
Electrum: allow watcher to watch for mempool transactions (#1298)
* Electrum: allow watcher to watch for mempool transactions Watcher now handles WatchConfirmed watches where min depth is set to 0: the watch event will be sent when the tx enters the mempool of the bitcoin node our Electrum server is connected to. For 0-conf channel, use scids with a height of 0 and a tx index generated from the first 16 bytes of the funding txid. This gives us unique ids that can still be identified as 0-conf channel.
This commit is contained in:
parent
66e04265b3
commit
1734861930
2 changed files with 167 additions and 22 deletions
|
@ -22,7 +22,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
|
|||
import fr.acinq.bitcoin.{BlockHeader, ByteVector32, Script, Transaction, TxIn, TxOut}
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.computeScriptHash
|
||||
import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED
|
||||
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_PARENT_TX_CONFIRMED}
|
||||
import fr.acinq.eclair.transactions.Scripts
|
||||
import fr.acinq.eclair.{LongToBtcAmount, ShortChannelId, TxCoordinates}
|
||||
|
||||
|
@ -97,7 +97,7 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
|
|||
case watch@WatchConfirmed(_, txid, publicKeyScript, _, _) =>
|
||||
val scriptHash = computeScriptHash(publicKeyScript)
|
||||
log.info(s"added watch-confirmed on txid=$txid scriptHash=$scriptHash")
|
||||
client ! ElectrumClient.GetScriptHashHistory(scriptHash)
|
||||
client ! ElectrumClient.ScriptHashSubscription(scriptHash, self)
|
||||
context.watch(watch.channel)
|
||||
context become running(height, tip, watches + watch, scriptHashStatus, block2tx, sent)
|
||||
|
||||
|
@ -117,7 +117,8 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
|
|||
|
||||
case ElectrumClient.GetScriptHashHistoryResponse(_, history) =>
|
||||
// we retrieve the transaction before checking watches
|
||||
history.filter(_.height >= 0).foreach { item => client ! ElectrumClient.GetTransaction(item.tx_hash, Some(item)) }
|
||||
// NB: height=-1 means that the tx is unconfirmed and at least one of its inputs is also unconfirmed. we need to take them into consideration if we want to handle unconfirmed txes (which is the case for turbo channels)
|
||||
history.filter(_.height >= -1).foreach { item => client ! ElectrumClient.GetTransaction(item.tx_hash, Some(item)) }
|
||||
|
||||
case ElectrumClient.GetTransactionResponse(tx, Some(item: ElectrumClient.TransactionHistoryItem)) =>
|
||||
// this is for WatchSpent/WatchSpendBasic
|
||||
|
@ -133,21 +134,27 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
|
|||
channel ! WatchEventSpentBasic(event)
|
||||
Some(w)
|
||||
}).flatten
|
||||
|
||||
// this is for WatchConfirmed
|
||||
// don't ask for merkle proof for unconfirmed transactions
|
||||
if (item.height > 0) {
|
||||
watches.collect {
|
||||
case WatchConfirmed(_, txid, _, minDepth, _) if txid == tx.txid =>
|
||||
val txheight = item.height
|
||||
val confirmations = height - txheight + 1
|
||||
log.info(s"txid=$txid was confirmed at height=$txheight and now has confirmations=$confirmations (currentHeight=$height)")
|
||||
if (confirmations >= minDepth) {
|
||||
// we need to get the tx position in the block
|
||||
client ! ElectrumClient.GetMerkle(txid, txheight, Some(tx))
|
||||
}
|
||||
}
|
||||
}
|
||||
context become running(height, tip, watches -- watchSpentTriggered, scriptHashStatus, block2tx, sent)
|
||||
val watchConfirmedTriggered = watches.collect {
|
||||
case w@WatchConfirmed(channel, txid, _, minDepth, BITCOIN_FUNDING_DEPTHOK) if txid == tx.txid && minDepth == 0 =>
|
||||
// special case for mempool watches (min depth = 0)
|
||||
val (dummyHeight, dummyTxIndex) = ElectrumWatcher.makeDummyShortChannelId(txid)
|
||||
channel ! WatchEventConfirmed(BITCOIN_FUNDING_DEPTHOK, dummyHeight, dummyTxIndex, tx)
|
||||
Some(w)
|
||||
case WatchConfirmed(_, txid, _, minDepth, _) if txid == tx.txid && minDepth > 0 =>
|
||||
// min depth > 0 here
|
||||
val txheight = item.height
|
||||
val confirmations = height - txheight + 1
|
||||
log.info(s"txid=$txid was confirmed at height=$txheight and now has confirmations=$confirmations (currentHeight=$height)")
|
||||
if (confirmations >= minDepth) {
|
||||
// we need to get the tx position in the block
|
||||
client ! ElectrumClient.GetMerkle(txid, txheight, Some(tx))
|
||||
}
|
||||
None
|
||||
}.flatten
|
||||
|
||||
context become running(height, tip, watches -- watchSpentTriggered -- watchConfirmedTriggered, scriptHashStatus, block2tx, sent)
|
||||
|
||||
case ElectrumClient.GetMerkleResponse(tx_hash, _, txheight, pos, Some(tx: Transaction)) =>
|
||||
val confirmations = height - txheight + 1
|
||||
|
@ -215,3 +222,24 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
object ElectrumWatcher {
|
||||
/**
|
||||
*
|
||||
* @param txid funding transaction id
|
||||
* @return a (blockHeight, txIndex) tuple that is extracted from the input source
|
||||
* This is used to create unique "dummy" short channel ids for zero-conf channels
|
||||
*/
|
||||
def makeDummyShortChannelId(txid: ByteVector32): (Int, Int) = {
|
||||
// we use a height of 0
|
||||
// - to make sure that the tx will be marked as "confirmed"
|
||||
// - to easily identify scids linked to 0-conf channels
|
||||
//
|
||||
// this gives us a probability of collisions of 0.1% for 5 0-conf channels and 1% for 20
|
||||
// collisions mean that users may temporarily see incorrect numbers for their 0-conf channels (until they've been confirmed)
|
||||
// if this ever becomes a problem we could just extract some bits for our dummy height instead of just returning 0
|
||||
val height = 0
|
||||
val txIndex = txid.bits.sliceToInt(0, 16, false)
|
||||
(height, txIndex)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,16 +22,15 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
import akka.actor.{ActorSystem, Props}
|
||||
import akka.testkit.{TestKit, TestProbe}
|
||||
import fr.acinq.bitcoin.Crypto.PrivateKey
|
||||
import fr.acinq.bitcoin.{Base58, ByteVector32, OutPoint, SIGHASH_ALL, Script, ScriptFlags, ScriptWitness, SigVersion, Transaction, TxIn, TxOut}
|
||||
import fr.acinq.eclair.LongToBtcAmount
|
||||
import fr.acinq.bitcoin.{Base58, Bech32, ByteVector32, OutPoint, SIGHASH_ALL, Script, ScriptFlags, ScriptWitness, SigVersion, Transaction, TxIn, TxOut}
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.blockchain.bitcoind.BitcoindService
|
||||
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
|
||||
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
|
||||
import fr.acinq.eclair.blockchain._
|
||||
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_FUNDING_SPENT}
|
||||
import fr.acinq.eclair.{LongToBtcAmount, randomBytes32}
|
||||
import grizzled.slf4j.Logging
|
||||
import org.json4s
|
||||
import org.json4s.JsonAST.{JArray, JString, JValue}
|
||||
import org.json4s.JsonAST.{JString, JValue}
|
||||
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
|
||||
import scodec.bits._
|
||||
|
||||
|
@ -123,6 +122,124 @@ class ElectrumWatcherSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
|
|||
system.stop(watcher)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a chain of unspent txs
|
||||
* @param tx tx that sends funds to a p2wpkh of priv
|
||||
* @param priv private key that tx sends funds to
|
||||
* @return a (tx1, tx2) tuple where tx2 spends tx1 which spends tx
|
||||
*/
|
||||
def createUnspentTxChain(tx: Transaction, priv: PrivateKey) : (Transaction, Transaction) = {
|
||||
// tx sends funds to our key
|
||||
val pub = priv.publicKey
|
||||
val outputIndex = tx.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(pub)))
|
||||
|
||||
val fee = 10000 sat
|
||||
val tx1 = {
|
||||
val tmp = Transaction(version = 2, txIn = TxIn(OutPoint(tx, outputIndex), Nil, TxIn.SEQUENCE_FINAL) :: Nil, txOut = TxOut(tx.txOut(outputIndex).amount - fee, Script.pay2wpkh(pub)) :: Nil, lockTime = 0)
|
||||
val sig = Transaction.signInput(tmp, 0, Script.pay2pkh(pub), SIGHASH_ALL, tx.txOut(outputIndex).amount, SigVersion.SIGVERSION_WITNESS_V0, priv)
|
||||
val tmp1 = tmp.updateWitness(0, ScriptWitness(sig :: pub.value :: Nil))
|
||||
Transaction.correctlySpends(tmp1, tx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
|
||||
tmp1
|
||||
}
|
||||
// tx1 spends tx
|
||||
|
||||
val tx2 = {
|
||||
val tmp = Transaction(version = 2, txIn = TxIn(OutPoint(tx1, 0), Nil, TxIn.SEQUENCE_FINAL) :: Nil, txOut = TxOut(tx1.txOut(0).amount - fee, Script.pay2wpkh(pub)) :: Nil, lockTime = 0)
|
||||
val sig = Transaction.signInput(tmp, 0, Script.pay2pkh(pub), SIGHASH_ALL, tx1.txOut(0).amount, SigVersion.SIGVERSION_WITNESS_V0, priv)
|
||||
val tmp1 = tmp.updateWitness(0, ScriptWitness(sig :: pub.value :: Nil))
|
||||
Transaction.correctlySpends(tmp1, tx1 :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS)
|
||||
tmp1
|
||||
}
|
||||
// and tx2 spends tx1
|
||||
(tx1, tx2)
|
||||
}
|
||||
|
||||
test("watch for mempool transactions (txs in mempool before we set the watch)") {
|
||||
val probe = TestProbe()
|
||||
val blockCount = new AtomicLong()
|
||||
val electrumClient = system.actorOf(Props(new ElectrumClientPool(blockCount, Set(electrumAddress))))
|
||||
probe.send(electrumClient, ElectrumClient.AddStatusListener(probe.ref))
|
||||
probe.expectMsgType[ElectrumClient.ElectrumReady]
|
||||
|
||||
val watcher = system.actorOf(Props(new ElectrumWatcher(blockCount, electrumClient)))
|
||||
|
||||
val priv = PrivateKey(ByteVector32.fromValidHex("01" * 32))
|
||||
val pub = priv.publicKey
|
||||
val address = Bech32.encodeWitnessAddress("bcrt", 0, pub.hash160)
|
||||
probe.send(bitcoincli, BitcoinReq("sendtoaddress", address, 1.0))
|
||||
val JString(txid) = probe.expectMsgType[JValue](3000 seconds)
|
||||
probe.send(bitcoincli, BitcoinReq("getrawtransaction", txid))
|
||||
val JString(hex) = probe.expectMsgType[JValue]
|
||||
val tx = Transaction.read(hex)
|
||||
|
||||
val (tx1, tx2) = createUnspentTxChain(tx, priv)
|
||||
|
||||
probe.send(bitcoincli, BitcoinReq("sendrawtransaction", tx1.toString()))
|
||||
probe.expectMsgType[JValue]
|
||||
probe.send(bitcoincli, BitcoinReq("sendrawtransaction", tx2.toString()))
|
||||
probe.expectMsgType[JValue]
|
||||
|
||||
|
||||
// wait until tx1 and tx2 are in the mempool (as seen by our ElectrumX server)
|
||||
awaitCond({
|
||||
probe.send(electrumClient, ElectrumClient.GetScriptHashHistory(ElectrumClient.computeScriptHash(tx2.txOut(0).publicKeyScript)))
|
||||
val ElectrumClient.GetScriptHashHistoryResponse(_, history) = probe.expectMsgType[ElectrumClient.GetScriptHashHistoryResponse]
|
||||
history.map(_.tx_hash).toSet == Set(tx.txid, tx1.txid, tx2.txid)
|
||||
}, max = 30 seconds, interval = 5 seconds)
|
||||
|
||||
// then set a watch
|
||||
val listener = TestProbe()
|
||||
probe.send(watcher, WatchConfirmed(listener.ref, tx2.txid, tx2.txOut(0).publicKeyScript, 0, BITCOIN_FUNDING_DEPTHOK))
|
||||
val confirmed = listener.expectMsgType[WatchEventConfirmed](20 seconds)
|
||||
assert(confirmed.tx.txid === tx2.txid)
|
||||
system.stop(watcher)
|
||||
}
|
||||
|
||||
test("watch for mempool transactions (txs not yet in the mempool when we set the watch)") {
|
||||
val probe = TestProbe()
|
||||
val blockCount = new AtomicLong()
|
||||
val electrumClient = system.actorOf(Props(new ElectrumClientPool(blockCount, Set(electrumAddress))))
|
||||
probe.send(electrumClient, ElectrumClient.AddStatusListener(probe.ref))
|
||||
probe.expectMsgType[ElectrumClient.ElectrumReady]
|
||||
val watcher = system.actorOf(Props(new ElectrumWatcher(blockCount, electrumClient)))
|
||||
|
||||
val priv = PrivateKey(ByteVector32.fromValidHex("01" * 32))
|
||||
val pub = priv.publicKey
|
||||
val address = Bech32.encodeWitnessAddress("bcrt", 0, pub.hash160)
|
||||
probe.send(bitcoincli, BitcoinReq("sendtoaddress", address, 1.0))
|
||||
val JString(txid) = probe.expectMsgType[JValue](3000 seconds)
|
||||
probe.send(bitcoincli, BitcoinReq("getrawtransaction", txid))
|
||||
val JString(hex) = probe.expectMsgType[JValue]
|
||||
val tx = Transaction.read(hex)
|
||||
|
||||
val (tx1, tx2) = createUnspentTxChain(tx, priv)
|
||||
|
||||
// here we set the watch * before * we publish our transactions
|
||||
val listener = TestProbe()
|
||||
probe.send(watcher, WatchConfirmed(listener.ref, tx2.txid, tx2.txOut(0).publicKeyScript, 0, BITCOIN_FUNDING_DEPTHOK))
|
||||
|
||||
probe.send(bitcoincli, BitcoinReq("sendrawtransaction", tx1.toString()))
|
||||
probe.expectMsgType[JValue]
|
||||
probe.send(bitcoincli, BitcoinReq("sendrawtransaction", tx2.toString()))
|
||||
probe.expectMsgType[JValue]
|
||||
|
||||
val confirmed = listener.expectMsgType[WatchEventConfirmed](20 seconds)
|
||||
assert(confirmed.tx.txid === tx2.txid)
|
||||
system.stop(watcher)
|
||||
}
|
||||
|
||||
test("generate unique dummy scids") {
|
||||
// generate 1000 dummy ids
|
||||
val dummies = (0 until 20).map { _ =>
|
||||
ElectrumWatcher.makeDummyShortChannelId(randomBytes32)
|
||||
} toSet
|
||||
|
||||
// make sure that they are unique (we allow for 1 collision here, actual probability of a collision with the current impl. is 1%
|
||||
// but that could change and we don't want to make this test impl. dependent)
|
||||
// if this test fails it's very likely that the code that generates dummy scids is broken
|
||||
assert(dummies.size >= 19)
|
||||
}
|
||||
|
||||
test("get transaction") {
|
||||
val blockCount = new AtomicLong()
|
||||
val mainnetAddress = ElectrumServerAddress(new InetSocketAddress("electrum.acinq.co", 50002), SSL.STRICT)
|
||||
|
|
Loading…
Add table
Reference in a new issue