diff --git a/blockmanager.go b/blockmanager.go index 2557dc37..f7ab2a85 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -59,6 +59,7 @@ type newPeerMsg struct { type blockMsg struct { block *btcutil.Block peer *serverPeer + reply chan struct{} } // invMsg packages a bitcoin inv message and the peer it came from together @@ -83,8 +84,9 @@ type donePeerMsg struct { // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { - tx *btcutil.Tx - peer *serverPeer + tx *btcutil.Tx + peer *serverPeer + reply chan struct{} } // getSyncPeerMsg is a message type to be sent across the message channel for @@ -1126,11 +1128,11 @@ out: case *txMsg: b.handleTxMsg(msg) - msg.peer.txProcessed <- struct{}{} + msg.reply <- struct{}{} case *blockMsg: b.handleBlockMsg(msg) - msg.peer.blockProcessed <- struct{}{} + msg.reply <- struct{}{} case *invMsg: b.handleInvMsg(msg) @@ -1261,26 +1263,29 @@ func (b *blockManager) NewPeer(sp *serverPeer) { } // QueueTx adds the passed transaction message and peer to the block handling -// queue. -func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer) { +// queue. Responds to the done channel argument after the tx message is +// processed. +func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer, done chan struct{}) { // Don't accept more transactions if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { - sp.txProcessed <- struct{}{} + done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: sp} + b.msgChan <- &txMsg{tx: tx, peer: sp, reply: done} } -// QueueBlock adds the passed block message and peer to the block handling queue. -func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { +// QueueBlock adds the passed block message and peer to the block handling +// queue. Responds to the done channel argument after the block message is +// processed. +func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer, done chan struct{}) { // Don't accept more blocks if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { - sp.blockProcessed <- struct{}{} + done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: sp} + b.msgChan <- &blockMsg{block: block, peer: sp, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. diff --git a/server.go b/server.go index 945d9777..c17f79f0 100644 --- a/server.go +++ b/server.go @@ -485,7 +485,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp) + sp.server.blockManager.QueueTx(tx, sp, sp.txProcessed) <-sp.txProcessed } @@ -511,7 +511,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // reference implementation processes blocks in the same // thread and therefore blocks further messages until // the bitcoin block has been fully processed. - sp.server.blockManager.QueueBlock(block, sp) + sp.server.blockManager.QueueBlock(block, sp, sp.blockProcessed) <-sp.blockProcessed }