From ab002c90cc9990a8313b9ff8de736945e7e98ae1 Mon Sep 17 00:00:00 2001 From: mydesktop Date: Tue, 18 Mar 2014 15:40:49 -0400 Subject: [PATCH] Implement a rebroadcast handler. This commit implements a rebroadcast handler which deals with rebroadcasting inventory at a random time interval between 0 and 30 minutes. It then uses the new rebroadcast logic to ensure transactions which were submitted via the sendrawtransaction RPC are rebroadcast until they make it into a block. Closes #99. --- blockmanager.go | 12 +++- mempool.go | 1 + rpcserver.go | 7 ++ server.go | 184 +++++++++++++++++++++++++++++++++++++----------- 4 files changed, 161 insertions(+), 43 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index be2797ed..aabfad66 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1062,8 +1062,16 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { b.server.txMemPool.RemoveDoubleSpends(tx) } - // Notify registered websocket clients if r := b.server.rpcServer; r != nil { + // Now that this block is in the blockchain we can mark all the + // transactions (except the coinbase) as no longer needing + // rebroadcasting. + for _, tx := range block.Transactions()[1:] { + iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) + b.server.RemoveRebroadcastInventory(iv) + } + + // Notify registered websocket clients of incoming block. r.ntfnMgr.NotifyBlockConnected(block) } @@ -1087,7 +1095,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { } } - // Notify registered websocket clients + // Notify registered websocket clients. if r := b.server.rpcServer; r != nil { r.ntfnMgr.NotifyBlockDisconnected(block) } diff --git a/mempool.go b/mempool.go index ee8c9502..1d49f38c 100644 --- a/mempool.go +++ b/mempool.go @@ -94,6 +94,7 @@ type txMemPool struct { outpoints map[btcwire.OutPoint]*btcutil.Tx pennyTotal float64 // exponentially decaying total for penny spends. lastPennyUnix int64 // unix time of last ``penny spend'' + } // isDust returns whether or not the passed transaction output amount is diff --git a/rpcserver.go b/rpcserver.go index dfa6fdec..04f48266 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1403,6 +1403,13 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error return nil, err } + // We keep track of all the sendrawtransaction request txs because we need to + // rebroadcast them if they fail to get broadcast or entered into a block; for + // instance if the client was offline when they were generated. Refer to + // server.go in /btcd. + iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) + s.server.AddRebroadcastInventory(iv) + return tx.Sha().String(), nil } diff --git a/server.go b/server.go index 74f75851..5358c892 100644 --- a/server.go +++ b/server.go @@ -6,11 +6,14 @@ package main import ( "container/list" + "crypto/rand" + "encoding/binary" "errors" "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcjson" "github.com/conformal/btcwire" + "math" "net" "runtime" "strconv" @@ -19,9 +22,9 @@ import ( "time" ) -// These constants are used by the DNS seed code to pick a random last seen -// time. const ( + // These constants are used by the DNS seed code to pick a random last seen + // time. secondsIn3Days int32 = 24 * 60 * 60 * 3 secondsIn4Days int32 = 24 * 60 * 60 * 4 ) @@ -46,33 +49,42 @@ type broadcastMsg struct { excludePeers []*peer } +// BroadcastInventoryAdd is a type used to declare that the InvVect it contains +// needs to be added to the rebroadcast map +type broadcastInventoryAdd *btcwire.InvVect + +// BroadcastInventoryDel is a type used to declare that the InvVect it contains +// needs to be removed from the rebroadcast map +type broadcastInventoryDel *btcwire.InvVect + // server provides a bitcoin server for handling communications to and from // bitcoin peers. type server struct { - nonce uint64 - listeners []net.Listener - btcnet btcwire.BitcoinNet - started int32 // atomic - shutdown int32 // atomic - shutdownSched int32 // atomic - bytesMutex sync.Mutex // For the following two fields. - bytesReceived uint64 // Total bytes received from all peers since start. - bytesSent uint64 // Total bytes sent by all peers since start. - addrManager *AddrManager - rpcServer *rpcServer - blockManager *blockManager - txMemPool *txMemPool - newPeers chan *peer - donePeers chan *peer - banPeers chan *peer - wakeup chan bool - query chan interface{} - relayInv chan *btcwire.InvVect - broadcast chan broadcastMsg - wg sync.WaitGroup - quit chan bool - nat NAT - db btcdb.Db + nonce uint64 + listeners []net.Listener + btcnet btcwire.BitcoinNet + started int32 // atomic + shutdown int32 // atomic + shutdownSched int32 // atomic + bytesMutex sync.Mutex // For the following two fields. + bytesReceived uint64 // Total bytes received from all peers since start. + bytesSent uint64 // Total bytes sent by all peers since start. + addrManager *AddrManager + rpcServer *rpcServer + blockManager *blockManager + txMemPool *txMemPool + modifyRebroadcastInv chan interface{} + newPeers chan *peer + donePeers chan *peer + banPeers chan *peer + wakeup chan bool + query chan interface{} + relayInv chan *btcwire.InvVect + broadcast chan broadcastMsg + wg sync.WaitGroup + quit chan bool + nat NAT + db btcdb.Db } type peerState struct { @@ -84,6 +96,34 @@ type peerState struct { maxOutboundPeers int } +// randomUint16Number returns a random uint16 in a specified input range. Note +// that the range is in zeroth ordering; if you pass it 1800, you will get values +// from 0 to 1800. In order to avoid modulo bias and ensure every possible +// outcome in [0, max) has equal probability, the random number must be sampled +// from a random source that has a range limited to a multiple of the modulus. +func randomUint16Number(max uint16) uint16 { + var randomNumber uint16 + var limitRange = (math.MaxUint16 / max) * max + for { + binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) + if randomNumber < limitRange { + return (randomNumber % max) + } + } +} + +// AddRebroadcastInventory dispatches a message to the rebroadcastHandler +// specifying to add an item to the rebroadcast map of InvVects +func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect) { + s.modifyRebroadcastInv <- broadcastInventoryAdd(iv) +} + +// RemoveRebroadcastInventory dispatches a message to the rebroadcastHandler +// specifying to remove an item from the rebroadcast map of InvVects +func (s *server) RemoveRebroadcastInventory(iv *btcwire.InvVect) { + s.modifyRebroadcastInv <- broadcastInventoryDel(iv) +} + func (p *peerState) Count() int { return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len() } @@ -706,6 +746,61 @@ func (s *server) NetTotals() (uint64, uint64) { return s.bytesReceived, s.bytesSent } +// rebroadcastHandler is a listener that uses a couple of channels to maintain +// a list of transactions that need to be rebroadcast. The list of tx is stored +// in their abstracted P2P form (InvVect) in a map (pendingInvs). +// Why we need this: +// We handle user submitted tx, e.g. from a wallet, via the RPC submission +// function sendrawtransactions. Because we need to ensure that user- +// submitted tx eventually enter a block, we need to retransmit them +// periodically until we see them actually enter a block. +func (s *server) rebroadcastHandler() { + timer := time.NewTimer(5 * time.Minute) // Wait 5 min before first tx rebroadcast. + pendingInvs := make(map[btcwire.InvVect]struct{}) + +out: + for { + select { + case riv := <-s.modifyRebroadcastInv: + switch msg := riv.(type) { + // Incoming InvVects are added to our map of RPC txs. + case broadcastInventoryAdd: + pendingInvs[*msg] = struct{}{} + + // When an InvVect has been added to a block, we can now remove it; + // note that we need to check if the iv is actually found in the + // map before we try to delete it, as when handleNotifyMsg finds a + // new block it cycles through the txs and sends them all + // indescriminately to this function. The if loop is cheap, so + // this should not be an issue. + case broadcastInventoryDel: + if _, ok := pendingInvs[*msg]; ok { + delete(pendingInvs, *msg) + } + } + + // When the timer triggers, scan through all the InvVects of RPC-submitted + // tx and cause the server to resubmit them to peers, as they have not + // been added to incoming blocks. + case <-timer.C: + for iv := range pendingInvs { + ivCopy := iv + s.RelayInventory(&ivCopy) + } + + // Set the timer to go off at a random time between 0 and 1799 seconds + timer.Reset(time.Second * time.Duration(randomUint16Number(1800))) + + case <-s.quit: + break out + } + } + + timer.Stop() + + s.wg.Done() +} + // Start begins accepting connections from peers. func (s *server) Start() { // Already started? @@ -726,13 +821,19 @@ func (s *server) Start() { // managers. s.wg.Add(1) go s.peerHandler() + if s.nat != nil { s.wg.Add(1) go s.upnpUpdateThread() } - // Start the RPC server if it's not disabled. if !cfg.DisableRPC { + s.wg.Add(1) + + // Start the rebroadcastHandler, which ensures user tx received by + // the RPC server are rebroadcast until being included in a block. + go s.rebroadcastHandler() + s.rpcServer.Start() } } @@ -1030,20 +1131,21 @@ func newServer(listenAddrs []string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*s } s := server{ - nonce: nonce, - listeners: listeners, - btcnet: btcnet, - addrManager: amgr, - newPeers: make(chan *peer, cfg.MaxPeers), - donePeers: make(chan *peer, cfg.MaxPeers), - banPeers: make(chan *peer, cfg.MaxPeers), - wakeup: make(chan bool), - query: make(chan interface{}), - relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers), - broadcast: make(chan broadcastMsg, cfg.MaxPeers), - quit: make(chan bool), - nat: nat, - db: db, + nonce: nonce, + listeners: listeners, + btcnet: btcnet, + addrManager: amgr, + newPeers: make(chan *peer, cfg.MaxPeers), + donePeers: make(chan *peer, cfg.MaxPeers), + banPeers: make(chan *peer, cfg.MaxPeers), + wakeup: make(chan bool), + query: make(chan interface{}), + relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers), + broadcast: make(chan broadcastMsg, cfg.MaxPeers), + quit: make(chan bool), + modifyRebroadcastInv: make(chan interface{}), + nat: nat, + db: db, } bm, err := newBlockManager(&s) if err != nil {