From 32d7f0cf203befa3cd939e54d4bf16195c26a5e2 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 6 Aug 2024 17:53:22 +0900 Subject: [PATCH 1/2] peer: make peer meet query.Peer interface query.Peer is used for downloading blocks out of order during headers first download. Methods SubscribeRecvMsg() and OnDisconnect() are added to abide by the interface. --- peer/peer.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 195fc0b4..6af32387 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -494,6 +494,11 @@ type Peer struct { queueQuit chan struct{} outQuit chan struct{} quit chan struct{} + + // subscribers is a channel for relaying all messages that were received + // to this peer. + subscribers map[recvMsgsubscription]struct{} + subscriberLock sync.Mutex } // String returns the peer's address and directionality as a human-readable @@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, return msg, buf, nil } +// recvMsgsubscription is two channels for a subscriber of recevied messages. +// msgChan for sending the messages and quit for cancelling the subscription. +type recvMsgsubscription struct { + msgChan chan wire.Message + quit chan struct{} +} + +// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin +// messages received from this peer will be sent on the returned +// channel. A closure is also returned, that should be called to cancel +// the subscription. +func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) { + p.subscriberLock.Lock() + defer p.subscriberLock.Unlock() + + // No need to buffer this channel as we'll spin up a new goroutine for + // every send. + msgChan := make(chan wire.Message) + quit := make(chan struct{}) + sub := recvMsgsubscription{ + msgChan, + quit, + } + + p.subscribers[sub] = struct{}{} + + return msgChan, func() { close(quit) } +} + // writeMessage sends a bitcoin message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. @@ -1446,6 +1480,27 @@ out: } break out } + + // Send the received message to all the subscribers. + for sub := range p.subscribers { + select { + case <-sub.quit: + delete(p.subscribers, sub) + continue + default: + } + + // Spin up a goroutine so that we don't block here. + go func(subscription chan wire.Message, + quit chan struct{}) { + + select { + case subscription <- rmsg: + case <-p.quit: + } + + }(sub.msgChan, p.quit) + } atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} @@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() { close(p.quit) } +// OnDisconnect returns a channel that will be closed when this peer is +// disconnected. +func (p *Peer) OnDisconnect() <-chan struct{} { + return p.quit +} + // readRemoteVersionMsg waits for the next message to arrive from the remote // peer. If the next message is not a version message or the version is not // acceptable then return an error. @@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer { queueQuit: make(chan struct{}), outQuit: make(chan struct{}), quit: make(chan struct{}), + subscribers: make(map[recvMsgsubscription]struct{}), cfg: cfg, // Copy so caller can't mutate. services: cfg.Services, protocolVersion: cfg.ProtocolVersion, From aa39d15cde8af892acfc6119112e02043bfb0e64 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Wed, 7 Aug 2024 16:27:07 +0900 Subject: [PATCH 2/2] netsync: add peerSubscription peerSubscription is added to Manager which will allow it subscribers to receive peers through the channel whenever the Manager is aware of a new peer that it's been connected to. This is useful to alert query.Workmanager that a new peer that's been connected to is eligible to download blocks from. --- go.mod | 1 + go.sum | 2 ++ netsync/manager.go | 41 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/go.mod b/go.mod index 1f445d90..6e6a2cbf 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect + github.com/lightninglabs/neutrino v0.16.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/net v0.24.0 // indirect diff --git a/go.sum b/go.sum index bb666c89..3f3b3464 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0= +github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/netsync/manager.go b/netsync/manager.go index 3215a86a..130723b8 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/mempool" peerpkg "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/neutrino/query" ) const ( @@ -203,6 +204,7 @@ type SyncManager struct { headerList *list.List startHeader *list.Element nextCheckpoint *chaincfg.Checkpoint + peerSubscribers []*peerSubscription // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -452,6 +454,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { return true } +// notifyPeerSubscribers notifies all the current peer subscribers of the peer +// that was passed in. +func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) { + // Loop for alerting subscribers to the new peer that was connected to. + n := 0 + for i, sub := range sm.peerSubscribers { + select { + // Quickly check whether this subscription has been canceled. + case <-sub.cancel: + // Avoid GC leak. + sm.peerSubscribers[i] = nil + continue + default: + } + + // Keep non-canceled subscribers around. + sm.peerSubscribers[n] = sub + n++ + + sub.peers <- peer + } + // Re-align the slice to only active subscribers. + sm.peerSubscribers = sm.peerSubscribers[:n] +} + // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. @@ -471,6 +498,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { requestedBlocks: make(map[chainhash.Hash]struct{}), } + // Only pass the peer off to the subscribers if we're able to sync off of + // the peer. + bestHeight := sm.chain.BestSnapshot().Height + if isSyncCandidate && peer.LastBlock() > bestHeight { + sm.notifyPeerSubscribers(peer) + } + // Start syncing by choosing the best candidate if needed. if isSyncCandidate && sm.syncPeer == nil { sm.startSync() @@ -1666,6 +1700,13 @@ func (sm *SyncManager) Pause() chan<- struct{} { return c } +// peerSubscription holds a peer subscription which we'll notify about any +// connected peers. +type peerSubscription struct { + peers chan<- query.Peer + cancel <-chan struct{} +} + // New constructs a new SyncManager. Use Start to begin processing asynchronous // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) {