1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-03-13 11:35:47 +01:00

Merge branch 'master' into android

This commit is contained in:
sstone 2019-01-11 14:49:05 +01:00
commit fda8baa6ab
26 changed files with 5572 additions and 270 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,215 +1,412 @@
{
"207.154.223.80": {
"3smoooajg7qqac2y.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"4cii7ryno5j3axe4.onion": {
"81-7-10-251.blue.kundencontroller.de": {
"pruning": "-",
"t": "50001",
"version": "1.2"
"s": "50002",
"version": "1.4"
},
"E-X.not.fyi": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"MEADS.hopto.org": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"VPS.hsmiths.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"arihancckjge66iv.onion": {
"b.ooze.cc": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"bauerjda5hnedjam.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"bauerjhejlv6di7s.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"bitcoin.corgi.party": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"bitcoin3nqy3db7c.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"bitcoins.sk": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"btc.cihar.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"btc.smsys.me": {
"pruning": "-",
"s": "995",
"version": "1.2"
"version": "1.4"
},
"btc.xskyx.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"cashyes.zapto.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"currentlane.lovebitco.in": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"daedalus.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"dedi.jochen-hoenicke.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"dragon085.startdedicated.de": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"e-1.claudioboxx.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"e.keff.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"elec.luggs.co": {
"pruning": "-",
"s": "443",
"version": "1.2"
"version": "1.4"
},
"electrum-server.ninja": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"electrum-unlimited.criptolayer.net": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"electrum.eff.ro": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrum.festivaldelhumor.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrum.hsmiths.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"electrum.leblancnet.us": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"electrum.meltingice.net": {
"electrum.mindspot.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
},
"electrum.nute.net": {
"pruning": "-",
"s": "50002",
"version": "1.2"
"version": "1.4"
},
"electrum.qtornado.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"electrum.taborsky.cz": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"electrum.villocq.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrum2.eff.ro": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrum2.villocq.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrum3.hachre.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.bot.nu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"electrumx.ddns.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.ftp.sh": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"electrumx.ml": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.nmdps.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.soon.it": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumxhqdsmlu.onion": {
"pruning": "-",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"elx2018.mooo.com": {
"elx01.knas.systems": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"enode.duckdns.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"erbium1.sytes.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"fedaykin.goip.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"fn.48.org": {
"pruning": "-",
"s": "50002",
"t": "50003",
"version": "1.4"
},
"helicarrier.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"hsmiths4fyqlw5xw.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"hsmiths5mjk6uijs.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"j5jfrdthqt5g25xz.onion": {
"icarus.tetradrachm.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"kirsche.emzy.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"luggscoqbymhvnkp.onion": {
"pruning": "-",
"t": "80",
"version": "1.2"
"version": "1.4"
},
"ndnd.selfhost.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"ndndword5lpb7eex.onion": {
"pruning": "-",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"node.arihanc.com": {
"oneweek.duckdns.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"orannis.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"ozahtqwp25chjdjd.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"qtornadoklbgdyww.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"rbx.curalle.ovh": {
"pruning": "-",
"s": "50002",
"version": "1.2"
},
"ruuxwv74pjxms3ws.onion": {
"pruning": "-",
"s": "10042",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"s7clinmo4cazmhul.onion": {
"pruning": "-",
"t": "50001",
"version": "1.2"
},
"spv.48.org": {
"pruning": "-",
"s": "50002",
"t": "50003",
"version": "1.2"
"version": "1.4"
},
"tardis.bauerj.eu": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.2"
"version": "1.4"
},
"technetium.network": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"tomscryptos.com": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"ulrichard.ch": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"us.electrum.be": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"vmd27610.contaboserver.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"vmd30612.contaboserver.net": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"wsw6tua3xl24gsmi264zaep6seppjyrkyucpsmuxnjzyt3f3j6swshad.onion": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"xray587.startdedicated.de": {
"pruning": "-",
"s": "50002",
"version": "1.4"
},
"yuio.top": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
}
}

View file

@ -135,6 +135,7 @@ object NodeParams {
chaindir.mkdir()
val sqlite = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "eclair.sqlite")}")
SqliteUtils.obtainExclusiveLock(sqlite) // there should only be one process writing to this file
val channelsDb = new SqliteChannelsDb(sqlite)
val peersDb = new SqlitePeersDb(sqlite)
val pendingRelayDb = new SqlitePendingRelayDb(sqlite)

View file

@ -18,6 +18,7 @@ package fr.acinq.eclair
import java.io.File
import java.net.InetSocketAddress
import java.sql.DriverManager
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
@ -30,6 +31,7 @@ import fr.acinq.eclair.NodeParams.ELECTRUM
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import fr.acinq.eclair.blockchain.electrum._
import fr.acinq.eclair.blockchain.electrum.db.sqlite.SqliteWalletDb
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.channel.Register
@ -140,7 +142,11 @@ class Setup(datadir: File,
wallet = bitcoin match {
case Electrum(electrumClient) =>
val electrumWallet = system.actorOf(ElectrumWallet.props(seed, electrumClient, ElectrumWallet.WalletParameters(nodeParams.chainHash)), "electrum-wallet")
// TODO: DRY
val chaindir = new File(datadir, chain)
val sqlite = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "wallet.sqlite")}")
val walletDb = new SqliteWalletDb(sqlite)
val electrumWallet = system.actorOf(ElectrumWallet.props(seed, electrumClient, ElectrumWallet.WalletParameters(nodeParams.chainHash, walletDb)), "electrum-wallet")
implicit val timeout = Timeout(30 seconds)
new ElectrumEclairWallet(electrumWallet, nodeParams.chainHash)
case _ => ???

View file

@ -0,0 +1,352 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.blockchain.electrum
import java.math.BigInteger
import fr.acinq.bitcoin.{BinaryData, Block, BlockHeader, decodeCompact}
import fr.acinq.eclair.blockchain.electrum.db.HeaderDb
import grizzled.slf4j.Logging
import scala.annotation.tailrec
case class Blockchain(chainHash: BinaryData,
checkpoints: Vector[CheckPoint],
headersMap: Map[BinaryData, Blockchain.BlockIndex],
bestchain: Vector[Blockchain.BlockIndex],
orphans: Map[BinaryData, BlockHeader] = Map()) {
import Blockchain._
require(chainHash == Block.LivenetGenesisBlock.hash || chainHash == Block.TestnetGenesisBlock.hash || chainHash == Block.RegtestGenesisBlock.hash, s"invalid chain hash $chainHash")
def tip = bestchain.last
def height = if (bestchain.isEmpty) 0 else bestchain.last.height
/**
* Build a chain of block indexes
*
* This is used in case of reorg to rebuilt the new best chain
*
* @param index last index of the chain
* @param acc accumulator
* @return the chain that starts at the genesis block and ends at index
*/
@tailrec
private def buildChain(index: BlockIndex, acc: Vector[BlockIndex] = Vector.empty[BlockIndex]): Vector[BlockIndex] = {
index.parent match {
case None => index +: acc
case Some(parent) => buildChain(parent, index +: acc)
}
}
/**
*
* @param height block height
* @return the encoded difficulty that a block at this height should have
*/
def getDifficulty(height: Int): Option[Long] = height match {
case value if value < RETARGETING_PERIOD * (checkpoints.length + 1) =>
// we're within our checkpoints
val checkpoint = checkpoints(height / RETARGETING_PERIOD - 1)
Some(checkpoint.nextBits)
case value if value % RETARGETING_PERIOD != 0 =>
// we're not at a retargeting height, difficulty is the same as for the previous block
getHeader(height - 1).map(_.bits)
case _ =>
// difficulty retargeting
for {
previous <- getHeader(height - 1)
firstBlock <- getHeader(height - RETARGETING_PERIOD)
} yield BlockHeader.calculateNextWorkRequired(previous, firstBlock.time)
}
def getHeader(height: Int): Option[BlockHeader] = if (!bestchain.isEmpty && height >= bestchain.head.height && height - bestchain.head.height < bestchain.size)
Some(bestchain(height - bestchain.head.height).header)
else None
}
object Blockchain extends Logging {
val RETARGETING_PERIOD = 2016 // on bitcoin, the difficulty re-targeting period is 2016 blocks
val MAX_REORG = 500 // we assume that there won't be a reorg of more than 500 blocks
/**
*
* @param header block header
* @param height block height
* @param parent parent block
* @param chainwork cumulative chain work up to and including this block
*/
case class BlockIndex(header: BlockHeader, height: Int, parent: Option[BlockIndex], chainwork: BigInt) {
lazy val hash = header.hash
lazy val blockId = header.blockId
lazy val logwork = if (chainwork == 0) 0.0 else Math.log(chainwork.doubleValue()) / Math.log(2.0)
override def toString = s"BlockIndex($blockId, $height, ${parent.map(_.blockId)}, $logwork)"
}
/**
* Build an empty blockchain from a series of checkpoints
*
* @param chainhash chain we're on
* @param checkpoints list of checkpoints
* @return a blockchain instance
*/
def fromCheckpoints(chainhash: BinaryData, checkpoints: Vector[CheckPoint]): Blockchain = {
Blockchain(chainhash, checkpoints, Map(), Vector())
}
/**
* Used in tests
*/
def fromGenesisBlock(chainhash: BinaryData, genesis: BlockHeader): Blockchain = {
require(chainhash == Block.RegtestGenesisBlock.hash)
// the height of the genesis block is 0
val blockIndex = BlockIndex(genesis, 0, None, decodeCompact(genesis.bits)._1)
Blockchain(chainhash, Vector(), Map(blockIndex.hash -> blockIndex), Vector(blockIndex))
}
/**
* load an em
*
* @param chainHash
* @param headerDb
* @return
*/
def load(chainHash: BinaryData, headerDb: HeaderDb): Blockchain = {
val checkpoints = CheckPoint.load(chainHash)
val checkpoints1 = headerDb.getTip match {
case Some((height, header)) =>
val newcheckpoints = for {h <- checkpoints.size * RETARGETING_PERIOD - 1 + RETARGETING_PERIOD to height - RETARGETING_PERIOD by RETARGETING_PERIOD} yield {
val cpheader = headerDb.getHeader(h).get
val nextDiff = headerDb.getHeader(h + 1).get.bits
CheckPoint(cpheader.hash, nextDiff)
}
checkpoints ++ newcheckpoints
case None => checkpoints
}
Blockchain.fromCheckpoints(chainHash, checkpoints1)
}
/**
* Validate a chunk of 2016 headers
*
* Used during initial sync to batch validate
*
* @param height height of the first header; must be a multiple of 2016
* @param headers headers.
* @throws Exception if this chunk is not valid and consistent with our checkpoints
*/
def validateHeadersChunk(blockchain: Blockchain, height: Int, headers: Seq[BlockHeader]): Unit = {
if (headers.isEmpty) return
require(height % RETARGETING_PERIOD == 0, s"header chunk height $height not a multiple of 2016")
require(BlockHeader.checkProofOfWork(headers.head))
headers.tail.foldLeft(headers.head) {
case (previous, current) =>
require(BlockHeader.checkProofOfWork(current))
require(current.hashPreviousBlock == previous.hash)
// on mainnet all blocks with a re-targeting window have the same difficulty target
// on testnet it doesn't hold, there can be a drop in difficulty if there are no blocks for 20 minutes
blockchain.chainHash match {
case Block.LivenetGenesisBlock | Block.RegtestGenesisBlock.hash => require(current.bits == previous.bits)
case _ => ()
}
current
}
val cpindex = (height / RETARGETING_PERIOD) - 1
if (cpindex < blockchain.checkpoints.length) {
// check that the first header in the chunk matches our checkpoint
val checkpoint = blockchain.checkpoints(cpindex)
require(headers(0).hashPreviousBlock == checkpoint.hash)
blockchain.chainHash match {
case Block.LivenetGenesisBlock.hash => require(headers(0).bits == checkpoint.nextBits)
case _ => ()
}
}
// if we have a checkpoint after this chunk, check that it is also satisfied
if (cpindex < blockchain.checkpoints.length - 1) {
require(headers.length == RETARGETING_PERIOD)
val nextCheckpoint = blockchain.checkpoints(cpindex + 1)
require(headers.last.hash == nextCheckpoint.hash)
blockchain.chainHash match {
case Block.LivenetGenesisBlock.hash =>
val diff = BlockHeader.calculateNextWorkRequired(headers.last, headers.head.time)
require(diff == nextCheckpoint.nextBits)
case _ => ()
}
}
}
def addHeadersChunk(blockchain: Blockchain, height: Int, headers: Seq[BlockHeader]): Blockchain = {
if (headers.length > RETARGETING_PERIOD) {
val blockchain1 = Blockchain.addHeadersChunk(blockchain, height, headers.take(RETARGETING_PERIOD))
return Blockchain.addHeadersChunk(blockchain1, height + RETARGETING_PERIOD, headers.drop(RETARGETING_PERIOD))
}
if (headers.isEmpty) return blockchain
validateHeadersChunk(blockchain, height, headers)
height match {
case _ if height == blockchain.checkpoints.length * RETARGETING_PERIOD =>
// append after our last checkpoint
// checkpoints are (block hash, * next * difficulty target), this is why:
// - we duplicate the first checkpoints because all headers in the first chunks on mainnet had the same difficulty target
// - we drop the last checkpoint
val chainwork = (blockchain.checkpoints(0) +: blockchain.checkpoints.dropRight(1)).map(t => BigInt(RETARGETING_PERIOD) * Blockchain.chainWork(t.nextBits)).sum
val blockIndex = BlockIndex(headers.head, height, None, chainwork + Blockchain.chainWork(headers.head))
val bestchain1 = headers.tail.foldLeft(Vector(blockIndex)) {
case (indexes, header) => indexes :+ BlockIndex(header, indexes.last.height + 1, Some(indexes.last), indexes.last.chainwork + Blockchain.chainWork(header))
}
val headersMap1 = blockchain.headersMap ++ bestchain1.map(bi => bi.hash -> bi)
blockchain.copy(bestchain = bestchain1, headersMap = headersMap1)
case _ if height < blockchain.checkpoints.length * RETARGETING_PERIOD =>
blockchain
case _ if height == blockchain.height + 1 =>
// attach at our best chain
require(headers.head.hashPreviousBlock == blockchain.bestchain.last.hash)
val blockIndex = BlockIndex(headers.head, height, None, blockchain.bestchain.last.chainwork + Blockchain.chainWork(headers.head))
val indexes = headers.tail.foldLeft(Vector(blockIndex)) {
case (indexes, header) => indexes :+ BlockIndex(header, indexes.last.height + 1, Some(indexes.last), indexes.last.chainwork + Blockchain.chainWork(header))
}
val bestchain1 = blockchain.bestchain ++ indexes
val headersMap1 = blockchain.headersMap ++ indexes.map(bi => bi.hash -> bi)
blockchain.copy(bestchain = bestchain1, headersMap = headersMap1)
// do nothing; headers have been validated
case _ => throw new IllegalArgumentException(s"cannot add headers chunk to an empty blockchain: not within our checkpoint")
}
}
def addHeader(blockchain: Blockchain, height: Int, header: BlockHeader): Blockchain = {
require(BlockHeader.checkProofOfWork(header), s"invalid proof of work for $header")
blockchain.headersMap.get(header.hashPreviousBlock) match {
case Some(parent) if parent.height == height - 1 =>
if (height % RETARGETING_PERIOD != 0 && (blockchain.chainHash == Block.LivenetGenesisBlock.hash || blockchain.chainHash == Block.RegtestGenesisBlock.hash)) {
// check difficulty target, which should be the same as for the parent block
// we only check this on mainnet, on testnet rules are much more lax
require(header.bits == parent.header.bits, s"header invalid difficulty target for ${header}, it should be ${parent.header.bits}")
}
val blockIndex = BlockIndex(header, height, Some(parent), parent.chainwork + Blockchain.chainWork(header))
val headersMap1 = blockchain.headersMap + (blockIndex.hash -> blockIndex)
val bestChain1 = if (parent == blockchain.bestchain.last) {
// simplest case: we add to our current best chain
logger.info(s"new tip at $blockIndex")
blockchain.bestchain :+ blockIndex
} else if (blockIndex.chainwork > blockchain.bestchain.last.chainwork) {
logger.info(s"new best chain at $blockIndex")
// we have a new best chain
buildChain(blockIndex)
} else {
logger.info(s"received header $blockIndex which is not on the best chain")
blockchain.bestchain
}
blockchain.copy(headersMap = headersMap1, bestchain = bestChain1)
case Some(parent) => throw new IllegalArgumentException(s"parent for $header at $height is not valid: $parent ")
case None if height < blockchain.height - 1000 => blockchain
case None => throw new IllegalArgumentException(s"cannot find parent for $header at $height")
}
}
def addHeaders(blockchain: Blockchain, height: Int, headers: Seq[BlockHeader]): Blockchain = {
if (headers.isEmpty) blockchain
else if (height % RETARGETING_PERIOD == 0) addHeadersChunk(blockchain, height, headers)
else {
@tailrec
def loop(bc: Blockchain, h: Int, hs: Seq[BlockHeader]): Blockchain = if (hs.isEmpty) bc else {
loop(Blockchain.addHeader(bc, h, hs.head), h + 1, hs.tail)
}
loop(blockchain, height, headers)
}
}
/**
* build a chain of block indexes
*
* @param index last index of the chain
* @param acc accumulator
* @return the chain that starts at the genesis block and ends at index
*/
@tailrec
def buildChain(index: BlockIndex, acc: Vector[BlockIndex] = Vector.empty[BlockIndex]): Vector[BlockIndex] = {
index.parent match {
case None => index +: acc
case Some(parent) => buildChain(parent, index +: acc)
}
}
def chainWork(target: BigInt): BigInt = BigInt(2).pow(256) / (target + BigInt(1))
def chainWork(bits: Long): BigInt = {
val (target, negative, overflow) = decodeCompact(bits)
if (target == BigInteger.ZERO || negative || overflow) BigInt(0) else chainWork(target)
}
def chainWork(header: BlockHeader): BigInt = chainWork(header.bits)
/**
* Optimize blockchain
*
* @param blockchain
* @param acc internal accumulator
* @return a (blockchain, indexes) tuple where headers that are old enough have been removed and new checkpoints added,
* and indexes is the list of header indexes that have been optimized out and must be persisted
*/
@tailrec
def optimize(blockchain: Blockchain, acc: Vector[BlockIndex] = Vector.empty[BlockIndex]) : (Blockchain, Vector[BlockIndex]) = {
if (blockchain.bestchain.size >= RETARGETING_PERIOD + MAX_REORG) {
val saveme = blockchain.bestchain.take(RETARGETING_PERIOD)
val headersMap1 = blockchain.headersMap -- saveme.map(_.hash)
val bestchain1 = blockchain.bestchain.drop(RETARGETING_PERIOD)
val checkpoints1 = blockchain.checkpoints :+ CheckPoint(saveme.last.hash, bestchain1.head.header.bits)
optimize(blockchain.copy(headersMap = headersMap1, bestchain = bestchain1, checkpoints = checkpoints1), acc ++ saveme)
} else {
(blockchain, acc)
}
}
/**
* Computes the difficulty target at a given height.
*
* @param blockchain blockchain
* @param height height for which we want the difficulty target
* @param headerDb header database
* @return the difficulty target for this height
*/
def getDifficulty(blockchain: Blockchain, height: Int, headerDb: HeaderDb): Option[Long] = {
blockchain.chainHash match {
case Block.LivenetGenesisBlock.hash | Block.RegtestGenesisBlock.hash =>
(height % RETARGETING_PERIOD) match {
case 0 =>
for {
parent <- blockchain.getHeader(height - 1) orElse headerDb.getHeader(height - 1)
previous <- blockchain.getHeader(height - 2016) orElse headerDb.getHeader(height - 2016)
target = BlockHeader.calculateNextWorkRequired(parent, previous.time)
} yield target
case _ => blockchain.getHeader(height - 1) orElse headerDb.getHeader(height - 1) map (_.bits)
}
case _ => None // no difficulty check on testnet
}
}
}

