From b89e93e52f40d10bcc6f84c08e7ace22ba29a6c2 Mon Sep 17 00:00:00 2001 From: Francis Lam Date: Sat, 8 Feb 2014 17:15:17 -0500 Subject: [PATCH] Added notifyallnewtxs custom websocket command Changed mempool.MaybeAcceptTransaction to accept an additional parameter to differentiate betwee new transactions and those added from disconnected blocks. Added new fields to requestContexts to indicate which clients want to receive all new transaction notifications. Added NotifyForNewTx to rpcServer to deliver approriate transaction notification. --- blockmanager.go | 2 +- mempool.go | 14 +++++++---- rpcwebsocket.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 677bc169..e26a39d0 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1014,7 +1014,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Reinsert all of the transactions (except the coinbase) into // the transaction pool. for _, tx := range block.Transactions()[1:] { - err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil) + err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil, false) if err != nil { // Remove the transaction and all transactions // that depend on it if it wasn't accepted into diff --git a/mempool.go b/mempool.go index df62ab4a..ea5bbd81 100644 --- a/mempool.go +++ b/mempool.go @@ -717,7 +717,7 @@ func (mp *txMemPool) FetchTransaction(txHash *btcwire.ShaHash) (*btcutil.Tx, err // more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { +func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error { if isOrphan != nil { *isOrphan = false } @@ -879,6 +879,10 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro // Notify wallets of mempool transactions to wallet addresses. if mp.server.rpcServer != nil { mp.server.rpcServer.NotifyForTxOuts(tx, nil) + + if isNew { + mp.server.rpcServer.NotifyForNewTx(tx) + } } return nil @@ -892,12 +896,12 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro // or not the transaction is an orphan. // // This function is safe for concurrent access. -func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { +func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error { // Protect concurrent access. mp.Lock() defer mp.Unlock() - return mp.maybeAcceptTransaction(tx, isOrphan) + return mp.maybeAcceptTransaction(tx, isOrphan, isNew) } // processOrphans determines if there are any orphans which depend on the passed @@ -937,7 +941,7 @@ func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error { // Potentially accept the transaction into the // transaction pool. var isOrphan bool - err := mp.maybeAcceptTransaction(tx, &isOrphan) + err := mp.maybeAcceptTransaction(tx, &isOrphan, true) if err != nil { return err } @@ -975,7 +979,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx) error { // Potentially accept the transaction to the memory pool. var isOrphan bool - err := mp.maybeAcceptTransaction(tx, &isOrphan) + err := mp.maybeAcceptTransaction(tx, &isOrphan, true) if err != nil { return err } diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 4d4a8789..39d4753b 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -46,6 +46,7 @@ var wsHandlers = map[string]wsCommandHandler{ "getbestblock": handleGetBestBlock, "notifyblocks": handleNotifyBlocks, "notifynewtxs": handleNotifyNewTXs, + "notifyallnewtxs": handleNotifyAllNewTXs, "notifyspent": handleNotifySpent, "rescan": handleRescan, "sendrawtransaction": handleWalletSendRawTransaction, @@ -83,6 +84,15 @@ func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) { rc.blockUpdates = true } +func (r *wsContext) AddAllNewTxRequest(n ntfnChan, verbose bool) { + r.Lock() + defer r.Unlock() + + rc := r.connections[n] + rc.allTxUpdates = true + rc.verboseTxUpdates = verbose +} + // AddTxRequest adds the request context for new transaction notifications. func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { r.Lock() @@ -251,6 +261,14 @@ type requestContexts struct { // chain. blockUpdates bool + // allTxUpdates specifies whether a client has requested notifications + // for all new transactions. + allTxUpdates bool + + // verboseTxUpdates specifies whether a client has requested more verbose + // information about all new transactions + verboseTxUpdates bool + // txRequests is a set of addresses a wallet has requested transactions // updates for. It is maintained here so all requests can be removed // when a wallet disconnects. @@ -362,6 +380,18 @@ func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return nil, nil } +// handleNotifyAllNewTXs implements the notifyallnewtxs command extension for +// websocket connections. +func handleNotifyAllNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { + cmd, ok := icmd.(*btcws.NotifyAllNewTXsCmd) + if !ok { + return nil, &btcjson.ErrInternal + } + + s.ws.AddAllNewTxRequest(c.n, cmd.Verbose) + return nil, nil +} + // handleNotifySpent implements the notifyspent command extension for // websocket connections. func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { @@ -951,3 +981,35 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { } } } + +// NotifyForNewTx sends delivers the new tx to any client that has +// registered for all new TX. +func (s *rpcServer) NotifyForNewTx(tx *btcutil.Tx) { + txId := tx.Sha().String() + mtx := tx.MsgTx() + + var amount int64 + for _, txOut := range mtx.TxOut { + amount += txOut.Value + } + + ntfn := btcws.NewAllTxNtfn(txId, amount) + var verboseNtfn *btcws.AllVerboseTxNtfn + + for ntfnChan, rc := range s.ws.connections { + if rc.allTxUpdates { + if rc.verboseTxUpdates { + if verboseNtfn == nil { + rawTx, err := createTxRawResult(s.server.btcnet, txId, mtx, nil, 0, nil) + if err != nil { + return + } + verboseNtfn = btcws.NewAllVerboseTxNtfn(rawTx) + } + ntfnChan <- verboseNtfn + } else { + ntfnChan <- ntfn + } + } + } +}