This commit is contained in:
Calvin Kim 2025-03-10 16:11:48 +02:00 committed by GitHub
commit 6d908f513a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 106 additions and 0 deletions

1
go.mod
View file

@ -25,6 +25,7 @@ require (
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 // indirect
github.com/lightninglabs/neutrino v0.16.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.24.0 // indirect golang.org/x/net v0.24.0 // indirect

2
go.sum
View file

@ -63,6 +63,8 @@ github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/lightninglabs/neutrino v0.16.0 h1:YNTQG32fPR/Zg0vvJVI65OBH8l3U18LSXXtX91hx0q0=
github.com/lightninglabs/neutrino v0.16.0/go.mod h1:x3OmY2wsA18+Kc3TSV2QpSUewOCiscw2mKpXgZv2kZk=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

View file

@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/mempool" "github.com/btcsuite/btcd/mempool"
peerpkg "github.com/btcsuite/btcd/peer" peerpkg "github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/query"
) )
const ( const (
@ -203,6 +204,7 @@ type SyncManager struct {
headerList *list.List headerList *list.List
startHeader *list.Element startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint nextCheckpoint *chaincfg.Checkpoint
peerSubscribers []*peerSubscription
// An optional fee estimator. // An optional fee estimator.
feeEstimator *mempool.FeeEstimator feeEstimator *mempool.FeeEstimator
@ -452,6 +454,31 @@ func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
return true return true
} }
// notifyPeerSubscribers notifies all the current peer subscribers of the peer
// that was passed in.
func (sm *SyncManager) notifyPeerSubscribers(peer *peerpkg.Peer) {
// Loop for alerting subscribers to the new peer that was connected to.
n := 0
for i, sub := range sm.peerSubscribers {
select {
// Quickly check whether this subscription has been canceled.
case <-sub.cancel:
// Avoid GC leak.
sm.peerSubscribers[i] = nil
continue
default:
}
// Keep non-canceled subscribers around.
sm.peerSubscribers[n] = sub
n++
sub.peers <- peer
}
// Re-align the slice to only active subscribers.
sm.peerSubscribers = sm.peerSubscribers[:n]
}
// handleNewPeerMsg deals with new peers that have signalled they may // handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It // be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine. // also starts syncing if needed. It is invoked from the syncHandler goroutine.
@ -471,6 +498,13 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
requestedBlocks: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}),
} }
// Only pass the peer off to the subscribers if we're able to sync off of
// the peer.
bestHeight := sm.chain.BestSnapshot().Height
if isSyncCandidate && peer.LastBlock() > bestHeight {
sm.notifyPeerSubscribers(peer)
}
// Start syncing by choosing the best candidate if needed. // Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil { if isSyncCandidate && sm.syncPeer == nil {
sm.startSync() sm.startSync()
@ -1666,6 +1700,13 @@ func (sm *SyncManager) Pause() chan<- struct{} {
return c return c
} }
// peerSubscription holds a peer subscription which we'll notify about any
// connected peers.
type peerSubscription struct {
peers chan<- query.Peer
cancel <-chan struct{}
}
// New constructs a new SyncManager. Use Start to begin processing asynchronous // New constructs a new SyncManager. Use Start to begin processing asynchronous
// block, tx, and inv updates. // block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) { func New(config *Config) (*SyncManager, error) {

View file

@ -494,6 +494,11 @@ type Peer struct {
queueQuit chan struct{} queueQuit chan struct{}
outQuit chan struct{} outQuit chan struct{}
quit chan struct{} quit chan struct{}
// subscribers is a channel for relaying all messages that were received
// to this peer.
subscribers map[recvMsgsubscription]struct{}
subscriberLock sync.Mutex
} }
// String returns the peer's address and directionality as a human-readable // String returns the peer's address and directionality as a human-readable
@ -1098,6 +1103,35 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return msg, buf, nil return msg, buf, nil
} }
// recvMsgsubscription is two channels for a subscriber of recevied messages.
// msgChan for sending the messages and quit for cancelling the subscription.
type recvMsgsubscription struct {
msgChan chan wire.Message
quit chan struct{}
}
// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
// messages received from this peer will be sent on the returned
// channel. A closure is also returned, that should be called to cancel
// the subscription.
func (p *Peer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
p.subscriberLock.Lock()
defer p.subscriberLock.Unlock()
// No need to buffer this channel as we'll spin up a new goroutine for
// every send.
msgChan := make(chan wire.Message)
quit := make(chan struct{})
sub := recvMsgsubscription{
msgChan,
quit,
}
p.subscribers[sub] = struct{}{}
return msgChan, func() { close(quit) }
}
// writeMessage sends a bitcoin message to the peer with logging. // writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting. // Don't do anything if we're disconnecting.
@ -1446,6 +1480,27 @@ out:
} }
break out break out
} }
// Send the received message to all the subscribers.
for sub := range p.subscribers {
select {
case <-sub.quit:
delete(p.subscribers, sub)
continue
default:
}
// Spin up a goroutine so that we don't block here.
go func(subscription chan wire.Message,
quit chan struct{}) {
select {
case subscription <- rmsg:
case <-p.quit:
}
}(sub.msgChan, p.quit)
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
@ -1961,6 +2016,12 @@ func (p *Peer) Disconnect() {
close(p.quit) close(p.quit)
} }
// OnDisconnect returns a channel that will be closed when this peer is
// disconnected.
func (p *Peer) OnDisconnect() <-chan struct{} {
return p.quit
}
// readRemoteVersionMsg waits for the next message to arrive from the remote // readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not // peer. If the next message is not a version message or the version is not
// acceptable then return an error. // acceptable then return an error.
@ -2397,6 +2458,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
queueQuit: make(chan struct{}), queueQuit: make(chan struct{}),
outQuit: make(chan struct{}), outQuit: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
subscribers: make(map[recvMsgsubscription]struct{}),
cfg: cfg, // Copy so caller can't mutate. cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services, services: cfg.Services,
protocolVersion: cfg.ProtocolVersion, protocolVersion: cfg.ProtocolVersion,