View file

@ -0,0 +1,81 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.blockchain.electrum
import java.io.InputStream
import fr.acinq.bitcoin.{BinaryData, Block, encodeCompact}
import fr.acinq.eclair.blockchain.electrum.db.HeaderDb
import org.json4s.JsonAST.{JArray, JInt, JString}
import org.json4s.jackson.JsonMethods
/**
*
* @param hash block hash
* @param target difficulty target for the next block
*/
case class CheckPoint(hash: BinaryData, nextBits: Long) {
require(hash.length == 32)
}
object CheckPoint {
import Blockchain.RETARGETING_PERIOD
/**
* Load checkpoints.
* There is one checkpoint every 2016 blocks (which is the difficulty adjustment period). They are used to check that
* we're on the right chain and to validate proof-of-work by checking the difficulty target
* @return an ordered list of checkpoints, with one checkpoint every 2016 blocks
*/
def load(chainHash: BinaryData): Vector[CheckPoint] = chainHash match {
case Block.LivenetGenesisBlock.hash => load(classOf[CheckPoint].getResourceAsStream("/electrum/checkpoints_mainnet.json"))
case Block.TestnetGenesisBlock.hash => load(classOf[CheckPoint].getResourceAsStream("/electrum/checkpoints_testnet.json"))
case Block.RegtestGenesisBlock.hash => Vector.empty[CheckPoint] // no checkpoints on regtest
}
def load(stream: InputStream): Vector[CheckPoint] = {
val JArray(values) = JsonMethods.parse(stream)
val checkpoints = values.collect {
case JArray(JString(a) :: JInt(b) :: Nil) => CheckPoint(BinaryData(a).reverse, encodeCompact(b.bigInteger))
}
checkpoints.toVector
}
/**
* load checkpoints from our resources and header database
*
* @param chainHash chaim hash
* @param headerDb header db
* @return a series of checkpoints
*/
def load(chainHash: BinaryData, headerDb: HeaderDb): Vector[CheckPoint] = {
val checkpoints = CheckPoint.load(chainHash)
val checkpoints1 = headerDb.getTip match {
case Some((height, header)) =>
val newcheckpoints = for {h <- checkpoints.size * RETARGETING_PERIOD - 1 + RETARGETING_PERIOD to height - RETARGETING_PERIOD by RETARGETING_PERIOD} yield {
// we * should * have these headers in our db
val cpheader = headerDb.getHeader(h).get
val nextDiff = headerDb.getHeader(h + 1).get.bits
CheckPoint(cpheader.hash, nextDiff)
}
checkpoints ++ newcheckpoints
case None => checkpoints
}
checkpoints1
}
}

View file

