btcd/rpcserver.go
Dave Collins d44d253dff Make quit channel with RPC server.
The latest websockets code added a quit channel to the RPC server, but did
not initialize it.  This commit corrects that so shutdown works properly
again.
2013-10-20 11:06:36 -05:00

1356 lines
33 KiB
Go

// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"code.google.com/p/go.crypto/ripemd160"
"code.google.com/p/go.net/websocket"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
"github.com/conformal/btcjson"
"github.com/conformal/btcscript"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// Errors returned by the JSON-RPC and websocket request handlers in this
// file.
var (
// ErrBadParamsField describes an error where the parameters JSON
// field cannot be properly parsed.
ErrBadParamsField = errors.New("bad params field")
// ErrMethodNotImplemented describes an error where the RPC or
// websocket JSON method is not implemented.  jsonRead returns it for
// methods it does not know, and websocketJSONHandler compares against it
// to decide whether to try the websocket-only methods in jsonWSRead.
ErrMethodNotImplemented = errors.New("method not implemented")
)
// rpcServer holds the items the rpc server may need to access (config,
// shutdown, main server, etc.)
type rpcServer struct {
// started is incremented atomically by Start; only the call that moves
// it to 1 actually starts the server.
started int32
// shutdown is incremented atomically by Stop; jsonRPCRead drops
// requests once it is non-zero.
shutdown int32
// server is the main btcd server.  Handlers reach its db, txMemPool and
// btcnet fields through it, and the "stop" method calls its Stop.
server *server
// ws holds the state for websocket (wallet) connections.
ws wsContext
// wg tracks the listener goroutines launched by Start and the request
// handling done in websocketJSONHandler; Stop waits on it.
wg sync.WaitGroup
// rpcport, username and password are copied from cfg by newRPCServer.
rpcport string
username string
password string
// listeners are the sockets created by newRPCServer and served by Start.
listeners []net.Listener
// quit is closed by Stop to tell the websocket goroutines to exit.
quit chan int
}
// wsContext holds the items the RPC server needs to handle websocket
// connections for wallets.
type wsContext struct {
// requests holds all wallet notification requests.
requests wsRequests
// Channel used to add a wallet listener.  Received by the goroutine
// started in walletListenerDuplicator.
addWalletListener chan (chan []byte)
// Channel used to remove a wallet listener.  Received by the goroutine
// started in walletListenerDuplicator.
removeWalletListener chan (chan []byte)
// Any chain notifications meant to be received by every connected
// wallet are sent across this channel.
walletNotificationMaster chan []byte
}
// wsRequests maps each wallet notification channel to the request
// contexts registered for that wallet.  The embedded Mutex guards the map
// against concurrent access; the exported methods on this type acquire it
// themselves.
type wsRequests struct {
sync.Mutex
// m is keyed by the wallet's notification channel.
m map[chan []byte]*requestContexts
}
// getOrCreateContexts looks up the request contexts registered for the
// passed wallet notification channel.  When the wallet has none yet, an
// empty set of contexts is created, stored in the map and returned.
//
// This method does no locking of its own; the callers in this file hold
// the wsRequests mutex around it.
func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *requestContexts {
	if existing, found := r.m[walletNotification]; found {
		return existing
	}

	created := new(requestContexts)
	created.txRequests = make(map[addressHash]interface{})
	created.spentRequests = make(map[btcwire.OutPoint]interface{})
	r.m[walletNotification] = created
	return created
}
// AddTxRequest registers id as the JSON request id to use when notifying
// the wallet behind walletNotification about new transactions paying to
// addr.  A previous registration for the same address is replaced.
func (r *wsRequests) AddTxRequest(walletNotification chan []byte, addr addressHash, id interface{}) {
	r.Lock()
	defer r.Unlock()

	r.getOrCreateContexts(walletNotification).txRequests[addr] = id
}
// AddSpentRequest registers id as the JSON request id to use when
// notifying the wallet behind walletNotification that the outpoint op has
// been spent.  A previous registration for the same outpoint is replaced.
func (r *wsRequests) AddSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint, id interface{}) {
	r.Lock()
	defer r.Unlock()

	r.getOrCreateContexts(walletNotification).spentRequests[*op] = id
}
// RemoveSpentRequest removes a request context for notifications of a
// spent Outpoint.
//
// Nothing happens when the wallet has no registered contexts.  In
// particular no new (empty) context is created for the channel, so a
// removal arriving after CloseListeners cannot re-register a closed
// channel in the map.
func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) {
	r.Lock()
	defer r.Unlock()

	rc, ok := r.m[walletNotification]
	if !ok {
		return
	}
	delete(rc.spentRequests, *op)
}
// CloseListeners drops every request context registered for the passed
// wallet notification channel and then closes that channel, which stops
// the goroutines currently serving that wallet.  Both steps happen while
// the wsRequests mutex is held.
func (r *wsRequests) CloseListeners(walletNotification chan []byte) {
	r.Lock()
	defer r.Unlock()

	// Forget the wallet first, then signal its goroutines.
	delete(r.m, walletNotification)
	close(walletNotification)
}
// addressHash is a fixed-size byte array of ripemd160.Size bytes.  It is
// the key type of requestContexts.txRequests.
type addressHash [ripemd160.Size]byte
// requestContexts holds all requests for a single wallet connection.
type requestContexts struct {
// txRequests maps between a pubkey hash (an addressHash, i.e.
// ripemd160.Size bytes) and the JSON id of the requester so replies
// can be correctly routed back to the correct btcwallet callback.
txRequests map[addressHash]interface{}
// spentRequests maps between an Outpoint of an unspent
// transaction output and the JSON id of the requester so
// replies can be correctly routed back to the correct
// btcwallet callback.
spentRequests map[btcwire.OutPoint]interface{}
}
// Start is used by server.go to start the rpc listener.
//
// Only the first call has any effect.  It installs the JSON-RPC handler
// on "/" (protected by HTTP basic auth) and the wallet websocket handler
// on "/wallet", starts the wallet notification duplicator, and serves
// every listener created by newRPCServer in its own goroutine.
//
// The handlers are registered on a ServeMux private to this server rather
// than on http.DefaultServeMux, so they are not exposed by any other HTTP
// server in the process that serves the default mux, and a second
// rpcServer does not panic on duplicate pattern registration.
func (s *rpcServer) Start() {
	if atomic.AddInt32(&s.started, 1) != 1 {
		return
	}

	log.Trace("RPCS: Starting RPC server")

	// The credentials are fixed once the server is constructed, so the
	// expected Authorization header value is computed a single time.
	login := s.username + ":" + s.password
	auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))

	rpcServeMux := http.NewServeMux()
	rpcServeMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		authhdr := r.Header["Authorization"]
		if len(authhdr) > 0 && authhdr[0] == auth {
			jsonRPCRead(w, r, s)
			return
		}
		log.Warnf("RPCS: Auth failure.")
		jsonAuthFail(w, r, s)
	})

	go s.walletListenerDuplicator()
	rpcServeMux.Handle("/wallet", websocket.Handler(func(ws *websocket.Conn) {
		s.walletReqsNotifications(ws)
	}))

	httpServer := &http.Server{Handler: rpcServeMux}
	for _, listener := range s.listeners {
		s.wg.Add(1)
		go func(listener net.Listener) {
			defer s.wg.Done()
			log.Infof("RPCS: RPC server listening on %s", listener.Addr())
			// Serve returns once the listener is closed by Stop.
			httpServer.Serve(listener)
			log.Tracef("RPCS: RPC listener done for %s", listener.Addr())
		}(listener)
	}
}
// Stop is used by server.go to stop the rpc listener.
//
// Only the first call performs the shutdown; later calls log and return
// nil.  Every listener is closed even if closing one of them fails, the
// goroutines tracked by the wait group are awaited, and the quit channel
// is closed so the websocket goroutines exit.  The first listener close
// error, if any, is returned after the shutdown has completed.
func (s *rpcServer) Stop() error {
	if atomic.AddInt32(&s.shutdown, 1) != 1 {
		log.Infof("RPCS: RPC server is already in the process of shutting down")
		return nil
	}
	log.Warnf("RPCS: RPC server shutting down")

	var firstErr error
	for _, listener := range s.listeners {
		if err := listener.Close(); err != nil {
			log.Errorf("RPCS: Problem shutting down rpc: %v", err)
			// Keep going so the remaining listeners are not left
			// open and the quit channel still gets closed.
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	s.wg.Wait()
	close(s.quit)
	log.Infof("RPCS: RPC server shutdown complete")
	return firstErr
}
// newRPCServer returns a new instance of the rpcServer struct.
//
// The port and credentials are read from cfg, the websocket bookkeeping
// is allocated, and loopback-only listeners are opened on both 127.0.0.1
// and ::1.  If either listener cannot be created the error is returned
// and no listener is left open.
func newRPCServer(s *server) (*rpcServer, error) {
	rpc := rpcServer{
		server: s,
		quit:   make(chan int),
	}

	// Get values from config
	rpc.rpcport = cfg.RPCPort
	rpc.username = cfg.RPCUser
	rpc.password = cfg.RPCPass

	// initialize memory for websocket connections
	rpc.ws.requests.m = make(map[chan []byte]*requestContexts)
	rpc.ws.addWalletListener = make(chan (chan []byte))
	rpc.ws.removeWalletListener = make(chan (chan []byte))
	rpc.ws.walletNotificationMaster = make(chan []byte)

	// IPv4 listener.
	listenAddr4 := net.JoinHostPort("127.0.0.1", rpc.rpcport)
	listener4, err := net.Listen("tcp4", listenAddr4)
	if err != nil {
		log.Errorf("RPCS: Couldn't create listener: %v", err)
		return nil, err
	}

	// IPv6 listener.
	listenAddr6 := net.JoinHostPort("::1", rpc.rpcport)
	listener6, err := net.Listen("tcp6", listenAddr6)
	if err != nil {
		log.Errorf("RPCS: Couldn't create listener: %v", err)
		// Don't leak the already opened IPv4 socket; the close error
		// is irrelevant because the listen failure is what is reported.
		listener4.Close()
		return nil, err
	}

	rpc.listeners = []net.Listener{listener4, listener6}
	return &rpc, nil
}
// jsonAuthFail sends a message back to the client if the http auth is
// rejected.
//
// The response carries a real 401 status and a WWW-Authenticate
// challenge instead of an implicit 200, so HTTP clients can tell the
// request was rejected and know basic credentials are expected.  The
// response body is unchanged.
func jsonAuthFail(w http.ResponseWriter, r *http.Request, s *rpcServer) {
	// Headers must be set before WriteHeader is called.
	w.Header().Set("WWW-Authenticate", `Basic realm="btcd RPC"`)
	w.WriteHeader(http.StatusUnauthorized)
	fmt.Fprint(w, "401 Unauthorized.\n")
}
// jsonRPCRead is the RPC wrapper around the jsonRead function to handle
// reading and responding to RPC messages.
//
// The connection is marked to be closed after the request.  Nothing is
// written when the server is already shutting down or when the request
// body cannot be read.
func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
	r.Close = true
	if atomic.LoadInt32(&s.shutdown) != 0 {
		return
	}
	body, err := btcjson.GetRaw(r.Body)
	if err != nil {
		log.Errorf("RPCS: Error getting json message: %v", err)
		return
	}

	// Error is intentionally ignored here.  It's used in the
	// websocket handler to tell when a method is not supported by
	// the standard RPC API, and is not needed here.  Error logging
	// is done inside jsonRead, so no need to log the error here.
	reply, _ := jsonRead(body, s)
	log.Tracef("[RPCS] reply: %v", reply)

	// msg is data, not a format string: it may contain '%' characters, so
	// it is logged with the non-formatting variants.
	msg, err := btcjson.MarshallAndSend(reply, w)
	if err != nil {
		log.Error(msg)
		return
	}
	log.Debug(msg)
}
// jsonRead abstracts the JSON unmarshalling and reply handling used
// by both RPC and websockets.
//
// body is the raw JSON request.  The returned reply is always suitable
// for sending back to the client.  The returned error is:
//   - a btcjson.Error when body cannot be parsed at all,
//   - ErrMethodNotImplemented when the method is unknown to the standard
//     RPC API (the reply is then a -32601 "Method not found" error, and
//     the websocket handler uses the sentinel to try its extensions),
//   - any other non-nil error when the method failed; the reply then
//     carries the matching JSON error object.
func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) {
	var message btcjson.Message
	if err := json.Unmarshal(body, &message); err != nil {
		jsonError := btcjson.Error{
			Code:    -32700,
			Message: "Parse error",
		}

		reply = btcjson.Reply{
			Result: nil,
			Error:  &jsonError,
			Id:     nil,
		}

		log.Tracef("RPCS: reply: %v", reply)

		return reply, jsonError
	}
	log.Tracef("RPCS: received: %v", message)

	// Set final reply based on error if non-nil.
	defer func() {
		// The unknown-method case builds its own "Method not found"
		// reply; it must not be replaced by a generic internal error.
		if err == nil || err == ErrMethodNotImplemented {
			return
		}
		if jsonErr, ok := err.(btcjson.Error); ok {
			reply = btcjson.Reply{
				Error: &jsonErr,
				Id:    &message.Id,
			}
			err = errors.New(jsonErr.Message)
			return
		}
		rawJSONError := btcjson.Error{
			Code:    -32603,
			Message: err.Error(),
		}
		reply = btcjson.Reply{
			Error: &rawJSONError,
			Id:    &message.Id,
		}
	}()

	// Deal with commands
	switch message.Method {
	case "stop":
		reply = btcjson.Reply{
			Result: "btcd stopping.",
			Id:     &message.Id,
		}
		s.server.Stop()

	case "getblockcount":
		var maxidx int64
		_, maxidx, err = s.server.db.NewestSha()
		if err != nil {
			log.Errorf("RPCS: Error getting newest sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Error getting block count",
			}
			return
		}
		reply = btcjson.Reply{
			Result: maxidx,
			Id:     &message.Id,
		}

	case "getbestblockhash":
		var sha *btcwire.ShaHash
		sha, _, err = s.server.db.NewestSha()
		if err != nil {
			log.Errorf("RPCS: Error getting newest sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Error getting best block hash",
			}
			return
		}
		reply = btcjson.Reply{
			Result: sha.String(),
			Id:     &message.Id,
		}

	case "getdifficulty":
		var sha *btcwire.ShaHash
		sha, _, err = s.server.db.NewestSha()
		if err != nil {
			log.Errorf("RPCS: Error getting sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Error Getting difficulty",
			}
			return
		}
		var blk *btcutil.Block
		blk, err = s.server.db.FetchBlockBySha(sha)
		if err != nil {
			log.Errorf("RPCS: Error getting block: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Error Getting difficulty",
			}
			return
		}
		blockHeader := &blk.MsgBlock().Header
		reply = btcjson.Reply{
			Result: getDifficultyRatio(blockHeader.Bits),
			Id:     &message.Id,
		}

	// btcd does not do mining so we can hardcode replies here.
	case "getgenerate":
		reply = btcjson.Reply{
			Result: false,
			Id:     &message.Id,
		}

	case "setgenerate":
		reply = btcjson.Reply{
			Result: nil,
			Id:     &message.Id,
		}

	case "gethashespersec":
		reply = btcjson.Reply{
			Result: 0,
			Id:     &message.Id,
		}

	case "getblockhash":
		// The body already decoded into message above, so it is a JSON
		// object here.  The last number found in any array-valued
		// member (in practice "params") is used as the height.
		var f interface{}
		err = json.Unmarshal(body, &f)
		m := f.(map[string]interface{})
		var idx float64
		for _, v := range m {
			switch vv := v.(type) {
			case []interface{}:
				for _, u := range vv {
					idx, _ = u.(float64)
				}
			default:
			}
		}
		var sha *btcwire.ShaHash
		sha, err = s.server.db.FetchBlockShaByHeight(int64(idx))
		if err != nil {
			log.Errorf("[RCPS] Error getting block: %v", err)
			err = btcjson.Error{
				Code:    -1,
				Message: "Block number out of range.",
			}
			return
		}
		reply = btcjson.Reply{
			Result: sha.String(),
			Id:     &message.Id,
		}

	case "getblock":
		// The last string found in any array-valued member of the
		// request object is used as the block hash.
		var f interface{}
		err = json.Unmarshal(body, &f)
		m := f.(map[string]interface{})
		var hash string
		for _, v := range m {
			switch vv := v.(type) {
			case []interface{}:
				for _, u := range vv {
					hash, _ = u.(string)
				}
			default:
			}
		}
		var sha *btcwire.ShaHash
		sha, err = btcwire.NewShaHashFromStr(hash)
		if err != nil {
			log.Errorf("RPCS: Error generating sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Block not found",
			}
			return
		}
		var blk *btcutil.Block
		blk, err = s.server.db.FetchBlockBySha(sha)
		if err != nil {
			log.Errorf("RPCS: Error fetching sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Block not found",
			}
			return
		}
		idx := blk.Height()
		var buf []byte
		buf, err = blk.Bytes()
		if err != nil {
			log.Errorf("RPCS: Error fetching block: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Block not found",
			}
			return
		}

		txList, _ := blk.TxShas()

		txNames := make([]string, len(txList))
		for i, v := range txList {
			txNames[i] = v.String()
		}

		var maxidx int64
		_, maxidx, err = s.server.db.NewestSha()
		if err != nil {
			log.Errorf("RPCS: Cannot get newest sha: %v", err)
			err = btcjson.Error{
				Code:    -5,
				Message: "Block not found",
			}
			return
		}

		blockHeader := &blk.MsgBlock().Header
		blockReply := btcjson.BlockResult{
			Hash:          hash,
			Version:       blockHeader.Version,
			MerkleRoot:    blockHeader.MerkleRoot.String(),
			PreviousHash:  blockHeader.PrevBlock.String(),
			Nonce:         blockHeader.Nonce,
			Time:          blockHeader.Timestamp.Unix(),
			Confirmations: uint64(1 + maxidx - idx),
			Height:        idx,
			Tx:            txNames,
			Size:          len(buf),
			Bits:          strconv.FormatInt(int64(blockHeader.Bits), 16),
			Difficulty:    getDifficultyRatio(blockHeader.Bits),
		}

		// Get next block unless we are already at the top.
		if idx < maxidx {
			var shaNext *btcwire.ShaHash
			shaNext, err = s.server.db.FetchBlockShaByHeight(int64(idx + 1))
			if err != nil {
				log.Errorf("RPCS: No next block: %v", err)
				err = btcjson.Error{
					Code:    -5,
					Message: "Block not found",
				}
				return
			}
			blockReply.NextHash = shaNext.String()
		}

		reply = btcjson.Reply{
			Result: blockReply,
			Error:  nil,
			Id:     &message.Id,
		}

	case "getrawmempool":
		hashes := s.server.txMemPool.TxShas()
		hashStrings := make([]string, len(hashes))
		for i := 0; i < len(hashes); i++ {
			hashStrings[i] = hashes[i].String()
		}
		reply = btcjson.Reply{
			Result: hashStrings,
			Id:     &message.Id,
		}

	case "getrawtransaction":
		// TODO: Perform smarter parameter parsing.
		var f interface{}
		err = json.Unmarshal(body, &f)
		m := f.(map[string]interface{})
		var tx string
		var verbose float64
		for _, v := range m {
			switch vv := v.(type) {
			case []interface{}:
				for _, u := range vv {
					switch uu := u.(type) {
					case string:
						tx = uu
					case float64:
						verbose = uu
					default:
					}
				}
			default:
			}
		}
		if int(verbose) != 0 {
			// Reject an unparsable hash up front instead of handing
			// an unusable value to the database.
			var txSha *btcwire.ShaHash
			txSha, err = btcwire.NewShaHashFromStr(tx)
			if err != nil {
				log.Errorf("RPCS: Error generating sha: %v", err)
				err = btcjson.Error{
					Code:    -32602,
					Message: "Invalid transaction hash",
				}
				return
			}
			var txS *btcwire.MsgTx
			var txList []*btcdb.TxListReply
			txList, err = s.server.db.FetchTxBySha(txSha)
			// An empty list is treated like a lookup failure so the
			// indexing below cannot go out of range.
			if err != nil || len(txList) == 0 {
				log.Errorf("RPCS: Error fetching tx: %v", err)
				err = btcjson.Error{
					Code:    -5,
					Message: "No information available about transaction",
				}
				return
			}

			lastTx := len(txList) - 1
			txS = txList[lastTx].Tx
			blksha := txList[lastTx].BlkSha
			var blk *btcutil.Block
			blk, err = s.server.db.FetchBlockBySha(blksha)
			if err != nil {
				log.Errorf("RPCS: Error fetching sha: %v", err)
				err = btcjson.Error{
					Code:    -5,
					Message: "Block not found",
				}
				return
			}
			idx := blk.Height()

			txOutList := txS.TxOut
			voutList := make([]btcjson.Vout, len(txOutList))

			txInList := txS.TxIn
			vinList := make([]btcjson.Vin, len(txInList))

			for i, v := range txInList {
				vinList[i].Sequence = float64(v.Sequence)
				disbuf, _ := btcscript.DisasmString(v.SignatureScript)
				vinList[i].ScriptSig.Asm = strings.Replace(disbuf, " ", "", -1)
				// Vout identifies the output being spent, not the
				// position of this input within the transaction.
				vinList[i].Vout = int(v.PreviousOutpoint.Index)
				log.Debugf(disbuf)
			}

			for i, v := range txOutList {
				voutList[i].N = i
				voutList[i].Value = float64(v.Value) / 100000000
				isbuf, _ := btcscript.DisasmString(v.PkScript)
				voutList[i].ScriptPubKey.Asm = isbuf
				voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG")

				// Address extraction is best effort: a failure is
				// logged and the output is reported without
				// addresses.  Separate error variables keep the
				// function's named result untouched.
				_, addrhash, addrErr := btcscript.ScriptToAddrHash(v.PkScript)
				if addrErr != nil {
					// TODO: set and return error?
					log.Errorf("RPCS: Error getting address hash for %v: %v", txSha, addrErr)
					continue
				}
				addr, encErr := btcutil.EncodeAddress(addrhash, s.server.btcnet)
				if encErr != nil {
					// TODO: set and return error?
					log.Errorf("RPCS: Error encoding address for %v: %v", txSha, encErr)
					continue
				}
				voutList[i].ScriptPubKey.Addresses = []string{addr}
			}

			var maxidx int64
			_, maxidx, err = s.server.db.NewestSha()
			if err != nil {
				log.Errorf("RPCS: Cannot get newest sha: %v", err)
				err = btcjson.Error{
					Code:    -5,
					Message: "No information about newest block",
				}
				return
			}
			confirmations := uint64(1 + maxidx - idx)

			blockHeader := &blk.MsgBlock().Header
			txReply := btcjson.TxRawResult{
				Txid:     tx,
				Vout:     voutList,
				Vin:      vinList,
				Version:  txS.Version,
				LockTime: txS.LockTime,
				// This is not a typo, they are identical in
				// bitcoind as well.
				Time:          blockHeader.Timestamp.Unix(),
				Blocktime:     blockHeader.Timestamp.Unix(),
				BlockHash:     blksha.String(),
				Confirmations: confirmations,
			}
			reply = btcjson.Reply{
				Result: txReply,
				Error:  nil,
				Id:     &message.Id,
			}
		} else {
			// Don't return details
			// not used yet
			//
			// The result stays empty, but the request id is echoed
			// so the client can still match the reply to its request.
			reply = btcjson.Reply{
				Id: &message.Id,
			}
		}

	case "decoderawtransaction":
		// TODO: Perform smarter parameter parsing.
		var f interface{}
		err = json.Unmarshal(body, &f)
		m := f.(map[string]interface{})
		var hash string
		for _, v := range m {
			switch vv := v.(type) {
			case []interface{}:
				for _, u := range vv {
					hash, _ = u.(string)
				}
			default:
			}
		}
		// TODO: use hash and fill result with info.
		_ = hash
		txReply := btcjson.TxRawDecodeResult{}
		reply = btcjson.Reply{
			Result: txReply,
			Error:  nil,
			Id:     &message.Id,
		}

	case "sendrawtransaction":
		params, ok := message.Params.([]interface{})
		if !ok || len(params) != 1 {
			err = btcjson.Error{
				Code:    -32602,
				Message: "Invalid parameters",
			}
			return
		}
		serializedtxhex, ok := params[0].(string)
		if !ok {
			err = btcjson.Error{
				Code:    -32602,
				Message: "Raw tx is not a string",
			}
			return
		}

		// Deserialize and send off to tx relay
		var serializedTx []byte
		serializedTx, err = hex.DecodeString(serializedtxhex)
		if err != nil {
			err = btcjson.Error{
				Code:    -22,
				Message: "Unable to decode hex string",
			}
			return
		}
		msgtx := btcwire.NewMsgTx()
		err = msgtx.Deserialize(bytes.NewBuffer(serializedTx))
		if err != nil {
			err = btcjson.Error{
				Code:    -22,
				Message: "Unable to deserialize raw tx",
			}
			return
		}

		err = s.server.txMemPool.ProcessTransaction(msgtx)
		if err != nil {
			log.Errorf("RPCS: Failed to process transaction: %v", err)
			err = btcjson.Error{
				Code:    -22,
				Message: "Failed to process transaction",
			}
			return
		}

		// The transaction has already been accepted at this point, so
		// a hashing failure only leaves the result empty; it is kept
		// in its own variable so it does not shadow the named result.
		var result interface{}
		txsha, shaErr := msgtx.TxSha()
		if shaErr == nil {
			result = txsha.String()
		} else {
			log.Errorf("RPCS: Error hashing transaction: %v", shaErr)
		}
		reply = btcjson.Reply{
			Result: result,
			Error:  nil,
			Id:     &message.Id,
		}

	default:
		jsonError := btcjson.Error{
			Code:    -32601,
			Message: "Method not found",
		}
		reply = btcjson.Reply{
			Result: nil,
			Error:  &jsonError,
			Id:     &message.Id,
		}
		err = ErrMethodNotImplemented
	}

	return
}
// jsonWSRead handles the websocket-only JSON methods ("getcurrentnet",
// "rescan", "notifynewtxs" and "notifyspent").
//
// Replies are sent across replychan, which is always closed before the
// function returns so the receiver knows no more replies follow.  A
// rescan may send any number of intermediate replies before the final
// one.  walletNotification identifies the wallet connection that
// notification requests are registered for.
//
// The returned error is nil when the method was handled,
// ErrMethodNotImplemented when the method is unknown, and another error
// (after an error reply has been queued) when the request failed.
func jsonWSRead(walletNotification chan []byte, replychan chan *btcjson.Reply, body []byte, s *rpcServer) error {
	var message btcjson.Message
	err := json.Unmarshal(body, &message)
	if err != nil {
		jsonError := btcjson.Error{
			Code:    -32700,
			Message: "Parse error",
		}

		reply := btcjson.Reply{
			Result: nil,
			Error:  &jsonError,
			Id:     nil,
		}

		log.Tracef("RPCS: reply: %v", reply)

		replychan <- &reply
		// Release the receiver on this path as well.
		close(replychan)
		return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err)
	}
	log.Tracef("RPCS: received: %v", message)

	// rawReply is the final reply; it is sent when the function returns.
	var rawReply btcjson.Reply
	defer func() {
		replychan <- &rawReply
		close(replychan)
	}()

	// fail makes the final reply an error reply for this request.
	fail := func(jsonErr btcjson.Error) {
		rawReply = btcjson.Reply{
			Result: nil,
			Error:  &jsonErr,
			Id:     &message.Id,
		}
	}
	invalidParams := btcjson.Error{
		Code:    -32602,
		Message: "Invalid parameters",
	}

	// Deal with commands
	switch message.Method {
	case "getcurrentnet":
		var curNet btcwire.BitcoinNet
		if cfg.TestNet3 {
			curNet = btcwire.TestNet3
		} else {
			curNet = btcwire.MainNet
		}
		rawReply = btcjson.Reply{
			Result: float64(curNet),
			Id:     &message.Id,
		}

	case "rescan":
		var addr string
		minblock, maxblock := int64(0), btcdb.AllShas
		params, ok := message.Params.([]interface{})
		if !ok {
			fail(invalidParams)
			return ErrBadParamsField
		}
		// params: address, optional first height, optional last height.
		for i, v := range params {
			switch v := v.(type) {
			case string:
				if i == 0 {
					addr = v
				}
			case float64:
				if i == 1 {
					minblock = int64(v)
				} else if i == 2 {
					maxblock = int64(v)
				}
			}
		}

		addrhash, _, err := btcutil.DecodeAddress(addr)
		if err != nil {
			fail(btcjson.Error{
				Code:    -32602,
				Message: "Cannot decode address",
			})
			return err
		}

		// FetchHeightRange may not return a complete list of block shas for
		// the given range, so fetch range as many times as necessary.
		for {
			blkshalist, err := s.server.db.FetchHeightRange(minblock, maxblock)
			if err != nil {
				fail(btcjson.Error{Code: -32603, Message: err.Error()})
				return err
			}
			if len(blkshalist) == 0 {
				break
			}

			for i := range blkshalist {
				blk, err := s.server.db.FetchBlockBySha(&blkshalist[i])
				if err != nil {
					fail(btcjson.Error{Code: -32603, Message: err.Error()})
					return err
				}
				txShaList, err := blk.TxShas()
				if err != nil {
					fail(btcjson.Error{Code: -32603, Message: err.Error()})
					return err
				}
				txList := s.server.db.FetchTxByShaList(txShaList)
				for j := range txList {
					for _, txout := range txList[j].Tx.TxOut {
						_, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript)
						if err != nil {
							// An output whose script yields no
							// address hash cannot pay the
							// requested address; skip it rather
							// than aborting the whole rescan.
							continue
						}
						// Report only outputs paying to the
						// requested address.
						if bytes.Equal(addrhash, txaddrhash) {
							reply := btcjson.Reply{
								Result: txList[j].Sha,
								Error:  nil,
								Id:     &message.Id,
							}
							replychan <- &reply
						}
					}
				}
			}

			if maxblock-minblock > int64(len(blkshalist)) {
				minblock += int64(len(blkshalist))
			} else {
				break
			}
		}

		rawReply = btcjson.Reply{
			Result: nil,
			Error:  nil,
			Id:     &message.Id,
		}

	case "notifynewtxs":
		params, ok := message.Params.([]interface{})
		if !ok || len(params) != 1 {
			fail(invalidParams)
			return ErrBadParamsField
		}
		addr, ok := params[0].(string)
		if !ok {
			fail(invalidParams)
			return ErrBadParamsField
		}
		addrhash, _, err := btcutil.DecodeAddress(addr)
		if err != nil {
			fail(btcjson.Error{
				Code:    -32602,
				Message: "Cannot decode address",
			})
			return ErrBadParamsField
		}
		var hash addressHash
		copy(hash[:], addrhash)
		s.ws.requests.AddTxRequest(walletNotification, hash, message.Id)

		rawReply = btcjson.Reply{
			Result: nil,
			Error:  nil,
			Id:     &message.Id,
		}

	case "notifyspent":
		params, ok := message.Params.([]interface{})
		if !ok || len(params) != 2 {
			fail(invalidParams)
			return ErrBadParamsField
		}
		hashBE, ok1 := params[0].(string)
		index, ok2 := params[1].(float64)
		if !ok1 || !ok2 {
			fail(invalidParams)
			return ErrBadParamsField
		}
		hash, err := btcwire.NewShaHashFromStr(hashBE)
		if err != nil {
			fail(btcjson.Error{
				Code:    -32602,
				Message: "Hash string cannot be parsed.",
			})
			return ErrBadParamsField
		}
		op := btcwire.NewOutPoint(hash, uint32(index))
		s.ws.requests.AddSpentRequest(walletNotification, op, message.Id)

		rawReply = btcjson.Reply{
			Result: nil,
			Error:  nil,
			Id:     &message.Id,
		}

	default:
		fail(btcjson.Error{
			Code:    -32601,
			Message: "Method not found",
		})
		return ErrMethodNotImplemented
	}
	return nil
}
// getDifficultyRatio converts the compact bits field of a block header
// into the proof-of-work difficulty, expressed as a multiple of the
// minimum difficulty.  The ratio is rendered with two decimal places and
// parsed back into a float64; 0 is returned (and the failure logged) if
// that parse fails.
func getDifficultyRatio(bits uint32) float64 {
	// The minimum difficulty is the max possible proof-of-work limit bits
	// converted back to a number.  Note this is not the same as the
	// proof of work limit directly because the block difficulty is encoded
	// in a block with the compact form which loses precision.
	powLimit := btcchain.CompactToBig(activeNetParams.powLimitBits)
	blockTarget := btcchain.CompactToBig(bits)

	ratioText := new(big.Rat).SetFrac(powLimit, blockTarget).FloatString(2)
	ratio, err := strconv.ParseFloat(ratioText, 64)
	if err == nil {
		return ratio
	}

	log.Errorf("RPCS: Cannot get difficulty: %v", err)
	return 0
}
// AddWalletListener adds a channel to listen for new messages from a
// wallet.
//
// The channel is handed to the goroutine started by
// walletListenerDuplicator.  If the server is shut down (the quit channel
// is closed) before the hand-off happens, the call returns without
// registering the channel instead of blocking forever.
func (s *rpcServer) AddWalletListener(c chan []byte) {
	select {
	case s.ws.addWalletListener <- c:
	case <-s.quit:
	}
}
// RemoveWalletListener removes a wallet listener channel.
//
// The channel is handed to the goroutine started by
// walletListenerDuplicator.  If the server is shut down (the quit channel
// is closed) before the hand-off happens, the call returns instead of
// blocking forever; this matters because websocket handlers defer this
// call and also return when the server quits.
func (s *rpcServer) RemoveWalletListener(c chan []byte) {
	select {
	case s.ws.removeWalletListener <- c:
	case <-s.quit:
	}
}
// walletListenerDuplicator listens for new wallet listener channels
// and duplicates messages sent to walletNotificationMaster to all
// connected listeners.
//
// It runs until the quit channel is closed.  A helper goroutine maintains
// the set of listeners while this goroutine fans notifications out.
func (s *rpcServer) walletListenerDuplicator() {
	// walletListeners is a map holding each currently connected wallet
	// listener as the key.  The value is ignored, as this is only used as
	// a set.
	walletListeners := make(map[chan []byte]bool)

	// Don't want to add or delete a wallet listener while iterating
	// through each to propagate to every attached wallet.  Use a mutex to
	// prevent this.
	var mtx sync.Mutex

	// Check for listener channels to add or remove from set.
	go func() {
		for {
			select {
			case c := <-s.ws.addWalletListener:
				mtx.Lock()
				walletListeners[c] = true
				mtx.Unlock()

			case c := <-s.ws.removeWalletListener:
				mtx.Lock()
				delete(walletListeners, c)
				mtx.Unlock()

			case <-s.quit:
				return
			}
		}
	}()

	// Duplicate all messages sent across walletNotificationMaster to each
	// listening wallet.
	for {
		select {
		case ntfn := <-s.ws.walletNotificationMaster:
			mtx.Lock()
			for c := range walletListeners {
				// A listener that is not receiving must not keep
				// this goroutine alive past shutdown.
				select {
				case c <- ntfn:
				case <-s.quit:
					mtx.Unlock()
					return
				}
			}
			mtx.Unlock()

		case <-s.quit:
			return
		}
	}
}
// walletReqsNotifications is the handler function for websocket
// connections from a btcwallet instance.  It reads messages from wallet and
// sends back replies, as well as notifying wallets of chain updates.
//
// The handler returns when the wallet disconnects, when the wallet's
// notification channel is closed, or when the server quits.
func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
	// Add wallet notification channel so this handler receives btcd chain
	// notifications.
	c := make(chan []byte)
	s.AddWalletListener(c)
	defer s.RemoveWalletListener(c)

	// done is closed when this handler returns so the reader goroutine
	// below never stays blocked handing over a message nobody will take.
	done := make(chan struct{})
	defer close(done)

	// msgs is a channel for all messages received over the websocket.
	msgs := make(chan []byte)

	// Receive messages from websocket and send across msgs until the
	// connection is lost.  msgs is closed when the reader stops.
	go func() {
		defer close(msgs)
		for {
			select {
			case <-s.quit:
				return
			case <-done:
				return
			default:
			}

			var m []byte
			if err := websocket.Message.Receive(ws, &m); err != nil {
				return
			}

			select {
			case msgs <- m:
			case <-done:
				return
			case <-s.quit:
				return
			}
		}
	}()

	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				// Wallet disconnected.
				return
			}
			// Handle request here.
			go s.websocketJSONHandler(c, m)

		case ntfn, ok := <-c:
			if !ok {
				// Notification channel was closed (see
				// wsRequests.CloseListeners); stop serving instead
				// of spinning on the closed channel.
				return
			}
			// Send btcd notification to btcwallet instance over
			// websocket.
			if err := websocket.Message.Send(ws, ntfn); err != nil {
				// Wallet disconnected.
				return
			}

		case <-s.quit:
			// Server closed.
			return
		}
	}
}
// websocketJSONHandler parses and handles a marshalled json message,
// sending the marshalled reply to a wallet notification channel.
//
// The standard RPC methods are tried first through jsonRead.  Only when
// jsonRead reports ErrMethodNotImplemented are the websocket extensions
// in jsonWSRead tried; their replies (possibly several) are forwarded by
// a helper goroutine until the reply channel is closed or the server
// quits.
func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) {
	s.wg.Add(1)
	reply, err := jsonRead(msg, s)
	s.wg.Done()

	if err != ErrMethodNotImplemented {
		replyBytes, err := json.Marshal(reply)
		if err != nil {
			// Nothing useful can be sent; don't push nil bytes to
			// the wallet.
			log.Errorf("RPCS: Error marshalling reply: %v", err)
			return
		}
		select {
		case walletNotification <- replyBytes:
		case <-s.quit:
		}
		return
	}

	// Try websocket extensions
	replychan := make(chan *btcjson.Reply)
	go func() {
		for {
			select {
			case reply, ok := <-replychan:
				if !ok {
					// no more replies expected.
					return
				}
				if reply == nil {
					continue
				}
				log.Tracef("[RPCS] reply: %v", *reply)
				replyBytes, err := json.Marshal(reply)
				if err != nil {
					// Keep draining: jsonWSRead still has to
					// deliver its final reply on this channel.
					log.Errorf("RPCS: Error Marshalling reply: %v", err)
					continue
				}
				select {
				case walletNotification <- replyBytes:
				case <-s.quit:
					return
				}

			case <-s.quit:
				return
			}
		}
	}()

	s.wg.Add(1)
	err = jsonWSRead(walletNotification, replychan, msg, s)
	s.wg.Done()
	if err != nil && err != ErrMethodNotImplemented {
		log.Error(err)
	}
}
// NotifyBlockConnected creates and marshalls a JSON message to notify
// of a new block connected to the main chain.  The notification is sent
// to each connected wallet.
//
// The notification is dropped (and the reason logged) when the block hash
// cannot be computed or the message cannot be marshalled.  The call does
// not block once the server has quit.
func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
	var id interface{} = "btcd:blockconnected"
	hash, err := block.Sha()
	if err != nil {
		log.Error("Bad block; connected block notification dropped.")
		return
	}
	ntfn := btcjson.Reply{
		Result: struct {
			Hash   string `json:"hash"`
			Height int64  `json:"height"`
		}{
			Hash:   hash.String(),
			Height: block.Height(),
		},
		Id: &id,
	}
	m, err := json.Marshal(ntfn)
	if err != nil {
		log.Errorf("RPCS: Unable to marshal block connected notification: %v", err)
		return
	}
	select {
	case s.ws.walletNotificationMaster <- m:
	case <-s.quit:
		// The duplicator is gone; nobody would ever receive this.
	}
}
// NotifyBlockDisconnected creates and marshalls a JSON message to notify
// of a block disconnected from the main chain.  The notification is sent
// to each connected wallet.
//
// The notification is dropped (and the reason logged) when the block hash
// cannot be computed or the message cannot be marshalled.  The call does
// not block once the server has quit.
func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
	var id interface{} = "btcd:blockdisconnected"
	hash, err := block.Sha()
	if err != nil {
		log.Error("Bad block; disconnected block notification dropped.")
		return
	}
	ntfn := btcjson.Reply{
		Result: struct {
			Hash   string `json:"hash"`
			Height int64  `json:"height"`
		}{
			Hash:   hash.String(),
			Height: block.Height(),
		},
		Id: &id,
	}
	m, err := json.Marshal(ntfn)
	if err != nil {
		log.Errorf("RPCS: Unable to marshal block disconnected notification: %v", err)
		return
	}
	select {
	case s.ws.walletNotificationMaster <- m:
	case <-s.quit:
		// The duplicator is gone; nobody would ever receive this.
	}
}
// NotifyNewTxListeners walks the transactions of the passed block and, for
// each of them, starts one goroutine checking its inputs against the
// registered spent-outpoint requests and one checking its outputs against
// the registered address requests.  Nothing is checked when the block's
// transaction hashes cannot be obtained.
func (s *rpcServer) NotifyNewTxListeners(db btcdb.Db, block *btcutil.Block) {
	shas, err := block.TxShas()
	if err != nil {
		log.Error("Bad block; All notifications for block dropped.")
		return
	}

	replies := db.FetchTxByShaList(shas)
	for i := range replies {
		txReply := replies[i]
		go s.newBlockNotifyCheckTxIn(txReply.Tx.TxIn)
		go s.newBlockNotifyCheckTxOut(db, block, txReply)
	}
}
// newBlockNotifyCheckTxIn is a helper function to iterate through
// each transaction input of a new block and perform any checks and
// notify listening frontends when necessary.
//
// Every wallet that registered a spent request for an outpoint consumed
// by one of txins is sent a notification carrying the outpoint, after
// which that request is removed.
//
// The request map is only read while holding the wsRequests mutex, since
// it is modified concurrently by the websocket handlers.  The matches
// are copied out first and the notifications are sent after the mutex is
// released, because both the channel send and RemoveSpentRequest would
// otherwise block while the mutex is held.
func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) {
	type spentMatch struct {
		wltNtfn chan []byte
		op      btcwire.OutPoint
		id      interface{}
	}

	var matches []spentMatch
	s.ws.requests.Lock()
	for wltNtfn, cxt := range s.ws.requests.m {
		for _, txin := range txins {
			id, ok := cxt.spentRequests[txin.PreviousOutpoint]
			if !ok {
				continue
			}
			matches = append(matches, spentMatch{
				wltNtfn: wltNtfn,
				op:      txin.PreviousOutpoint,
				id:      id,
			})
		}
	}
	s.ws.requests.Unlock()

	for i := range matches {
		match := &matches[i]
		reply := &btcjson.Reply{
			Result: struct {
				TxHash string `json:"txhash"`
				Index  uint32 `json:"index"`
			}{
				TxHash: match.op.Hash.String(),
				Index:  uint32(match.op.Index),
			},
			Error: nil,
			Id:    &match.id,
		}
		replyBytes, err := json.Marshal(reply)
		if err != nil {
			log.Errorf("RPCS: Unable to marshal spent notification: %v", err)
			continue
		}
		match.wltNtfn <- replyBytes
		s.ws.requests.RemoveSpentRequest(match.wltNtfn, &match.op)
	}
}
// newBlockNotifyCheckTxOut is a helper function to iterate through
// each transaction output of a new block and perform any checks and
// notify listening frontends when necessary.
//
// Every wallet that registered a tx request for the address hash paid by
// an output of tx is sent a notification describing that output.
// Processing of outputs stops at the first output whose script yields no
// address hash.
//
// The output scripts are parsed once, before the wsRequests mutex is
// taken.  The request map is only read while holding the mutex, since it
// is modified concurrently by the websocket handlers, and notifications
// are sent after it is released so a slow wallet cannot block it.
func (s *rpcServer) newBlockNotifyCheckTxOut(db btcdb.Db, block *btcutil.Block, tx *btcdb.TxListReply) {
	type paidOutput struct {
		index int
		key   addressHash
		raw   []byte
	}
	type txMatch struct {
		wltNtfn chan []byte
		out     paidOutput
		id      interface{}
	}

	var outputs []paidOutput
	for i, txout := range tx.Tx.TxOut {
		_, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript)
		if err != nil {
			log.Debug("Error getting payment address from tx; dropping any Tx notifications.")
			break
		}
		var key addressHash
		if len(txaddrhash) != len(key) {
			// A hash of another size can never equal a registered
			// address hash.
			continue
		}
		copy(key[:], txaddrhash)
		outputs = append(outputs, paidOutput{index: i, key: key, raw: txaddrhash})
	}
	if len(outputs) == 0 {
		return
	}

	var matches []txMatch
	s.ws.requests.Lock()
	for wltNtfn, cxt := range s.ws.requests.m {
		for _, out := range outputs {
			id, ok := cxt.txRequests[out.key]
			if !ok {
				continue
			}
			matches = append(matches, txMatch{wltNtfn: wltNtfn, out: out, id: id})
		}
	}
	s.ws.requests.Unlock()
	if len(matches) == 0 {
		return
	}

	blkhash, err := block.Sha()
	if err != nil {
		log.Error("Error getting block sha; dropping Tx notification.")
		return
	}

	for i := range matches {
		match := &matches[i]
		txout := tx.Tx.TxOut[match.out.index]

		txaddr, err := btcutil.EncodeAddress(match.out.raw, s.server.btcnet)
		if err != nil {
			log.Error("Error encoding address; dropping Tx notification.")
			continue
		}
		reply := &btcjson.Reply{
			Result: struct {
				Sender    string `json:"sender"`
				Receiver  string `json:"receiver"`
				BlockHash string `json:"blockhash"`
				Height    int64  `json:"height"`
				TxHash    string `json:"txhash"`
				Index     uint32 `json:"index"`
				Amount    int64  `json:"amount"`
				PkScript  string `json:"pkscript"`
				Spent     bool   `json:"spent"`
			}{
				Sender:    "Unknown", // TODO(jrick)
				Receiver:  txaddr,
				BlockHash: blkhash.String(),
				Height:    block.Height(),
				TxHash:    tx.Sha.String(),
				Index:     uint32(match.out.index),
				Amount:    txout.Value,
				PkScript:  btcutil.Base58Encode(txout.PkScript),
				Spent:     tx.TxSpent[match.out.index],
			},
			Error: nil,
			Id:    &match.id,
		}
		replyBytes, err := json.Marshal(reply)
		if err != nil {
			log.Errorf("RPCS: Unable to marshal tx notification: %v", err)
			continue
		}
		match.wltNtfn <- replyBytes
	}
}