@ -45,7 +45,7 @@ import scala.concurrent.duration._
/**
* For later optimizations, see http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html
*
*
*/
class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec: ExecutionContext) extends Actor with Stash with ActorLogging {
@ -81,20 +81,39 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
ch.pipeline.addLast(new LineEncoder)
ch.pipeline.addLast(new JsonRPCRequestEncoder)
// error handler
ch.pipeline().addLast(new ExceptionHandler)
ch.pipeline.addLast(new ExceptionHandler)
}
})
// Start the client.
log.info(s"connecting to $serverAddress")
val channelFuture = b.connect(serverAddress.getHostName, serverAddress.getPort)
val channelOpenFuture = b.connect(serverAddress.getHostName, serverAddress.getPort)
case class ConnectionError(t: Throwable)
def close() = {
statusListeners.map(_ ! ElectrumDisconnected)
context stop self
}
channelFuture.addListeners(new ChannelFutureListener {
def errorHandler(t: Throwable) = {
log.info(s"connection error (reason=${t.getMessage})")
close()
}
channelOpenFuture.addListeners(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
self ! ConnectionError(future.cause())
errorHandler(future.cause())
} else {
future.channel().closeFuture().addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
errorHandler(future.cause())
} else {
log.info(s"channel closed: " + future.channel())
close()
}
}
})
}
}
})
@ -108,7 +127,7 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
ctx.connect(remoteAddress, localAddress, promise.addListener(new ChannelFutureListener() {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
self ! ConnectionError(future.cause())
errorHandler(future.cause())
}
}
}))
@ -118,14 +137,14 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
ctx.write(msg, promise.addListener(new ChannelFutureListener() {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
self ! ConnectionError(future.cause())
errorHandler(future.cause())
}
}
}))
}
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
self ! ConnectionError(cause)
errorHandler(cause)
}
}
@ -163,6 +182,7 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
}
/**
* Forwards incoming messages to the underlying actor
*
@ -182,22 +202,17 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
var addressSubscriptions = Map.empty[String, Set[ActorRef]]
var scriptHashSubscriptions = Map.empty[BinaryData, Set[ActorRef]]
val headerSubscriptions = collection.mutable.HashSet.empty[ActorRef]
val version = ServerVersion("2.1.7", "1.2")
val version = ServerVersion("3.3.2", "1.4")
val statusListeners = collection.mutable.HashSet.empty[ActorRef]
val keepHeaders = 100
var reqId = 0
// we need to regularly send a ping in order not to get disconnected
val versionTrigger = context.system.scheduler.schedule(30 seconds, 30 seconds, self, version)
val pingTrigger = context.system.scheduler.schedule(30 seconds, 30 seconds, self, Ping)
override def unhandled(message: Any): Unit = {
message match {
case ConnectionError(t) =>
log.info(s"connection error (reason=${t.getMessage})")
statusListeners.map(_ ! ElectrumDisconnected)
context stop self
case Terminated(deadActor) =>
addressSubscriptions = addressSubscriptions.mapValues(subscribers => subscribers - deadActor)
scriptHashSubscriptions = scriptHashSubscriptions.mapValues(subscribers => subscribers - deadActor)
@ -206,16 +221,14 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
case RemoveStatusListener(actor) => statusListeners -= actor
case _: ServerVersion => () // we only handle this when connected
case _: ServerVersionResponse => () // we just ignore these messages, they are used as pings
case PingResponse => ()
case _ => log.warning(s"unhandled $message")
}
}
override def postStop(): Unit = {
versionTrigger.cancel()
pingTrigger.cancel()
super.postStop()
}
@ -228,7 +241,11 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
*/
def send(ctx: ChannelHandlerContext, request: Request): String = {
val electrumRequestId = "" + reqId
ctx.channel().writeAndFlush(makeRequest(request, electrumRequestId))
if (ctx.channel().isWritable) {
ctx.channel().writeAndFlush(makeRequest(request, electrumRequestId))
} else {
errorHandler(new RuntimeException(s"channel not writable"))
}
reqId = reqId + 1
electrumRequestId
}
@ -258,22 +275,22 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
def waitingForTip(ctx: ChannelHandlerContext): Receive = {
case Right(json: JsonRPCResponse) =>
val header = parseHeader(json.result)
log.debug(s"connected, tip = ${header.block_hash} $header")
statusListeners.map(_ ! ElectrumReady(header, serverAddress))
context become connected(ctx, header, "", Map())
val (height, header) = parseBlockHeader(json.result)
log.debug(s"connected, tip = ${header.hash} height = $height")
statusListeners.map(_ ! ElectrumReady(height, header, serverAddress))
context become connected(ctx, height, header, "", Map())
case AddStatusListener(actor) => statusListeners += actor
}
def connected(ctx: ChannelHandlerContext, tip: Header, buffer: String, requests: Map[String, (Request, ActorRef)]): Receive = {
def connected(ctx: ChannelHandlerContext, height: Int, tip: BlockHeader, buffer: String, requests: Map[String, (Request, ActorRef)]): Receive = {
case AddStatusListener(actor) =>
statusListeners += actor
actor ! ElectrumReady(tip, serverAddress)
actor ! ElectrumReady(height, tip, serverAddress)
case HeaderSubscription(actor) =>
headerSubscriptions += actor
actor ! HeaderSubscriptionResponse(tip)
actor ! HeaderSubscriptionResponse(height, tip)
context watch actor
case request: Request =>
@ -287,7 +304,7 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
context watch actor
case _ => ()
}
context become connected(ctx, tip, buffer, requests + (curReqId -> (request, sender())))
context become connected(ctx, height, tip, buffer, requests + (curReqId -> (request, sender())))
case Right(json: JsonRPCResponse) =>
requests.get(json.id) match {
@ -298,7 +315,7 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
case None =>
log.warning(s"could not find requestor for reqId=${json.id} response=$json")
}
context become connected(ctx, tip, buffer, requests - json.id)
context become connected(ctx, height, tip, buffer, requests - json.id)
case Left(response: HeaderSubscriptionResponse) => headerSubscriptions.map(_ ! response)
@ -306,9 +323,9 @@ class ElectrumClient(serverAddress: InetSocketAddress, ssl: SSL)(implicit val ec
case Left(response: ScriptHashSubscriptionResponse) => scriptHashSubscriptions.get(response.scriptHash).map(listeners => listeners.map(_ ! response))
case HeaderSubscriptionResponse(newtip) =>
case HeaderSubscriptionResponse(height, newtip) =>
log.info(s"new tip $newtip")
context become connected(ctx, newtip, buffer, requests)
context become connected(ctx, height, newtip, buffer, requests)
}
}
@ -335,8 +352,11 @@ object ElectrumClient {
case class ServerVersion(clientName: String, protocolVersion: String) extends Request
case class ServerVersionResponse(clientName: String, protocolVersion: String) extends Response
case object Ping extends Request
case object PingResponse extends Response
case class GetAddressHistory(address: String) extends Request
case class TransactionHistoryItem(height: Long, tx_hash: BinaryData)
case class TransactionHistoryItem(height: Int, tx_hash: BinaryData)
case class GetAddressHistoryResponse(address: String, history: Seq[TransactionHistoryItem]) extends Response
case class GetScriptHashHistory(scriptHash: BinaryData) extends Request
@ -358,14 +378,20 @@ object ElectrumClient {
case class GetTransactionResponse(tx: Transaction) extends Response
case class GetHeader(height: Int) extends Request
case class GetHeaderResponse(header: Header) extends Response
case class GetHeaderResponse(height: Int, header: BlockHeader) extends Response
object GetHeaderResponse {
def apply(t: (Int, BlockHeader)) = new GetHeaderResponse(t._1, t._2)
}
case class GetMerkle(txid: BinaryData, height: Long) extends Request
case class GetMerkleResponse(txid: BinaryData, merkle: Seq[BinaryData], block_height: Long, pos: Int) extends Response {
case class GetHeaders(start_height: Int, count: Int, cp_height: Int = 0) extends Request
case class GetHeadersResponse(start_height: Int, headers: Seq[BlockHeader], max: Int) extends Response
case class GetMerkle(txid: BinaryData, height: Int) extends Request
case class GetMerkleResponse(txid: BinaryData, merkle: List[BinaryData], block_height: Int, pos: Int) extends Response {
lazy val root: BinaryData = {
@tailrec
def loop(pos: Int, hashes: Seq[BinaryData]): BinaryData = {
if (hashes.length == 1) hashes(0).reverse
if (hashes.length == 1) hashes(0)
else {
val h = if (pos % 2 == 1) Crypto.hash256(hashes(1) ++ hashes(0)) else Crypto.hash256(hashes(0) ++ hashes(1))
loop(pos / 2, h +: hashes.drop(2))
@ -382,16 +408,20 @@ object ElectrumClient {
case class ScriptHashSubscriptionResponse(scriptHash: BinaryData, status: String) extends Response
case class HeaderSubscription(actor: ActorRef) extends Request
case class HeaderSubscriptionResponse(header: Header) extends Response
case class HeaderSubscriptionResponse(height: Int, header: BlockHeader) extends Response
object HeaderSubscriptionResponse {
def apply(t: (Int, BlockHeader)) = new HeaderSubscriptionResponse(t._1, t._2)
}
case class Header(block_height: Long, version: Long, prev_block_hash: BinaryData, merkle_root: BinaryData, timestamp: Long, bits: Long, nonce: Long) {
def blockHeader = BlockHeader(version, prev_block_hash.reverse, merkle_root.reverse, timestamp, bits, nonce)
lazy val block_id: BinaryData = blockHeader.hash
lazy val block_hash: BinaryData = block_id.reverse
lazy val block_hash: BinaryData = blockHeader.hash
lazy val block_id: BinaryData = block_hash.reverse
}
object Header {
def makeHeader(height: Long, header: BlockHeader) = ElectrumClient.Header(0, header.version, header.hashPreviousBlock, header.hashMerkleRoot, header.time, header.bits, header.nonce)
def makeHeader(height: Long, header: BlockHeader) = ElectrumClient.Header(height, header.version, header.hashPreviousBlock.reverse, header.hashMerkleRoot.reverse, header.time, header.bits, header.nonce)
val RegtestGenesisHeader = makeHeader(0, Block.RegtestGenesisBlock.header)
val TestnetGenesisHeader = makeHeader(0, Block.TestnetGenesisBlock.header)
@ -405,7 +435,11 @@ object ElectrumClient {
case class ServerError(request: Request, error: Error) extends Response
sealed trait ElectrumEvent
case class ElectrumReady(tip: Header, serverAddress: InetSocketAddress) extends ElectrumEvent
case class ElectrumReady(height: Int, tip: BlockHeader, serverAddress: InetSocketAddress) extends ElectrumEvent
object ElectrumReady {
def apply(t: (Int, BlockHeader), serverAddress: InetSocketAddress) = new ElectrumReady(t._1 , t._2, serverAddress)
}
case object ElectrumDisconnected extends ElectrumEvent
sealed trait SSL
@ -425,7 +459,7 @@ object ElectrumClient {
// this is a jsonrpc request, i.e. a subscription response
val JArray(params) = json \ "params"
Left(((method, params): @unchecked) match {
case ("blockchain.headers.subscribe", header :: Nil) => HeaderSubscriptionResponse(parseHeader(header))
case ("blockchain.headers.subscribe", header :: Nil) => HeaderSubscriptionResponse(parseBlockHeader(header))
case ("blockchain.address.subscribe", JString(address) :: JNull :: Nil) => AddressSubscriptionResponse(address, "")
case ("blockchain.address.subscribe", JString(address) :: JString(status) :: Nil) => AddressSubscriptionResponse(address, status)
case ("blockchain.scripthash.subscribe", JString(scriptHashHex) :: JNull :: Nil) => ScriptHashSubscriptionResponse(BinaryData(scriptHashHex), "")
@ -472,19 +506,15 @@ object ElectrumClient {
case JInt(value) => value.intValue()
}
def parseHeader(json: JValue): Header = {
val block_height = longField(json, "block_height")
val version = longField(json, "version")
val timestamp = longField(json, "timestamp")
val bits = longField(json, "bits")
val nonce = longField(json, "nonce")
val JString(prev_block_hash) = json \ "prev_block_hash"
val JString(merkle_root) = json \ "merkle_root"
Header(block_height, version, prev_block_hash, merkle_root, timestamp, bits, nonce)
def parseBlockHeader(json: JValue): (Int, BlockHeader) = {
val height = intField(json, "height")
val JString(hex) = json \ "hex"
(height, BlockHeader.read(hex))
}
def makeRequest(request: Request, reqId: String): JsonRPCRequest = request match {
case ServerVersion(clientName, protocolVersion) => JsonRPCRequest(id = reqId, method = "server.version", params = clientName :: protocolVersion :: Nil)
case Ping => JsonRPCRequest(id = reqId, method = "server.ping", params = Nil)
case GetAddressHistory(address) => JsonRPCRequest(id = reqId, method = "blockchain.address.get_history", params = address :: Nil)
case GetScriptHashHistory(scripthash) => JsonRPCRequest(id = reqId, method = "blockchain.scripthash.get_history", params = scripthash.toString() :: Nil)
case AddressListUnspent(address) => JsonRPCRequest(id = reqId, method = "blockchain.address.listunspent", params = address :: Nil)
@ -494,7 +524,8 @@ object ElectrumClient {
case BroadcastTransaction(tx) => JsonRPCRequest(id = reqId, method = "blockchain.transaction.broadcast", params = Hex.toHexString(Transaction.write(tx)) :: Nil)
case GetTransaction(txid: BinaryData) => JsonRPCRequest(id = reqId, method = "blockchain.transaction.get", params = txid :: Nil)
case HeaderSubscription(_) => JsonRPCRequest(id = reqId, method = "blockchain.headers.subscribe", params = Nil)
case GetHeader(height) => JsonRPCRequest(id = reqId, method = "blockchain.block.get_header", params = height :: Nil)
case GetHeader(height) => JsonRPCRequest(id = reqId, method = "blockchain.block.header", params = height :: Nil)
case GetHeaders(start_height, count, cp_height) => JsonRPCRequest(id = reqId, method = "blockchain.block.headers", params = start_height :: count :: Nil)
case GetMerkle(txid, height) => JsonRPCRequest(id = reqId, method = "blockchain.transaction.get_merkle", params = txid :: height :: Nil)
}
@ -511,11 +542,12 @@ object ElectrumClient {
val JString(clientName) = jitems(0)
val JString(protocolVersion) = jitems(1)
ServerVersionResponse(clientName, protocolVersion)
case Ping => PingResponse
case GetAddressHistory(address) =>
val JArray(jitems) = json.result
val items = jitems.map(jvalue => {
val JString(tx_hash) = jvalue \ "tx_hash"
val height = longField(jvalue, "height")
val height = intField(jvalue, "height")
TransactionHistoryItem(height, tx_hash)
})
GetAddressHistoryResponse(address, items)
@ -523,7 +555,7 @@ object ElectrumClient {
val JArray(jitems) = json.result
val items = jitems.map(jvalue => {
val JString(tx_hash) = jvalue \ "tx_hash"
val height = longField(jvalue, "height")
val height = intField(jvalue, "height")
TransactionHistoryItem(height, tx_hash)
})
GetScriptHashHistoryResponse(scripthash, items)
@ -532,7 +564,7 @@ object ElectrumClient {
val items = jitems.map(jvalue => {
val JString(tx_hash) = jvalue \ "tx_hash"
val tx_pos = intField(jvalue, "tx_pos")
val height = longField(jvalue, "height")
val height = intField(jvalue, "height")
val value = longField(jvalue, "value")
UnspentItem(tx_hash, tx_pos, value, height)
})
@ -563,11 +595,19 @@ object ElectrumClient {
require(BinaryData(txid) == tx.txid)
BroadcastTransactionResponse(tx, None)
case GetHeader(height) =>
GetHeaderResponse(parseHeader(json.result))
val JString(hex) = json.result
GetHeaderResponse(height, BlockHeader.read(hex))
case GetHeaders(start_height, count, cp_height) =>
val count = intField(json.result, "count")
val max = intField(json.result, "max")
val JString(hex) = json.result \ "hex"
val bin = fromHexString(hex)
val blockHeaders = bin.grouped(80).map(BlockHeader.read).toList
GetHeadersResponse(start_height, blockHeaders, max)
case GetMerkle(txid, height) =>
val JArray(hashes) = json.result \ "merkle"
val leaves = hashes collect { case JString(value) => BinaryData(value) }
val blockHeight = longField(json.result, "block_height")
val blockHeight = intField(json.result, "block_height")
val JInt(pos) = json.result \ "pos"
GetMerkleResponse(txid, leaves, blockHeight, pos.toInt)
}

View file

@ -20,6 +20,7 @@ import java.io.InputStream
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, FSM, Props, Terminated}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.eclair.Globals
import fr.acinq.eclair.blockchain.CurrentBlockCount
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
@ -40,16 +41,16 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
// on startup, we attempt to connect to a number of electrum clients
// they will send us an `ElectrumReady` message when they're connected, or
// terminate if they cannot connect
(0 until MAX_CONNECTION_COUNT) foreach (_ => self ! Connect)
(0 until Math.min(MAX_CONNECTION_COUNT, serverAddresses.size)) foreach (_ => self ! Connect)
log.debug(s"starting electrum pool with serverAddresses={}", serverAddresses)
startWith(Disconnected, DisconnectedData)
when(Disconnected) {
case Event(ElectrumClient.ElectrumReady(tip, _), _) if addresses.contains(sender) =>
case Event(ElectrumClient.ElectrumReady(height, tip, _), _) if addresses.contains(sender) =>
sender ! ElectrumClient.HeaderSubscription(self)
handleHeader(sender, tip, None)
handleHeader(sender, height, tip, None)
case Event(ElectrumClient.AddStatusListener(listener), _) =>
statusListeners += listener
@ -63,12 +64,12 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
}
when(Connected) {
case Event(ElectrumClient.ElectrumReady(tip, _), d: ConnectedData) if addresses.contains(sender) =>
case Event(ElectrumClient.ElectrumReady(height, tip, _), d: ConnectedData) if addresses.contains(sender) =>
sender ! ElectrumClient.HeaderSubscription(self)
handleHeader(sender, tip, Some(d))
handleHeader(sender, height, tip, Some(d))
case Event(ElectrumClient.HeaderSubscriptionResponse(tip), d: ConnectedData) if addresses.contains(sender) =>
handleHeader(sender, tip, Some(d))
case Event(ElectrumClient.HeaderSubscriptionResponse(height, tip), d: ConnectedData) if addresses.contains(sender) =>
handleHeader(sender, height, tip, Some(d))
case Event(request: ElectrumClient.Request, ConnectedData(master, _)) =>
master forward request
@ -91,8 +92,8 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
} else {
// we choose next best candidate as master
val tips1 = d.tips - actor
val (bestClient, bestTip) = tips1.toSeq.maxBy(_._2.block_height)
handleHeader(bestClient, bestTip, Some(d.copy(tips = tips1)))
val (bestClient, bestTip) = tips1.toSeq.maxBy(_._2._1)
handleHeader(bestClient, bestTip._1, bestTip._2, Some(d.copy(tips = tips1)))
}
}
@ -122,18 +123,18 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
initialize()
private def handleHeader(connection: ActorRef, tip: ElectrumClient.Header, data: Option[ConnectedData]) = {
private def handleHeader(connection: ActorRef, height: Int, tip: BlockHeader, data: Option[ConnectedData]) = {
val remoteAddress = addresses(connection)
// we update our block count even if it doesn't come from our current master
updateBlockCount(tip.block_height)
updateBlockCount(height)
data match {
case None =>
// as soon as we have a connection to an electrum server, we select it as master
log.info(s"selecting master $remoteAddress} at $tip")
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip, remoteAddress))
goto(Connected) using ConnectedData(connection, Map(connection -> tip))
case Some(d) if tip.block_height >= d.blockHeight + 2L =>
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(height, tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(height, tip, remoteAddress))
goto(Connected) using ConnectedData(connection, Map(connection -> (height, tip)))
case Some(d) if height >= d.blockHeight + 2L =>
// we only switch to a new master if there is a significant difference with our current master, because
// we don't want to switch to a new master every time a new block arrives (some servers will be notified before others)
log.info(s"switching to master $remoteAddress at $tip")
@ -141,12 +142,12 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v
// so users (wallet, watcher, ...) will reset their subscriptions
statusListeners.foreach(_ ! ElectrumClient.ElectrumDisconnected)
context.system.eventStream.publish(ElectrumClient.ElectrumDisconnected)
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(tip, remoteAddress))
goto(Connected) using d.copy(master = connection, tips = d.tips + (connection -> tip))
statusListeners.foreach(_ ! ElectrumClient.ElectrumReady(height, tip, remoteAddress))
context.system.eventStream.publish(ElectrumClient.ElectrumReady(height, tip, remoteAddress))
goto(Connected) using d.copy(master = connection, tips = d.tips + (connection -> (height, tip)))
case Some(d) =>
log.debug(s"received tip from $remoteAddress} $tip")
stay using d.copy(tips = d.tips + (connection -> tip))
log.debug(s"received tip from $remoteAddress} $tip at $height")
stay using d.copy(tips = d.tips + (connection -> (height, tip)))
}
}
@ -207,8 +208,8 @@ object ElectrumClientPool {
sealed trait Data
case object DisconnectedData extends Data
case class ConnectedData(master: ActorRef, tips: Map[ActorRef, ElectrumClient.Header]) extends Data {
def blockHeight = tips.get(master).map(_.block_height).getOrElse(0L)
case class ConnectedData(master: ActorRef, tips: Map[ActorRef, (Int, BlockHeader)]) extends Data {
def blockHeight = tips.get(master).map(_._1).getOrElse(0)
}
case object Connect

View file

@ -16,12 +16,13 @@
package fr.acinq.eclair.blockchain.electrum
import akka.actor.{ActorRef, FSM, Props}
import akka.actor.{ActorRef, FSM, PoisonPill, Props}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.DeterministicWallet.{ExtendedPrivateKey, derivePrivateKey, hardened}
import fr.acinq.bitcoin.{Base58, Base58Check, BinaryData, Block, Crypto, DeterministicWallet, OP_PUSHDATA, OutPoint, SIGHASH_ALL, Satoshi, Script, ScriptElt, ScriptWitness, SigVersion, Transaction, TxIn, TxOut}
import fr.acinq.bitcoin.{Base58, Base58Check, BinaryData, Block, BlockHeader, Crypto, DeterministicWallet, OP_PUSHDATA, OutPoint, SIGHASH_ALL, Satoshi, Script, ScriptElt, ScriptWitness, SigVersion, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain.bitcoind.rpc.Error
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{GetTransaction, GetTransactionResponse, TransactionHistoryItem, computeScriptHash}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient._
import fr.acinq.eclair.blockchain.electrum.db.WalletDb
import fr.acinq.eclair.transactions.Transactions
import grizzled.slf4j.Logging
@ -46,6 +47,7 @@ import scala.util.{Failure, Success, Try}
*/
class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.WalletParameters) extends FSM[ElectrumWallet.State, ElectrumWallet.Data] {
import Blockchain.RETARGETING_PERIOD
import ElectrumWallet._
import params._
@ -64,6 +66,7 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
/**
* Send a notification if the wallet is ready and its ready message has not
* already been sent
*
* @param data wallet data
* @return the input data with an updated 'last ready message' if needed
*/
@ -84,47 +87,115 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
}
startWith(DISCONNECTED, {
val header = chainHash match {
case Block.RegtestGenesisBlock.hash => ElectrumClient.Header.RegtestGenesisHeader
case Block.TestnetGenesisBlock.hash => ElectrumClient.Header.TestnetGenesisHeader
case Block.LivenetGenesisBlock.hash => ElectrumClient.Header.LivenetGenesisHeader
val blockchain = params.chainHash match {
// regtest is a special case, there are no checkpoints and we start with a single header
case Block.RegtestGenesisBlock.hash => Blockchain.fromGenesisBlock(Block.RegtestGenesisBlock.hash, Block.RegtestGenesisBlock.header)
case _ =>
val checkpoints = CheckPoint.load(params.chainHash, params.walletDb)
Blockchain.fromCheckpoints(params.chainHash, checkpoints)
}
val headers = params.walletDb.getHeaders(blockchain.checkpoints.size * RETARGETING_PERIOD, None)
log.info(s"loading ${headers.size} headers from db")
val blockchain1 = Blockchain.addHeadersChunk(blockchain, blockchain.checkpoints.size * RETARGETING_PERIOD, headers)
val firstAccountKeys = (0 until params.swipeRange).map(i => derivePrivateKey(accountMaster, i)).toVector
val firstChangeKeys = (0 until params.swipeRange).map(i => derivePrivateKey(changeMaster, i)).toVector
val data = Data(params, header, firstAccountKeys, firstChangeKeys)
val transactions = walletDb.getTransactions().map(_._1)
log.info(s"loading ${transactions.size} transactions from db")
val txs = transactions.map(tx => tx.txid -> tx).toMap
val data = Data(params, blockchain1, firstAccountKeys, firstChangeKeys).copy(transactions = txs)
context.system.eventStream.publish(NewWalletReceiveAddress(data.currentReceiveAddress))
data
})
when(DISCONNECTED) {
case Event(ElectrumClient.ElectrumReady(_, _), data) =>
case Event(ElectrumClient.ElectrumReady(_, _, _), data) =>
// subscribe to headers stream, server will reply with its current tip
client ! ElectrumClient.HeaderSubscription(self)
goto(WAITING_FOR_TIP) using data
}
when(WAITING_FOR_TIP) {
case Event(ElectrumClient.HeaderSubscriptionResponse(header), data) =>
data.accountKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
data.changeKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
// make sure there is not last ready message
goto(RUNNING) using data.copy(tip = header, lastReadyMessage = None)
case Event(ElectrumClient.HeaderSubscriptionResponse(height, header), data) =>
if (height < data.blockchain.height) {
log.info(s"electrum server is behind at ${height} we're at ${data.blockchain.height}, disconnecting")
sender ! PoisonPill
goto(DISCONNECTED) using data
} else if (data.blockchain.bestchain.isEmpty) {
log.info("performing full sync")
// now ask for the first header after our latest checkpoint
client ! ElectrumClient.GetHeaders(data.blockchain.checkpoints.size * RETARGETING_PERIOD, RETARGETING_PERIOD)
// make sure there is not last ready message
goto(SYNCING) using data.copy(lastReadyMessage = None)
} else if (header == data.blockchain.tip.header) {
// nothing to sync
data.accountKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
data.changeKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
goto(RUNNING) using notifyReady(data.copy(lastReadyMessage = None))
} else {
client ! ElectrumClient.GetHeaders(data.blockchain.tip.height + 1, RETARGETING_PERIOD)
log.info(s"syncing headers from ${data.blockchain.height} to ${height}")
goto(SYNCING) using data.copy(lastReadyMessage = None)
}
}
case Event(ElectrumClient.ElectrumDisconnected, data) =>
log.info(s"wallet got disconnected")
goto(DISCONNECTED) using data
when(SYNCING) {
case Event(ElectrumClient.GetHeadersResponse(start, headers, _), data) =>
if (headers.isEmpty) {
// ok, we're all synced now
log.info(s"headers sync complete, tip=${data.blockchain.tip}")
data.accountKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
data.changeKeys.foreach(key => client ! ElectrumClient.ScriptHashSubscription(computeScriptHashFromPublicKey(key.publicKey), self))
goto(RUNNING) using notifyReady(data)
} else {
Try(Blockchain.addHeaders(data.blockchain, start, headers)) match {
case Success(blockchain1) =>
val (blockchain2, saveme) = Blockchain.optimize(blockchain1)
saveme.grouped(RETARGETING_PERIOD).foreach(chunk => params.walletDb.addHeaders(chunk.head.height, chunk.map(_.header)))
log.info(s"requesting new headers chunk at ${blockchain2.tip.height}")
client ! ElectrumClient.GetHeaders(blockchain2.tip.height + 1, RETARGETING_PERIOD)
goto(SYNCING) using data.copy(blockchain = blockchain2)
case Failure(error) =>
log.error("electrum server sent bad headers, disconnecting", error)
sender ! PoisonPill
goto(DISCONNECTED) using data
}
}
case Event(ElectrumClient.HeaderSubscriptionResponse(height, header), data) =>
// we can ignore this, we will request header chunks until the server has nothing left to send us
log.debug(s"ignoring header $header at $height while syncing")
stay()
}
when(RUNNING) {
case Event(ElectrumClient.HeaderSubscriptionResponse(header), data) if data.tip == header => stay
case Event(ElectrumClient.HeaderSubscriptionResponse(_, header), data) if data.blockchain.tip == header => stay
case Event(ElectrumClient.HeaderSubscriptionResponse(header), data) =>
log.info(s"got new tip ${header.block_hash} at ${header.block_height}")
data.heights.collect {
case (txid, height) if height > 0 =>
val confirmations = computeDepth(header.block_height, height)
context.system.eventStream.publish(TransactionConfidenceChanged(txid, confirmations))
case Event(ElectrumClient.HeaderSubscriptionResponse(height, header), data) =>
log.info(s"got new tip ${header.blockId} at ${height}")
val difficulty = Blockchain.getDifficulty(data.blockchain, height, params.walletDb)
if (!difficulty.forall(target => header.bits == target)) {
log.error(s"electrum server send bad header (difficulty is not valid), disconnecting")
sender ! PoisonPill
stay()
} else {
Try(Blockchain.addHeader(data.blockchain, height, header)) match {
case Success(blockchain1) =>
data.heights.collect {
case (txid, txheight) if txheight > 0 =>
val confirmations = computeDepth(height, txheight)
context.system.eventStream.publish(TransactionConfidenceChanged(txid, confirmations))
}
val (blockchain2, saveme) = Blockchain.optimize(blockchain1)
saveme.grouped(RETARGETING_PERIOD).foreach(chunk => params.walletDb.addHeaders(chunk.head.height, chunk.map(_.header)))
stay using notifyReady(data.copy(blockchain = blockchain2))
case Failure(error) =>
log.error(error, s"electrum server sent bad header, disconnecting")
sender ! PoisonPill
stay() using data
}
}
stay using notifyReady(data.copy(tip = header))
case Event(ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status), data) if data.status.get(scriptHash) == Some(status) =>
stay using notifyReady(data)// we already have it
@ -178,6 +249,13 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
case ((heights, hashes), item) if !data.transactions.contains(item.tx_hash) && !data.pendingTransactionRequests.contains(item.tx_hash) =>
// we retrieve the tx if we don't have it and haven't yet requested it
client ! GetTransaction(item.tx_hash)
if (item.height > 0) { // don't ask for merkle proof for unconfirmed transactions
if (data.blockchain.getHeader(item.height).orElse(params.walletDb.getHeader(item.height)).isEmpty) {
val start = (item.height / RETARGETING_PERIOD) * RETARGETING_PERIOD
client ! GetHeaders(start, RETARGETING_PERIOD)
}
client ! GetMerkle(item.tx_hash, item.height)
}
(heights + (item.tx_hash -> item.height), hashes + item.tx_hash)
case ((heights, hashes), item) =>
// otherwise we just update the height
@ -187,7 +265,7 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
// we now have updated height for all our transactions,
heights1.collect {
case (txid, height) =>
val confirmations = if (height <= 0) 0 else computeDepth(data.tip.block_height, height)
val confirmations = if (height <= 0) 0 else computeDepth(data.blockchain.tip.height, height)
(data.heights.get(txid), height) match {
case (None, height) if height <= 0 =>
// height=0 => unconfirmed, height=-1 => unconfirmed and one input is unconfirmed
@ -221,6 +299,28 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
stay using notifyReady(data1)
}
case Event(response@GetMerkleResponse(txid, _, height, _), data) =>
data.blockchain.getHeader(height).orElse(params.walletDb.getHeader(height)) match {
case Some(header) if header.hashMerkleRoot == response.root =>
log.info(s"transaction $txid has been verified")
data.transactions.get(txid).orElse(data.pendingTransactions.find(_.txid == txid)) match {
case Some(tx) =>
log.info(s"saving ${tx.txid} to our db")
walletDb.addTransaction(tx, response)
case None => log.warning(s"we received a Merkle proof for transaction $txid that we don't have")
}
stay()
case Some(header) =>
log.error(s"server sent an invalid proof for $txid, disconnecting")
sender ! PoisonPill
stay() using data.copy(transactions = data.transactions - txid)
case None =>
// this is probably because the tx is old and within our checkpoints => request the whole header chunk
val start = (height / RETARGETING_PERIOD) * RETARGETING_PERIOD
client ! GetHeaders(start, RETARGETING_PERIOD)
stay()
}
case Event(CompleteTransaction(tx, feeRatePerKw), data) =>
Try(data.completeTransaction(tx, feeRatePerKw, minimumFee, dustLimit, allowSpendUnconfirmed)) match {
case Success((data1, tx1, fee1)) => stay using data1 replying CompleteTransactionResponse(tx1, fee1, None)
@ -250,14 +350,21 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
log.info(s"broadcasting txid=${tx.txid}")
client forward bc
stay
case Event(ElectrumClient.ElectrumDisconnected, data) =>
log.info(s"wallet got disconnected")
goto(DISCONNECTED) using data
}
whenUnhandled {
case Event(ElectrumClient.ElectrumDisconnected, data) =>
log.info(s"wallet got disconnected")
goto(DISCONNECTED) using data.copy(
pendingHistoryRequests = Set(),
pendingTransactionRequests = Set(),
pendingTransactions = Seq(),
status = Map(),
heights = Map(),
history = Map()
)
case Event(GetCurrentReceiveAddress, data) => stay replying GetCurrentReceiveAddressResponse(data.currentReceiveAddress)
case Event(GetBalance, data) =>
@ -279,18 +386,15 @@ class ElectrumWallet(seed: BinaryData, client: ActorRef, params: ElectrumWallet.
}
object ElectrumWallet {
// use 32 bytes seed, which will generate a 24 words mnemonic code
val SEED_BYTES_LENGTH = 32
def props(seed: BinaryData, client: ActorRef, params: WalletParameters): Props = Props(new ElectrumWallet(seed, client, params))
case class WalletParameters(chainHash: BinaryData, minimumFee: Satoshi = Satoshi(2000), dustLimit: Satoshi = Satoshi(546), swipeRange: Int = 10, allowSpendUnconfirmed: Boolean = true)
case class WalletParameters(chainHash: BinaryData, walletDb: WalletDb, minimumFee: Satoshi = Satoshi(2000), dustLimit: Satoshi = Satoshi(546), swipeRange: Int = 10, allowSpendUnconfirmed: Boolean = true)
// @formatter:off
sealed trait State
case object DISCONNECTED extends State
case object WAITING_FOR_TIP extends State
case object SYNCING extends State
case object RUNNING extends State
sealed trait Request
@ -373,7 +477,7 @@ object ElectrumWallet {
/**
*
* @param key public key
* @return the hash of the public key script for this key, as used by ElectrumX's hash-based methods
* @return the hash of the public key script for this key, as used by Electrum's hash-based methods
*/
def computeScriptHashFromPublicKey(key: PublicKey): BinaryData = Crypto.sha256(Script.write(computePublicKeyScript(key))).reverse
@ -453,14 +557,14 @@ object ElectrumWallet {
}
/**
* Wallet state, which stores data returned by ElectrumX servers.
* Wallet state, which stores data returned by Electrum servers.
* Most items are indexed by script hash (i.e. by pubkey script sha256 hash).
* Height follow ElectrumX's conventions:
* Height follows Electrum's conventions:
* - h > 0 means that the tx was confirmed at block #h
* - 0 means unconfirmed, but all input are confirmed
* < 0 means unconfirmed, and some inputs are unconfirmed as well
*
* @param tip current blockchain tip
* @param blockchain blockchain
* @param accountKeys account keys
* @param changeKeys change keys
* @param status script hash -> status; "" means that the script hash has not been used
@ -473,8 +577,7 @@ object ElectrumWallet {
* @param pendingTransactionRequests requests pending a response from the electrum server
* @param pendingTransactions transactions received but not yet connected to their parents
*/
case class Data(chainHash: BinaryData,
tip: ElectrumClient.Header,
case class Data(blockchain: Blockchain,
accountKeys: Vector[ExtendedPrivateKey],
changeKeys: Vector[ExtendedPrivateKey],
status: Map[BinaryData, String],
@ -486,6 +589,8 @@ object ElectrumWallet {
pendingTransactionRequests: Set[BinaryData],
pendingTransactions: Seq[Transaction],
lastReadyMessage: Option[WalletReady]) extends Logging {
val chainHash = blockchain.chainHash
lazy val accountKeyMap = accountKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap
lazy val changeKeyMap = changeKeys.map(key => computeScriptHashFromPublicKey(key.publicKey) -> key).toMap
@ -507,7 +612,7 @@ object ElectrumWallet {
def readyMessage: WalletReady = {
val (confirmed, unconfirmed) = balance
WalletReady(confirmed, unconfirmed, tip.block_height, tip.timestamp)
WalletReady(confirmed, unconfirmed, blockchain.tip.height, blockchain.tip.header.time)
}
/**
@ -556,7 +661,7 @@ object ElectrumWallet {
def isMine(txOut: TxOut): Boolean = publicScriptMap.contains(txOut.publicKeyScript)
def computeTransactionDepth(txid: BinaryData): Long = heights.get(txid).map(height => if (height > 0) computeDepth(tip.block_height, height) else 0).getOrElse(0)
def computeTransactionDepth(txid: BinaryData): Long = heights.get(txid).map(height => if (height > 0) computeDepth(blockchain.tip.height, height) else 0).getOrElse(0)
/**
*
@ -824,8 +929,8 @@ object ElectrumWallet {
}
object Data {
def apply(params: ElectrumWallet.WalletParameters, tip: ElectrumClient.Header, accountKeys: Vector[ExtendedPrivateKey], changeKeys: Vector[ExtendedPrivateKey]): Data
= Data(params.chainHash, tip, accountKeys, changeKeys, Map(), Map(), Map(), Map(), Set(), Set(), Set(), Seq(), None)
def apply(params: ElectrumWallet.WalletParameters, blockchain: Blockchain, accountKeys: Vector[ExtendedPrivateKey], changeKeys: Vector[ExtendedPrivateKey]): Data
= Data(blockchain, accountKeys, changeKeys, Map(), Map(), Map(), Map(), Set(), Set(), Set(), Seq(), None)
}
case class InfiniteLoopException(data: Data, tx: Transaction) extends Exception

View file

@ -20,7 +20,7 @@ import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Stash, Terminated}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{BinaryData, Satoshi, Script, Transaction, TxIn, TxOut}
import fr.acinq.bitcoin.{BinaryData, BlockHeader, Satoshi, Script, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.electrum.ElectrumClient._
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_FUNDING_SPENT, BITCOIN_PARENT_TX_CONFIRMED}
@ -52,29 +52,29 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
def receive = disconnected(Set.empty, Nil, SortedMap.empty)
def disconnected(watches: Set[Watch], publishQueue: Seq[PublishAsap], block2tx: SortedMap[Long, Seq[Transaction]]): Receive = {
case ElectrumClient.ElectrumReady(_, _) =>
case ElectrumClient.ElectrumReady(_, _, _) =>
client ! ElectrumClient.HeaderSubscription(self)
case ElectrumClient.HeaderSubscriptionResponse(header) =>
case ElectrumClient.HeaderSubscriptionResponse(height, header) =>
watches.map(self ! _)
publishQueue.map(self ! _)
context become running(header, Set(), Map(), block2tx, Nil)
context become running(height, header, Set(), Map(), block2tx, Nil)
case watch: Watch => context become disconnected(watches + watch, publishQueue, block2tx)
case publish: PublishAsap => context become disconnected(watches, publishQueue :+ publish, block2tx)
}
def running(tip: ElectrumClient.Header, watches: Set[Watch], scriptHashStatus: Map[BinaryData, String], block2tx: SortedMap[Long, Seq[Transaction]], sent: Seq[Transaction]): Receive = {
case ElectrumClient.HeaderSubscriptionResponse(newtip) if tip == newtip => ()
def running(height: Int, tip: BlockHeader, watches: Set[Watch], scriptHashStatus: Map[BinaryData, String], block2tx: SortedMap[Long, Seq[Transaction]], sent: Seq[Transaction]): Receive = {
case ElectrumClient.HeaderSubscriptionResponse(newheight, newtip) if tip == newtip => ()
case ElectrumClient.HeaderSubscriptionResponse(newtip) =>
log.info(s"new tip: ${newtip.block_hash} $newtip")
case ElectrumClient.HeaderSubscriptionResponse(newheight, newtip) =>
log.info(s"new tip: ${newtip.blockId} $height")
watches collect {
case watch: WatchConfirmed =>
val scriptHash = computeScriptHash(watch.publicKeyScript)
client ! ElectrumClient.GetScriptHashHistory(scriptHash)
}
val toPublish = block2tx.filterKeys(_ <= newtip.block_height)
val toPublish = block2tx.filterKeys(_ <= newheight)
toPublish.values.flatten.foreach(tx => self ! PublishAsap(tx))
context become running(newtip, watches, scriptHashStatus, block2tx -- toPublish.keys, sent)
context become running(newheight, newtip, watches, scriptHashStatus, block2tx -- toPublish.keys, sent)
case watch: Watch if watches.contains(watch) => ()
@ -83,25 +83,25 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
log.info(s"added watch-spent on output=$txid:$outputIndex scriptHash=$scriptHash")
client ! ElectrumClient.ScriptHashSubscription(scriptHash, self)
context.watch(watch.channel)
context become running(tip, watches + watch, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches + watch, scriptHashStatus, block2tx, sent)
case watch@WatchSpentBasic(_, txid, outputIndex, publicKeyScript, _) =>
val scriptHash = computeScriptHash(publicKeyScript)
log.info(s"added watch-spent-basic on output=$txid:$outputIndex scriptHash=$scriptHash")
client ! ElectrumClient.ScriptHashSubscription(scriptHash, self)
context.watch(watch.channel)
context become running(tip, watches + watch, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches + watch, scriptHashStatus, block2tx, sent)
case watch@WatchConfirmed(_, txid, publicKeyScript, _, _) =>
val scriptHash = computeScriptHash(publicKeyScript)
log.info(s"added watch-confirmed on txid=$txid scriptHash=$scriptHash")
client ! ElectrumClient.GetScriptHashHistory(scriptHash)
context.watch(watch.channel)
context become running(tip, watches + watch, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches + watch, scriptHashStatus, block2tx, sent)
case Terminated(actor) =>
val watches1 = watches.filterNot(_.channel == actor)
context become running(tip, watches1, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches1, scriptHashStatus, block2tx, sent)
case ElectrumClient.ScriptHashSubscriptionResponse(scriptHash, status) =>
scriptHashStatus.get(scriptHash) match {
@ -111,33 +111,33 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
log.info(s"new status=$status for scriptHash=$scriptHash")
client ! ElectrumClient.GetScriptHashHistory(scriptHash)
}
context become running(tip, watches, scriptHashStatus + (scriptHash -> status), block2tx, sent)
context become running(height, tip, watches, scriptHashStatus + (scriptHash -> status), block2tx, sent)
case ElectrumClient.GetScriptHashHistoryResponse(_, history) =>
// this is for WatchSpent/WatchSpentBasic
history.filter(_.height >= 0).map(item => client ! ElectrumClient.GetTransaction(item.tx_hash))
// this is for WatchConfirmed
history.collect {
case ElectrumClient.TransactionHistoryItem(height, tx_hash) if height > 0 => watches.collect {
case ElectrumClient.TransactionHistoryItem(txheight, tx_hash) if txheight > 0 => watches.collect {
case WatchConfirmed(_, txid, _, minDepth, _) if txid == tx_hash =>
val confirmations = tip.block_height - height + 1
log.info(s"txid=$txid was confirmed at height=$height and now has confirmations=$confirmations (currentHeight=${tip.block_height})")
val confirmations = height - txheight + 1
log.info(s"txid=$txid was confirmed at height=$txheight and now has confirmations=$confirmations (currentHeight=${height})")
if (confirmations >= minDepth) {
// we need to get the tx position in the block
client ! GetMerkle(tx_hash, height)
client ! GetMerkle(tx_hash, txheight)
}
}
}
case ElectrumClient.GetMerkleResponse(tx_hash, _, height, pos) =>
val confirmations = tip.block_height - height + 1
case ElectrumClient.GetMerkleResponse(tx_hash, _, txheight, pos) =>
val confirmations = height - txheight + 1
val triggered = watches.collect {
case w@WatchConfirmed(channel, txid, _, minDepth, event) if txid == tx_hash && confirmations >= minDepth =>
log.info(s"txid=$txid had confirmations=$confirmations in block=$height pos=$pos")
channel ! WatchEventConfirmed(event, height.toInt, pos)
log.info(s"txid=$txid had confirmations=$confirmations in block=$txheight pos=$pos")
channel ! WatchEventConfirmed(event, txheight.toInt, pos)
w
}
context become running(tip, watches -- triggered, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches -- triggered, scriptHashStatus, block2tx, sent)
case ElectrumClient.GetTransactionResponse(spendingTx) =>
val triggered = spendingTx.txIn.map(_.outPoint).flatMap(outPoint => watches.collect {
@ -152,7 +152,7 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
channel ! WatchEventSpentBasic(event)
Some(w)
}).flatten
context become running(tip, watches -- triggered, scriptHashStatus, block2tx, sent)
context become running(height, tip, watches -- triggered, scriptHashStatus, block2tx, sent)
case PublishAsap(tx) =>
val blockCount = Globals.blockCount.get()
@ -167,11 +167,11 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
} else if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
context become running(tip, watches, scriptHashStatus, block2tx1, sent)
context become running(height, tip, watches, scriptHashStatus, block2tx1, sent)
} else {
log.info(s"publishing tx=$tx")
client ! BroadcastTransaction(tx)
context become running(tip, watches, scriptHashStatus, block2tx, sent :+ tx)
context become running(height, tip, watches, scriptHashStatus, block2tx, sent :+ tx)
}
case WatchEventConfirmed(BITCOIN_PARENT_TX_CONFIRMED(tx), blockHeight, _) =>
@ -182,11 +182,11 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
if (absTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(absTimeout, block2tx.getOrElse(absTimeout, Seq.empty[Transaction]) :+ tx)
context become running(tip, watches, scriptHashStatus, block2tx1, sent)
context become running(height, tip, watches, scriptHashStatus, block2tx1, sent)
} else {
log.info(s"publishing tx=$tx")
client ! BroadcastTransaction(tx)
context become running(tip, watches, scriptHashStatus, block2tx, sent :+ tx)
context become running(height, tip, watches, scriptHashStatus, block2tx, sent :+ tx)
}
case ElectrumClient.BroadcastTransactionResponse(tx, error_opt) =>
@ -195,7 +195,7 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
case Some(error) if error.message.contains("transaction already in block chain") => log.info(s"broadcast ignored for txid=${tx.txid} tx=$tx (tx was already in blockchain)")
case Some(error) => log.error(s"broadcast failed for txid=${tx.txid} tx=$tx with error=$error")
}
context become running(tip, watches, scriptHashStatus, block2tx, sent diff Seq(tx))
context become running(height, tip, watches, scriptHashStatus, block2tx, sent diff Seq(tx))
case ElectrumClient.ElectrumDisconnected =>
// we remember watches and keep track of tx that have not yet been published
@ -220,7 +220,7 @@ object ElectrumWatcher extends App {
}
def receive = {
case ElectrumClient.ElectrumReady(_, _) =>
case ElectrumClient.ElectrumReady(_, _, _) =>
log.info(s"starting watcher")
context become running(context.actorOf(Props(new ElectrumWatcher(client)), "watcher"))
}

View file

@ -0,0 +1,29 @@
package fr.acinq.eclair.blockchain.electrum.db
import fr.acinq.bitcoin.{BinaryData, BlockHeader, Transaction}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.GetMerkleResponse
trait HeaderDb {
def addHeader(height: Int, header: BlockHeader): Unit
def addHeaders(startHeight: Int, headers: Seq[BlockHeader]): Unit
def getHeader(height: Int): Option[BlockHeader]
// used only in unit tests
def getHeader(blockHash: BinaryData): Option[(Int, BlockHeader)]
def getHeaders(startHeight: Int, maxCount: Option[Int]): Seq[BlockHeader]
def getTip: Option[(Int, BlockHeader)]
}
trait TransactionDb {
def addTransaction(tx: Transaction, proof: GetMerkleResponse): Unit
def getTransaction(txid: BinaryData): Option[(Transaction, GetMerkleResponse)]
def getTransactions(): Seq[(Transaction, GetMerkleResponse)]
}
trait WalletDb extends HeaderDb with TransactionDb

View file

@ -0,0 +1,142 @@
package fr.acinq.eclair.blockchain.electrum.db.sqlite
import java.sql.Connection
import fr.acinq.bitcoin.{BinaryData, BlockHeader, Transaction}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.GetMerkleResponse
import fr.acinq.eclair.blockchain.electrum.db.WalletDb
import fr.acinq.eclair.db.sqlite.SqliteUtils
import scala.collection.immutable.Queue
class SqliteWalletDb(sqlite: Connection) extends WalletDb {
import SqliteUtils._
using(sqlite.createStatement()) { statement =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS headers (height INTEGER NOT NULL PRIMARY KEY, block_hash BLOB NOT NULL, header BLOB NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS transactions (tx_hash BLOB PRIMARY KEY, tx BLOB NOT NULL, proof BLOB NOT NULL)")
}
override def addHeader(height: Int, header: BlockHeader): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO headers VALUES (?, ?, ?)")) { statement =>
statement.setInt(1, height)
statement.setBytes(2, header.hash)
statement.setBytes(3, BlockHeader.write(header))
statement.executeUpdate()
}
}
override def addHeaders(startHeight: Int, headers: Seq[BlockHeader]): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO headers VALUES (?, ?, ?)"), disableAutoCommit = true) { statement =>
var height = startHeight
headers.foreach(header => {
statement.setInt(1, height)
statement.setBytes(2, header.hash)
statement.setBytes(3, BlockHeader.write(header))
statement.addBatch()
height = height + 1
})
val result = statement.executeBatch()
}
}
override def getHeader(height: Int): Option[BlockHeader] = {
using(sqlite.prepareStatement("SELECT header FROM headers WHERE height = ?")) { statement =>
statement.setInt(1, height)
val rs = statement.executeQuery()
if (rs.next()) {
Some(BlockHeader.read(rs.getBytes("header")))
} else {
None
}
}
}
override def getHeader(blockHash: BinaryData): Option[(Int, BlockHeader)] = {
using(sqlite.prepareStatement("SELECT height, header FROM headers WHERE block_hash = ?")) { statement =>
statement.setBytes(1, blockHash)
val rs = statement.executeQuery()
if (rs.next()) {
Some((rs.getInt("height"), BlockHeader.read(rs.getBytes("header"))))
} else {
None
}
}
}
override def getHeaders(startHeight: Int, maxCount: Option[Int]): Seq[BlockHeader] = {
val query = "SELECT height, header FROM headers WHERE height >= ? ORDER BY height " + maxCount.map(m => s" LIMIT $m").getOrElse("")
using(sqlite.prepareStatement(query)) { statement =>
statement.setInt(1, startHeight)
val rs = statement.executeQuery()
var q: Queue[BlockHeader] = Queue()
while (rs.next()) {
q = q :+ BlockHeader.read(rs.getBytes("header"))
}
q
}
}
override def getTip: Option[(Int, BlockHeader)] = {
using(sqlite.prepareStatement("SELECT t.height, t.header FROM headers t INNER JOIN (SELECT MAX(height) AS maxHeight FROM headers) q ON t.height = q.maxHeight")) { statement =>
val rs = statement.executeQuery()
if (rs.next()) {
Some((rs.getInt("height"), BlockHeader.read(rs.getBytes("header"))))
} else {
None
}
}
}
override def addTransaction(tx: Transaction, proof: ElectrumClient.GetMerkleResponse): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO transactions VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, tx.hash)
statement.setBytes(2, Transaction.write(tx))
statement.setBytes(3, SqliteWalletDb.serialize(proof))
statement.executeUpdate()
}
}
override def getTransaction(tx_hash: BinaryData): Option[(Transaction, ElectrumClient.GetMerkleResponse)] = {
using(sqlite.prepareStatement("SELECT tx, proof FROM transactions WHERE tx_hash = ?")) { statement =>
statement.setBytes(1, tx_hash)
val rs = statement.executeQuery()
if (rs.next()) {
Some((Transaction.read(rs.getBytes("tx")), SqliteWalletDb.deserialize((rs.getBytes("proof")))))
} else {
None
}
}
}
override def getTransactions(): Seq[(Transaction, ElectrumClient.GetMerkleResponse)] = {
using(sqlite.prepareStatement("SELECT tx, proof FROM transactions")) { statement =>
val rs = statement.executeQuery()
var q: Queue[(Transaction, ElectrumClient.GetMerkleResponse)] = Queue()
while (rs.next()) {
q = q :+ (Transaction.read(rs.getBytes("tx")), SqliteWalletDb.deserialize(rs.getBytes("proof")))
}
q
}
}
}
object SqliteWalletDb {
import fr.acinq.eclair.wire.LightningMessageCodecs.binarydata
import scodec.Codec
import scodec.bits.BitVector
import scodec.codecs._
val proofCodec: Codec[GetMerkleResponse] = (
("txid" | binarydata(32)) ::
("merkle" | listOfN(uint16, binarydata(32))) ::
("block_height" | uint24) ::
("pos" | uint24)).as[GetMerkleResponse]
def serialize(proof: GetMerkleResponse) : BinaryData = proofCodec.encode(proof).require.toByteArray
def deserialize(bin: BinaryData) : GetMerkleResponse = proofCodec.decode(BitVector(bin.toArray)).require.value
}

View file

@ -626,6 +626,11 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
log.info(s"updating channel_update aboveReserve=${Helpers.aboveReserve(commitments1)}")
self ! TickRefreshChannelUpdate
}
if (!Helpers.aboveReserve(d.commitments) && Helpers.aboveReserve(commitments1)) {
// we just went above reserve (can't go below), let's refresh our channel_update to enable/disable it accordingly
log.info(s"updating channel_update aboveReserve=${Helpers.aboveReserve(commitments1)}")
self ! TickRefreshChannelUpdate
}
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortChannelId, nextRemoteCommit.spec.toRemoteMsat)) // note that remoteCommit.toRemote == toLocal
// we expect a quick response from our peer

View file

@ -16,7 +16,7 @@
package fr.acinq.eclair.db.sqlite
import java.sql.{PreparedStatement, ResultSet, Statement}
import java.sql.{Connection, PreparedStatement, ResultSet, Statement}
import scodec.Codec
import scodec.bits.BitVector
@ -31,10 +31,12 @@ object SqliteUtils {
* @param statement
* @param block
*/
def using[T <: Statement, U](statement: T)(block: T => U): U = {
def using[T <: Statement, U](statement: T, disableAutoCommit: Boolean = false)(block: T => U): U = {
try {
if (disableAutoCommit) statement.getConnection.setAutoCommit(false)
block(statement)
} finally {
if (disableAutoCommit) statement.getConnection.setAutoCommit(true)
if (statement != null) statement.close()
}
}
@ -102,4 +104,20 @@ object SqliteUtils {
val result = rs.getLong(label)
if (rs.wasNull()) None else Some(result)
}
/**
* Obtain an exclusive lock on a sqlite database. This is useful when we want to make sure that only one process
* accesses the database file (see https://www.sqlite.org/pragma.html).
*
* The lock will be kept until the database is closed, or if the locking mode is explicitely reset.
*
* @param sqlite
*/
def obtainExclusiveLock(sqlite: Connection){
val statement = sqlite.createStatement()
statement.execute("PRAGMA locking_mode = EXCLUSIVE")
// we have to make a write to actually obtain the lock
statement.executeUpdate("CREATE TABLE IF NOT EXISTS dummy_table_for_locking (a INTEGER NOT NULL)")
statement.executeUpdate("INSERT INTO dummy_table_for_locking VALUES (42)")
}
}

View file

@ -19,9 +19,18 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy, Terminated}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel._
import fr.acinq.eclair.payment.Relayer.RelayPayload
import fr.acinq.eclair.payment.{Relayed, Relayer}
import fr.acinq.eclair.router.Rebroadcast
import fr.acinq.eclair.transactions.{IN, OUT}
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import grizzled.slf4j.Logging
import scala.util.Success
import fr.acinq.eclair.channel.HasCommitments
/**
@ -30,13 +39,24 @@ import fr.acinq.eclair.channel.HasCommitments
*/
class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) extends Actor with ActorLogging {
import Switchboard._
authenticator ! self
// we load peers and channels from database
private val initialPeers = {
val channels = nodeParams.channelsDb.listChannels().groupBy(_.commitments.remoteParams.nodeId)
val channels = nodeParams.channelsDb.listChannels()
val peers = nodeParams.peersDb.listPeers()
checkBrokenHtlcsLink(channels, nodeParams.privateKey) match {
case Nil => ()
case brokenHtlcs =>
val brokenHtlcKiller = context.actorOf(Props[HtlcReaper], name = "htlc-reaper")
brokenHtlcKiller ! brokenHtlcs
}
channels
.groupBy(_.commitments.remoteParams.nodeId)
.map {
case (remoteNodeId, states) => (remoteNodeId, states, peers.get(remoteNodeId))
}
@ -111,8 +131,73 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = true) { case _ => SupervisorStrategy.Resume }
}
object Switchboard {
object Switchboard extends Logging {
def props(nodeParams: NodeParams, authenticator: ActorRef, watcher: ActorRef, router: ActorRef, relayer: ActorRef, wallet: EclairWallet) = Props(new Switchboard(nodeParams, authenticator, watcher, router, relayer, wallet))
/**
* If we have stopped eclair while it was forwarding HTLCs, it is possible that we are in a state were an incoming HTLC
* was committed by both sides, but we didn't have time to send and/or sign the corresponding HTLC to the downstream node.
*
* In that case, if we do nothing, the incoming HTLC will eventually expire and we won't lose money, but the channel will
* get closed, which is a major inconvenience.
*
* This check will detect this and will allow us to fast-fail HTLCs and thus preserve channels.
*
* @param channels
* @return
*/
def checkBrokenHtlcsLink(channels: Seq[HasCommitments], privateKey: PrivateKey): Seq[UpdateAddHtlc] = {
// We are interested in incoming HTLCs, that have been *cross-signed*. They signed it first, so the HTLC will first
// appear in our commitment tx, and later on in their commitment when we subsequently sign it.
// That's why we need to look in *their* commitment with direction=OUT.
val htlcs_in = channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.filter(_.direction == OUT)
.map(_.add)
.map(Relayer.tryParsePacket(_, privateKey))
.collect { case Success(RelayPayload(add, _, _)) => add } // we only consider htlcs that are relayed, not the ones for which we are the final node
// Here we do it differently because we need the origin information.
val relayed_out = channels
.flatMap(_.commitments.originChannels.values)
.collect { case r: Relayed => r }
.toSet
val htlcs_broken = htlcs_in.filterNot(htlc_in => relayed_out.exists(r => r.originChannelId == htlc_in.channelId && r.originHtlcId == htlc_in.id))
logger.info(s"htlcs_in=${htlcs_in.size} htlcs_out=${relayed_out.size} htlcs_broken=${htlcs_broken.size}")
htlcs_broken
}
}
class HtlcReaper extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
override def receive: Receive = {
case initialHtlcs: Seq[UpdateAddHtlc]@unchecked => context become main(initialHtlcs)
}
def main(htlcs: Seq[UpdateAddHtlc]): Receive = {
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, data: HasCommitments) =>
val acked = htlcs
.filter(_.channelId == data.channelId) // only consider htlcs related to this channel
.filter {
case htlc if Commitments.getHtlcCrossSigned(data.commitments, IN, htlc.id).isDefined =>
// this htlc is cross signed in the current commitment, we can fail it
log.info(s"failing broken htlc=$htlc")
channel ! CMD_FAIL_HTLC(htlc.id, Right(TemporaryNodeFailure), commit = true)
false // the channel may very well be disconnected before we sign (=ack) the fail, so we keep it for now
case _ =>
true // the htlc has already been failed, we can forget about it now
}
acked.foreach(htlc => log.info(s"forgetting htlc id=${htlc.id} channelId=${htlc.channelId}"))
context become main(htlcs diff acked)
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.blockchain.electrum
import fr.acinq.bitcoin.Block
import org.scalatest.FunSuite
class CheckPointSpec extends FunSuite {
test("load checkpoint") {
val checkpoints = CheckPoint.load(Block.LivenetGenesisBlock.hash)
assert(!checkpoints.isEmpty)
}
}

View file

@ -62,19 +62,19 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
}
test("get merkle tree") {
probe.send(pool, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223L))
probe.send(pool, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223))
val response = probe.expectMsgType[GetMerkleResponse]
assert(response.txid == BinaryData("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
assert(response.block_height == 1210223L)
assert(response.block_height == 1210223)
assert(response.pos == 28)
assert(response.root == BinaryData("fb0234a21e96913682bc4108bcf72b67fb5d2dd680875b7e4671c03ccf523a20"))
assert(response.root == BinaryData("203a52cf3cc071467e5b8780d62d5dfb672bf7bc0841bc823691961ea23402fb"))
}
test("header subscription") {
val probe1 = TestProbe()
probe1.send(pool, HeaderSubscription(probe1.ref))
val HeaderSubscriptionResponse(header) = probe1.expectMsgType[HeaderSubscriptionResponse]
logger.info(s"received header for block ${header.block_hash}")
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse]
logger.info(s"received header for block ${header.blockId}")
}
test("scripthash subscription") {

View file

@ -58,24 +58,32 @@ class ElectrumClientSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
test("get header") {
probe.send(client, GetHeader(10000))
val GetHeaderResponse(header) = probe.expectMsgType[GetHeaderResponse]
assert(header.block_hash == BinaryData("000000000058b74204bb9d59128e7975b683ac73910660b6531e59523fb4a102"))
val GetHeaderResponse(height, header) = probe.expectMsgType[GetHeaderResponse]
assert(header.blockId == BinaryData("000000000058b74204bb9d59128e7975b683ac73910660b6531e59523fb4a102"))
}
test("get headers") {
val start = (500000 / 2016) * 2016
probe.send(client, GetHeaders(start, 2016))
val GetHeadersResponse(start1, headers, _) = probe.expectMsgType[GetHeadersResponse]
assert(start1 == start)
assert(headers.size == 2016)
}
test("get merkle tree") {
probe.send(client, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223L))
probe.send(client, GetMerkle("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202", 1210223))
val response = probe.expectMsgType[GetMerkleResponse]
assert(response.txid == BinaryData("c5efb5cbd35a44ba956b18100be0a91c9c33af4c7f31be20e33741d95f04e202"))
assert(response.block_height == 1210223L)
assert(response.block_height == 1210223)
assert(response.pos == 28)
assert(response.root == BinaryData("fb0234a21e96913682bc4108bcf72b67fb5d2dd680875b7e4671c03ccf523a20"))
assert(response.root == BinaryData("203a52cf3cc071467e5b8780d62d5dfb672bf7bc0841bc823691961ea23402fb"))
}
test("header subscription") {
val probe1 = TestProbe()
probe1.send(client, HeaderSubscription(probe1.ref))
val HeaderSubscriptionResponse(header) = probe1.expectMsgType[HeaderSubscriptionResponse]
logger.info(s"received header for block ${header.block_hash}")
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse]
logger.info(s"received header for block ${header.blockId}")
}
test("scripthash subscription") {

View file

@ -16,9 +16,12 @@
package fr.acinq.eclair.blockchain.electrum
import java.sql.DriverManager
import fr.acinq.bitcoin.Crypto.PrivateKey
import fr.acinq.bitcoin.DeterministicWallet.{ExtendedPrivateKey, derivePrivateKey}
import fr.acinq.bitcoin._
import fr.acinq.eclair.blockchain.electrum.db.sqlite.SqliteWalletDb
import fr.acinq.eclair.transactions.Transactions
import grizzled.slf4j.Logging
import org.scalatest.FunSuite
@ -46,9 +49,9 @@ class ElectrumWalletBasicSpec extends FunSuite with Logging {
val firstAccountKeys = (0 until 10).map(i => derivePrivateKey(accountMaster, i)).toVector
val firstChangeKeys = (0 until 10).map(i => derivePrivateKey(changeMaster, i)).toVector
val params = ElectrumWallet.WalletParameters(Block.RegtestGenesisBlock.hash)
val params = ElectrumWallet.WalletParameters(Block.RegtestGenesisBlock.hash, new SqliteWalletDb(DriverManager.getConnection("jdbc:sqlite::memory:")))
val state = Data(params, ElectrumClient.Header.RegtestGenesisHeader, firstAccountKeys, firstChangeKeys)
val state = Data(params, Blockchain.fromCheckpoints(Block.RegtestGenesisBlock.hash, CheckPoint.load(Block.RegtestGenesisBlock.hash)), firstAccountKeys, firstChangeKeys)
.copy(status = (firstAccountKeys ++ firstChangeKeys).map(key => computeScriptHashFromPublicKey(key.publicKey) -> "").toMap)
def addFunds(data: Data, key: ExtendedPrivateKey, amount: Satoshi): Data = {

View file

@ -17,51 +17,81 @@
package fr.acinq.eclair.blockchain.electrum
import java.net.InetSocketAddress
import java.sql.DriverManager
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.{BinaryData, Block, MnemonicCode, Satoshi}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{ScriptHashSubscription, ScriptHashSubscriptionResponse}
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import akka.testkit
import akka.testkit.{TestActor, TestFSMRef, TestKit, TestProbe}
import fr.acinq.bitcoin.{BinaryData, Block, BlockHeader, MnemonicCode, Satoshi, Script, Transaction, TxOut}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient._
import fr.acinq.eclair.blockchain.electrum.ElectrumWallet._
import fr.acinq.eclair.blockchain.electrum.db.sqlite.SqliteWalletDb
import org.scalatest.FunSuiteLike
import scala.annotation.tailrec
import scala.concurrent.duration._
class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
val sender = TestProbe()
class SimulatedClient extends Actor {
def receive = {
case ScriptHashSubscription(scriptHash, replyTo) => replyTo ! ScriptHashSubscriptionResponse(scriptHash, "")
}
}
val entropy = BinaryData("01" * 32)
val mnemonics = MnemonicCode.toMnemonics(entropy)
val seed = MnemonicCode.toSeed(mnemonics, "")
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[WalletEvent])
val wallet = TestFSMRef(new ElectrumWallet(seed, system.actorOf(Props(new SimulatedClient())), WalletParameters(Block.RegtestGenesisBlock.hash, minimumFee = Satoshi(5000))))
val genesis = Block.RegtestGenesisBlock.header
// initial headers that we will sync when we connect to our mock server
val headers = makeHeaders(genesis, 2016 + 2000)
val client = TestProbe()
client.ignoreMsg {
case ElectrumClient.Ping => true
case _: AddStatusListener => true
case _: HeaderSubscription => true
}
client.setAutoPilot(new testkit.TestActor.AutoPilot {
override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match {
case ScriptHashSubscription(scriptHash, replyTo) =>
replyTo ! ScriptHashSubscriptionResponse(scriptHash, "")
TestActor.KeepRunning
case GetHeaders(start, count, _) =>
sender ! GetHeadersResponse(start, headers.drop(start - 1).take(count), 2016)
TestActor.KeepRunning
case _ => TestActor.KeepRunning
}
})
val wallet = TestFSMRef(new ElectrumWallet(seed, client.ref, WalletParameters(Block.RegtestGenesisBlock.hash, new SqliteWalletDb(DriverManager.getConnection("jdbc:sqlite::memory:")), minimumFee = Satoshi(5000))))
// wallet sends a receive address notification as soon as it is created
listener.expectMsgType[NewWalletReceiveAddress]
def makeHeader(previousHeader: BlockHeader, timestamp: Long): BlockHeader = {
var template = previousHeader.copy(hashPreviousBlock = previousHeader.hash, time = timestamp, nonce = 0)
while (!BlockHeader.checkProofOfWork(template)) {
template = template.copy(nonce = template.nonce + 1)
}
template
}
val genesis = ElectrumClient.Header(1, 1, Block.RegtestGenesisBlock.hash, BinaryData("01" * 32), timestamp = 12346L, bits = 0, nonce = 0)
val header1 = makeHeader(genesis, 12345L)
val header2 = makeHeader(header1, 12346L)
val header3 = makeHeader(header2, 12347L)
val header4 = makeHeader(header3, 12348L)
def makeHeader(previousHeader: BlockHeader): BlockHeader = makeHeader(previousHeader, previousHeader.time + 1)
def makeHeader(previousHeader: ElectrumClient.Header, timestamp: Long): ElectrumClient.Header = ElectrumClient.Header(previousHeader.block_height + 1, 1, previousHeader.block_hash, BinaryData("01" * 32), timestamp = timestamp, bits = 0, nonce = 0)
def makeHeaders(previousHeader: BlockHeader, count: Int): Vector[BlockHeader] = {
@tailrec
def loop(acc: Vector[BlockHeader]): Vector[BlockHeader] = if (acc.length == count) acc else loop(acc :+ makeHeader(acc.last))
loop(Vector(makeHeader(previousHeader)))
}
test("wait until wallet is ready") {
sender.send(wallet, ElectrumClient.ElectrumReady(header1, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header1))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
assert(listener.expectMsgType[WalletReady].timestamp == header1.timestamp)
sender.send(wallet, ElectrumClient.ElectrumReady(2016, headers(2015), InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(2016, headers(2015)))
val ready = listener.expectMsgType[WalletReady]
assert(ready.timestamp == headers.last.time)
listener.expectMsgType[NewWalletReceiveAddress]
listener.send(wallet, GetXpub)
val GetXpubResponse(xpub, path) = listener.expectMsgType[GetXpubResponse]
@ -70,8 +100,10 @@ class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) wit
}
test("tell wallet is ready when a new block comes in, even if nothing else has changed") {
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header2))
assert(listener.expectMsgType[WalletReady].timestamp == header2.timestamp)
val last = wallet.stateData.blockchain.tip
val header = makeHeader(last.header)
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height + 1, header))
assert(listener.expectMsgType[WalletReady].timestamp == header.time)
val NewWalletReceiveAddress(address) = listener.expectMsgType[NewWalletReceiveAddress]
assert(address == "2NDjBqJugL3gCtjWTToDgaWWogq9nYuYw31")
}
@ -82,22 +114,119 @@ class ElectrumWalletSimulatedClientSpec extends TestKit(ActorSystem("test")) wit
awaitCond(wallet.stateName == ElectrumWallet.DISCONNECTED)
// reconnect wallet
sender.send(wallet, ElectrumClient.ElectrumReady(header3, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header3))
val last = wallet.stateData.blockchain.tip
sender.send(wallet, ElectrumClient.ElectrumReady(2016, headers(2015), InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height, last.header))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
// listener should be notified
assert(listener.expectMsgType[WalletReady].timestamp == header3.timestamp)
assert(listener.expectMsgType[WalletReady].timestamp == last.header.time)
listener.expectMsgType[NewWalletReceiveAddress]
}
test("don't send the same ready message more then once") {
// listener should be notified
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header4))
assert(listener.expectMsgType[WalletReady].timestamp == header4.timestamp)
val last = wallet.stateData.blockchain.tip
val header = makeHeader(last.header)
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height + 1, header))
assert(listener.expectMsgType[WalletReady].timestamp == header.time)
listener.expectMsgType[NewWalletReceiveAddress]
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(header4))
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height + 1, header))
listener.expectNoMsg(500 milliseconds)
}
test("disconnect if server sends a bad header") {
val last = wallet.stateData.blockchain.bestchain.last
val bad = makeHeader(last.header, 42L).copy(bits = Long.MaxValue)
// here we simulate a bad client
val probe = TestProbe()
val watcher = TestProbe()
watcher.watch(probe.ref)
watcher.setAutoPilot(new TestActor.AutoPilot {
override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match {
case Terminated(actor) if actor == probe.ref =>
// if the client dies, we tell the wallet that it's been disconnected
wallet ! ElectrumClient.ElectrumDisconnected
TestActor.KeepRunning
}
})
probe.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height + 1, bad))
watcher.expectTerminated(probe.ref)
awaitCond(wallet.stateName == ElectrumWallet.DISCONNECTED)
sender.send(wallet, ElectrumClient.ElectrumReady(last.height, last.header, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
awaitCond(wallet.stateName == ElectrumWallet.WAITING_FOR_TIP)
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(last.height, last.header))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
}
test("disconnect if server sends an invalid transaction") {
while (client.msgAvailable) {
client.receiveOne(100 milliseconds)
}
val key = wallet.stateData.accountKeys(0)
val scriptHash = computeScriptHashFromPublicKey(key.publicKey)
wallet ! ScriptHashSubscriptionResponse(scriptHash, "01" * 32)
client.expectMsg(GetScriptHashHistory(scriptHash))
val tx = Transaction(version = 2, txIn = Nil, txOut = TxOut(Satoshi(100000), ElectrumWallet.computePublicKeyScript(key.publicKey)) :: Nil, lockTime = 0)
wallet ! GetScriptHashHistoryResponse(scriptHash, TransactionHistoryItem(2, tx.txid) :: Nil)
// wallet will generate a new address and the corresponding subscription
client.expectMsgType[ScriptHashSubscription]
while (listener.msgAvailable) {
listener.receiveOne(100 milliseconds)
}
client.expectMsg(GetTransaction(tx.txid))
wallet ! GetTransactionResponse(tx)
val TransactionReceived(_, _, Satoshi(100000), _, _) = listener.expectMsgType[TransactionReceived]
// we think we have some unconfirmed funds
val WalletReady(Satoshi(100000), _, _, _) = listener.expectMsgType[WalletReady]
client.expectMsg(GetMerkle(tx.txid, 2))
val probe = TestProbe()
val watcher = TestProbe()
watcher.watch(probe.ref)
watcher.setAutoPilot(new TestActor.AutoPilot {
override def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match {
case Terminated(actor) if actor == probe.ref =>
wallet ! ElectrumClient.ElectrumDisconnected
TestActor.KeepRunning
}
})
probe.send(wallet, GetMerkleResponse(tx.txid, BinaryData("01" * 32) :: Nil, 2, 0))
watcher.expectTerminated(probe.ref)
awaitCond(wallet.stateName == ElectrumWallet.DISCONNECTED)
sender.send(wallet, ElectrumClient.ElectrumReady(wallet.stateData.blockchain.bestchain.last.height, wallet.stateData.blockchain.bestchain.last.header, InetSocketAddress.createUnresolved("0.0.0.0", 9735)))
awaitCond(wallet.stateName == ElectrumWallet.WAITING_FOR_TIP)
while (listener.msgAvailable) {
listener.receiveOne(100 milliseconds)
}
sender.send(wallet, ElectrumClient.HeaderSubscriptionResponse(wallet.stateData.blockchain.bestchain.last.height, wallet.stateData.blockchain.bestchain.last.header))
awaitCond(wallet.stateName == ElectrumWallet.RUNNING)
val ready = listener.expectMsgType[WalletReady]
assert(ready.unconfirmedBalance == Satoshi(0))
}
test("disconnect if server sent a block with an invalid difficulty target") {
val last = wallet.stateData.blockchain.bestchain.last
val chunk = makeHeaders(last.header, 2015 - (last.height % 2016))
for (i <- 0 until chunk.length) {
wallet ! HeaderSubscriptionResponse(last.height + i + 1, chunk(i))
}
awaitCond(wallet.stateData.blockchain.tip.header == chunk.last)
val bad = {
var template = makeHeader(chunk.last)
template
}
wallet ! HeaderSubscriptionResponse(wallet.stateData.blockchain.tip.height + 1, bad)
}
}

View file

@ -18,6 +18,7 @@ package fr.acinq.eclair.blockchain.electrum
import java.net.InetSocketAddress
import java.sql.DriverManager
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
@ -27,6 +28,7 @@ import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet.{FundTransactionRes
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, BitcoindService}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{BroadcastTransaction, BroadcastTransactionResponse, SSL}
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
import fr.acinq.eclair.blockchain.electrum.db.sqlite.SqliteWalletDb
import grizzled.slf4j.Logging
import org.json4s.JsonAST.{JDecimal, JString, JValue}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@ -86,7 +88,7 @@ class ElectrumWalletSpec extends TestKit(ActorSystem("test")) with FunSuiteLike
test("wait until wallet is ready") {
electrumClient = system.actorOf(Props(new ElectrumClientPool(Set(ElectrumServerAddress(new InetSocketAddress("localhost", 50001), SSL.OFF)))))
wallet = system.actorOf(Props(new ElectrumWallet(seed, electrumClient, WalletParameters(Block.RegtestGenesisBlock.hash, minimumFee = Satoshi(5000)))), "wallet")
wallet = system.actorOf(Props(new ElectrumWallet(seed, electrumClient, WalletParameters(Block.RegtestGenesisBlock.hash, new SqliteWalletDb(DriverManager.getConnection("jdbc:sqlite::memory:")), minimumFee = Satoshi(5000)))), "wallet")
val probe = TestProbe()
awaitCond({
probe.send(wallet, GetData)

View file

@ -19,7 +19,7 @@ package fr.acinq.eclair.blockchain.electrum
import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
import com.whisk.docker.impl.spotify.SpotifyDockerFactory
import com.whisk.docker.scalatest.DockerTestKit
import com.whisk.docker.{DockerContainer, DockerFactory}
import com.whisk.docker.{DockerContainer, DockerFactory, LogLineReceiver}
import org.scalatest.Suite
trait ElectrumxService extends DockerTestKit {
@ -27,14 +27,15 @@ trait ElectrumxService extends DockerTestKit {
val electrumxContainer = if (System.getProperty("os.name").startsWith("Linux")) {
// "host" mode will let the container access the host network on linux
DockerContainer("lukechilds/electrumx")
// we use our own docker image because other images on Docker lag behind and don't yet support 1.4
DockerContainer("acinq/electrumx")
.withNetworkMode("host")
.withEnv("DAEMON_URL=http://foo:bar@localhost:28332", "COIN=BitcoinSegwit", "NET=regtest")
//.withLogLineReceiver(LogLineReceiver(true, println))
} else {
// on windows or oxs, host mode is not available, but from docker 18.03 on host.docker.internal can be used instead
// host.docker.internal is not (yet ?) available on linux though
DockerContainer("lukechilds/electrumx")
DockerContainer("acinq/electrumx")
.withPorts(50001 -> Some(50001))
.withEnv("DAEMON_URL=http://foo:bar@host.docker.internal:28332", "COIN=BitcoinSegwit", "NET=regtest", "TCP_PORT=50001")
//.withLogLineReceiver(LogLineReceiver(true, println))

View file

@ -0,0 +1,54 @@
package fr.acinq.eclair.blockchain.electrum.db.sqlite
import java.sql.DriverManager
import java.util.Random
import fr.acinq.bitcoin.{BinaryData, Block, BlockHeader, Transaction}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.GetMerkleResponse
import org.scalatest.FunSuite
class SqliteWalletDbSpec extends FunSuite {
val random = new Random()
def inmem = DriverManager.getConnection("jdbc:sqlite::memory:")
def makeChildHeader(header: BlockHeader): BlockHeader = header.copy(hashPreviousBlock = header.hash, nonce = random.nextLong() & 0xffffffffL)
def makeHeaders(n: Int, acc: Seq[BlockHeader] = Seq(Block.RegtestGenesisBlock.header)): Seq[BlockHeader] = {
if (acc.size == n) acc else makeHeaders(n, acc :+ makeChildHeader(acc.last))
}
test("add/get/list headers") {
val db = new SqliteWalletDb(inmem)
val headers = makeHeaders(100)
db.addHeaders(2016, headers)
val headers1 = db.getHeaders(2016, None)
assert(headers1 === headers)
val headers2 = db.getHeaders(2016, Some(50))
assert(headers2 === headers.take(50))
var height = 2016
headers.foreach(header => {
val Some((height1, header1)) = db.getHeader(header.hash)
assert(height1 == height)
assert(header1 == header)
val Some(header2) = db.getHeader(height1)
assert(header2 == header)
height = height + 1
})
}
test("add/get/list transactions") {
val db = new SqliteWalletDb(inmem)
val tx = Transaction.read("0100000001b021a77dcaad3a2da6f1611d2403e1298a902af8567c25d6e65073f6b52ef12d000000006a473044022056156e9f0ad7506621bc1eb963f5133d06d7259e27b13fcb2803f39c7787a81c022056325330585e4be39bcf63af8090a2deff265bc29a3fb9b4bf7a31426d9798150121022dfb538041f111bb16402aa83bd6a3771fa8aa0e5e9b0b549674857fafaf4fe0ffffffff0210270000000000001976a91415c23e7f4f919e9ff554ec585cb2a67df952397488ac3c9d1000000000001976a9148982824e057ccc8d4591982df71aa9220236a63888ac00000000")
val proof = GetMerkleResponse(tx.hash, List(BinaryData("01" * 32), BinaryData("02" * 32)), 100000, 15)
db.addTransaction(tx, proof)
val Some((tx1, proof1)) = db.getTransaction(tx.hash)
assert(tx1 == tx)
assert(proof1 == proof)
}
}

View file

@ -0,0 +1,76 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fr.acinq.eclair.io
import akka.actor.{ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.ChannelStateSpec
import fr.acinq.eclair.randomBytes
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import org.scalatest.FunSuiteLike
import scala.concurrent.duration._
/**
* Created by PM on 27/01/2017.
*/
class HtlcReaperSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
test("init and cleanup") {
val data = ChannelStateSpec.normal
// assuming that data has incoming htlcs 0 and 1, we don't care about the amount/payment_hash/onion fields
val add0 = UpdateAddHtlc(data.channelId, 0, 20000, randomBytes(32), 100, "")
val add1 = UpdateAddHtlc(data.channelId, 1, 30000, randomBytes(32), 100, "")
// unrelated htlc
val add99 = UpdateAddHtlc(randomBytes(32), 0, 12345678, randomBytes(32), 100, "")
val brokenHtlcs = Seq(add0, add1, add99)
val brokenHtlcKiller = system.actorOf(Props[HtlcReaper], name = "htlc-reaper")
brokenHtlcKiller ! brokenHtlcs
val sender = TestProbe()
val channel = TestProbe()
// channel goes to NORMAL state
sender.send(brokenHtlcKiller, ChannelStateChanged(channel.ref, system.deadLetters, data.commitments.remoteParams.nodeId, OFFLINE, NORMAL, data))
channel.expectMsg(CMD_FAIL_HTLC(add0.id, Right(TemporaryNodeFailure), commit = true))
channel.expectMsg(CMD_FAIL_HTLC(add1.id, Right(TemporaryNodeFailure), commit = true))
channel.expectNoMsg(100 millis)
// lets'assume that channel was disconnected before having signed the fails, and gets connected again:
sender.send(brokenHtlcKiller, ChannelStateChanged(channel.ref, system.deadLetters, data.commitments.remoteParams.nodeId, OFFLINE, NORMAL, data))
channel.expectMsg(CMD_FAIL_HTLC(add0.id, Right(TemporaryNodeFailure), commit = true))
channel.expectMsg(CMD_FAIL_HTLC(add1.id, Right(TemporaryNodeFailure), commit = true))
channel.expectNoMsg(100 millis)
// let's now assume that the channel get's reconnected, and it had the time to fail the htlcs
val data1 = data.copy(commitments = data.commitments.copy(localCommit = data.commitments.localCommit.copy(spec = data.commitments.localCommit.spec.copy(htlcs = Set.empty))))
sender.send(brokenHtlcKiller, ChannelStateChanged(channel.ref, system.deadLetters, data.commitments.remoteParams.nodeId, OFFLINE, NORMAL, data1))
channel.expectNoMsg(100 millis)
// reaper has cleaned up htlc, so next time it won't fail them anymore, even if we artificially submit the former state
sender.send(brokenHtlcKiller, ChannelStateChanged(channel.ref, system.deadLetters, data.commitments.remoteParams.nodeId, OFFLINE, NORMAL, data))
channel.expectNoMsg(100 millis)
}
}

View file

@ -66,7 +66,7 @@
<scala.version.short>2.11</scala.version.short>
<akka.version>2.3.14</akka.version>
<sttp.version>1.3.9</sttp.version>
<bitcoinlib.version>0.9.17</bitcoinlib.version>
<bitcoinlib.version>0.9.18</bitcoinlib.version>
<guava.version>24.0-android</guava.version>
</properties>