peer+wire: add addrv2 message, protocol negotiation

This commit is contained in:
eugene 2022-02-03 14:04:43 -05:00
parent 201c0836ec
commit cb6f21b598
No known key found for this signature in database
GPG key ID: 118759E83439A9B1
8 changed files with 714 additions and 81 deletions

View file

@ -29,7 +29,7 @@ import (
const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = wire.FeeFilterVersion
MaxProtocolVersion = wire.AddrV2Version
// DefaultTrickleInterval is the min time between attempts to send an
// inv message to a peer.
@ -102,6 +102,9 @@ type MessageListeners struct {
// OnAddr is invoked when a peer receives an addr bitcoin message.
OnAddr func(p *Peer, msg *wire.MsgAddr)
// OnAddrV2 is invoked when a peer receives an addrv2 bitcoin message.
OnAddrV2 func(p *Peer, msg *wire.MsgAddrV2)
// OnPing is invoked when a peer receives a ping bitcoin message.
OnPing func(p *Peer, msg *wire.MsgPing)
@ -197,6 +200,9 @@ type MessageListeners struct {
// message.
OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)
// OnSendAddrV2 is invoked when a peer receives a sendaddrv2 message.
OnSendAddrV2 func(p *Peer, msg *wire.MsgSendAddrV2)
// OnRead is invoked when a peer receives a bitcoin message. It
// consists of the number of bytes read, the message, and whether or not
// an error in the read occurred. Typically, callers will opt to use
@ -399,7 +405,7 @@ type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress.
type HostToNetAddrFunc func(host string, port uint16,
services wire.ServiceFlag) (*wire.NetAddress, error)
services wire.ServiceFlag) (*wire.NetAddressV2, error)
// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
@ -445,7 +451,7 @@ type Peer struct {
inbound bool
flagsMtx sync.Mutex // protects the peer flags below
na *wire.NetAddress
na *wire.NetAddressV2
id int32
userAgent string
services wire.ServiceFlag
@ -455,6 +461,7 @@ type Peer struct {
sendHeadersPreferred bool // peer sent a sendheaders message
verAckReceived bool
witnessEnabled bool
sendAddrV2 bool
wireEncoding wire.MessageEncoding
@ -585,7 +592,7 @@ func (p *Peer) ID() int32 {
// NA returns the peer network address.
//
// This function is safe for concurrent access.
func (p *Peer) NA() *wire.NetAddress {
func (p *Peer) NA() *wire.NetAddressV2 {
p.flagsMtx.Lock()
na := p.na
p.flagsMtx.Unlock()
@ -820,6 +827,16 @@ func (p *Peer) IsWitnessEnabled() bool {
return witnessEnabled
}
// WantsAddrV2 returns if the peer supports addrv2 messages instead of the
// legacy addr messages.
func (p *Peer) WantsAddrV2() bool {
p.flagsMtx.Lock()
wantsAddrV2 := p.sendAddrV2
p.flagsMtx.Unlock()
return wantsAddrV2
}
// PushAddrMsg sends an addr message to the connected peer using the provided
// addresses. This function is useful over manually sending the message via
// QueueMessage since it automatically limits the addresses to the maximum
@ -856,6 +873,38 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, er
return msg.AddrList, nil
}
// PushAddrV2Msg is used to push an addrv2 message to the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) PushAddrV2Msg(addrs []*wire.NetAddressV2) (
[]*wire.NetAddressV2, error) {
count := len(addrs)
// Nothing to send.
if count == 0 {
return nil, nil
}
m := wire.NewMsgAddrV2()
m.AddrList = make([]*wire.NetAddressV2, count)
copy(m.AddrList, addrs)
// Randomize the addresses sent if there are more than the maximum.
if count > wire.MaxV2AddrPerMsg {
rand.Shuffle(count, func(i, j int) {
m.AddrList[i] = m.AddrList[j]
m.AddrList[j] = m.AddrList[i]
})
// Truncate it to the maximum size.
m.AddrList = m.AddrList[:wire.MaxV2AddrPerMsg]
}
p.QueueMessage(m, nil)
return m.AddrList, nil
}
// PushGetBlocksMsg sends a getblocks message for the provided block locator
// and stop hash. It will ignore back-to-back duplicate requests.
//
@ -1363,6 +1412,19 @@ out:
continue
}
// Since the protocol version is 70016 but we don't
// implement compact blocks, we have to ignore unknown
// messages after the version-verack handshake. This
// matches bitcoind's behavior and is necessary since
// compact blocks negotiation occurs after the
// handshake.
if err == wire.ErrUnknownMessage {
log.Debugf("Received unknown message from %s:"+
" %v", p, err)
idleTimer.Reset(idleTimeout)
continue
}
// Only log the error and send reject message if the
// local peer is not forcibly disconnecting and the
// remote peer has not disconnected.
@ -1404,6 +1466,11 @@ out:
)
break out
case *wire.MsgSendAddrV2:
// Disconnect if peer sends this after the handshake is
// completed.
break out
case *wire.MsgGetAddr:
if p.cfg.Listeners.OnGetAddr != nil {
p.cfg.Listeners.OnGetAddr(p, msg)
@ -1414,6 +1481,11 @@ out:
p.cfg.Listeners.OnAddr(p, msg)
}
case *wire.MsgAddrV2:
if p.cfg.Listeners.OnAddrV2 != nil {
p.cfg.Listeners.OnAddrV2(p, msg)
}
case *wire.MsgPing:
p.handlePingMsg(msg)
if p.cfg.Listeners.OnPing != nil {
@ -1986,29 +2058,8 @@ func (p *Peer) readRemoteVersionMsg() error {
return nil
}
// readRemoteVerAckMsg waits for the next message to arrive from the remote
// peer. If this message is not a verack message, then an error is returned.
// This method is to be used as part of the version negotiation upon a new
// connection.
func (p *Peer) readRemoteVerAckMsg() error {
// Read the next message from the wire.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
// It should be a verack message, otherwise send a reject message to the
// peer explaining why.
msg, ok := remoteMsg.(*wire.MsgVerAck)
if !ok {
reason := "a verack message must follow version"
rejectMsg := wire.NewMsgReject(
msg.Command(), wire.RejectMalformed, reason,
)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}
// processRemoteVerAckMsg takes the verack from the remote peer and handles it.
func (p *Peer) processRemoteVerAckMsg(msg *wire.MsgVerAck) {
p.flagsMtx.Lock()
p.verAckReceived = true
p.flagsMtx.Unlock()
@ -2016,8 +2067,6 @@ func (p *Peer) readRemoteVerAckMsg() error {
if p.cfg.Listeners.OnVerAck != nil {
p.cfg.Listeners.OnVerAck(p, msg)
}
return nil
}
// localVersionMsg creates a version message that can be used to send to the
@ -2032,7 +2081,15 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
}
}
theirNA := p.na
theirNA := p.na.ToLegacy()
// If p.na is a torv3 hidden service address, we'll need to send over
// an empty NetAddress for their address.
if p.na.IsTorV3() {
theirNA = wire.NewNetAddressIPPort(
net.IP([]byte{0, 0, 0, 0}), p.na.Port, p.na.Services,
)
}
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
@ -2040,7 +2097,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
if err != nil || p.na.Addr.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
@ -2092,14 +2149,71 @@ func (p *Peer) writeLocalVersionMsg() error {
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// writeSendAddrV2Msg writes our sendaddrv2 message to the remote peer if the
// peer supports protocol version 70016 and above.
func (p *Peer) writeSendAddrV2Msg(pver uint32) error {
if pver < wire.AddrV2Version {
return nil
}
sendAddrMsg := wire.NewMsgSendAddrV2()
return p.writeMessage(sendAddrMsg, wire.LatestEncoding)
}
// waitToFinishNegotiation waits until desired negotiation messages are
// received, recording the remote peer's preference for sendaddrv2 as an
// example. The list of negotiated features can be expanded in the future. If a
// verack is received, negotiation stops and the connection is live.
func (p *Peer) waitToFinishNegotiation(pver uint32) error {
// There are several possible messages that can be received here. We
// could immediately receive verack and be done with the handshake. We
// could receive sendaddrv2 and still have to wait for verack. Or we
// can receive unknown messages before and after sendaddrv2 and still
// have to wait for verack.
for {
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err == wire.ErrUnknownMessage {
continue
} else if err != nil {
return err
}
switch m := remoteMsg.(type) {
case *wire.MsgSendAddrV2:
if pver >= wire.AddrV2Version {
p.flagsMtx.Lock()
p.sendAddrV2 = true
p.flagsMtx.Unlock()
if p.cfg.Listeners.OnSendAddrV2 != nil {
p.cfg.Listeners.OnSendAddrV2(p, m)
}
}
case *wire.MsgVerAck:
// Receiving a verack means we are done with the
// handshake.
p.processRemoteVerAckMsg(m)
return nil
default:
// This is triggered if the peer sends, for example, a
// GETDATA message during this negotiation.
return wire.ErrInvalidHandshake
}
}
}
// negotiateInboundProtocol performs the negotiation protocol for an inbound
// peer. The events should occur in the following order, otherwise an error is
// returned:
//
// 1. Remote peer sends their version.
// 2. We send our version.
// 3. We send our verack.
// 4. Remote peer sends their verack.
// 3. We send sendaddrv2 if their version is >= 70016.
// 4. We send our verack.
// 5. Wait until sendaddrv2 or verack is received. Unknown messages are
// skipped as it could be wtxidrelay or a different message in the future
// that btcd does not implement but bitcoind does.
// 6. If remote peer sent sendaddrv2 above, wait until receipt of verack.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
@ -2109,12 +2223,22 @@ func (p *Peer) negotiateInboundProtocol() error {
return err
}
var protoVersion uint32
p.flagsMtx.Lock()
protoVersion = p.protocolVersion
p.flagsMtx.Unlock()
if err := p.writeSendAddrV2Msg(protoVersion); err != nil {
return err
}
err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
if err != nil {
return err
}
return p.readRemoteVerAckMsg()
// Finish the negotiation by waiting for negotiable messages or verack.
return p.waitToFinishNegotiation(protoVersion)
}
// negotiateOutboundProtocol performs the negotiation protocol for an outbound
@ -2123,8 +2247,11 @@ func (p *Peer) negotiateInboundProtocol() error {
//
// 1. We send our version.
// 2. Remote peer sends their version.
// 3. Remote peer sends their verack.
// 3. We send sendaddrv2 if their version is >= 70016.
// 4. We send our verack.
// 5. We wait to receive sendaddrv2 or verack, skipping unknown messages as
// in the inbound case.
// 6. If sendaddrv2 was received, wait for receipt of verack.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
@ -2134,11 +2261,22 @@ func (p *Peer) negotiateOutboundProtocol() error {
return err
}
if err := p.readRemoteVerAckMsg(); err != nil {
var protoVersion uint32
p.flagsMtx.Lock()
protoVersion = p.protocolVersion
p.flagsMtx.Unlock()
if err := p.writeSendAddrV2Msg(protoVersion); err != nil {
return err
}
return p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
if err != nil {
return err
}
// Finish the negotiation by waiting for negotiable messages or verack.
return p.waitToFinishNegotiation(protoVersion)
}
// start begins processing input and output messages.
@ -2201,7 +2339,12 @@ func (p *Peer) AssociateConnection(conn net.Conn) {
p.Disconnect()
return
}
p.na = na
// Convert the NetAddress created above into NetAddressV2.
currentNa := wire.NetAddressV2FromBytes(
na.Timestamp, na.Services, na.IP, na.Port,
)
p.na = currentNa
}
go func() {
@ -2267,7 +2410,10 @@ func NewInboundPeer(cfg *Config) *Peer {
return newPeerBase(cfg, true)
}
// NewOutboundPeer returns a new outbound bitcoin peer.
// NewOutboundPeer returns a new outbound bitcoin peer. If the Config argument
// does not set HostToNetAddress, connecting to anything other than an ipv4 or
// ipv6 address will fail and may cause a nil-pointer-dereference. This
// includes hostnames and onion services.
func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
p := newPeerBase(cfg, false)
p.addr = addr
@ -2289,7 +2435,12 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
// If host is an onion hidden service or a hostname, it is
// likely that a nil-pointer-dereference will occur. The caller
// should set HostToNetAddress if connecting to these.
p.na = wire.NetAddressV2FromBytes(
time.Now(), 0, net.ParseIP(host), uint16(port),
)
}
return p, nil

View file

@ -289,18 +289,16 @@ func TestPeerConnection(t *testing.T) {
{
"basic handshake",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peer1Cfg)
inPeer.AssociateConnection(inConn)
outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
outPeer.AssociateConnection(outConn)
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}
for i := 0; i < 4; i++ {
select {
@ -315,18 +313,16 @@ func TestPeerConnection(t *testing.T) {
{
"socks proxy",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333", proxy: true},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peer1Cfg)
inPeer.AssociateConnection(inConn)
outPeer, err := peer.NewOutboundPeer(peer2Cfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
outPeer.AssociateConnection(outConn)
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}
for i := 0; i < 4; i++ {
select {
@ -359,7 +355,7 @@ func TestPeerConnection(t *testing.T) {
// TestPeerListeners tests that the peer listeners are called as expected.
func TestPeerListeners(t *testing.T) {
verack := make(chan struct{}, 1)
ok := make(chan wire.Message, 20)
ok := make(chan wire.Message, 22)
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
@ -447,6 +443,12 @@ func TestPeerListeners(t *testing.T) {
OnSendHeaders: func(p *peer.Peer, msg *wire.MsgSendHeaders) {
ok <- msg
},
OnSendAddrV2: func(p *peer.Peer, msg *wire.MsgSendAddrV2) {
ok <- msg
},
OnAddrV2: func(p *peer.Peer, msg *wire.MsgAddrV2) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
@ -456,12 +458,7 @@ func TestPeerListeners(t *testing.T) {
TrickleInterval: time.Second * 10,
AllowSelfConns: true,
}
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)
peerCfg.Listeners = peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
@ -473,7 +470,12 @@ func TestPeerListeners(t *testing.T) {
t.Errorf("NewOutboundPeer: unexpected err %v\n", err)
return
}
outPeer.AssociateConnection(outConn)
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
t.Errorf("setupPeerConnection: failed: %v\n", err)
return
}
for i := 0; i < 2; i++ {
select {
@ -597,6 +599,14 @@ func TestPeerListeners(t *testing.T) {
"OnSendHeaders",
wire.NewMsgSendHeaders(),
},
{
"OnSendAddrV2",
wire.NewMsgSendAddrV2(),
},
{
"OnAddrV2",
wire.NewMsgAddrV2(),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
@ -881,17 +891,17 @@ func TestDuplicateVersionMsg(t *testing.T) {
Services: 0,
AllowSelfConns: true,
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr)
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
outPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
t.Fatalf("setupPeerConnection failed to connect: %v\n", err)
}
// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
select {
@ -947,17 +957,16 @@ func TestUpdateLastBlockHeight(t *testing.T) {
remotePeerCfg.NewestBlock = func() (*chainhash.Hash, int32, error) {
return &chainhash.Hash{}, remotePeerHeight, nil
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
localPeer, err := peer.NewOutboundPeer(&peerCfg, inConn.laddr)
localPeer, err := peer.NewOutboundPeer(&peerCfg, "10.0.0.2:8333")
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
localPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(&remotePeerCfg)
inPeer.AssociateConnection(inConn)
err = setupPeerConnection(inPeer, localPeer)
if err != nil {
t.Fatalf("setupPeerConnection failed to connect: %v\n", err)
}
// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
@ -989,3 +998,214 @@ func TestUpdateLastBlockHeight(t *testing.T) {
remotePeerHeight+1)
}
}
// setupPeerConnection initiates a tcp connection between two peers.
func setupPeerConnection(in, out *peer.Peer) error {
// listenFunc is a function closure that listens for a tcp connection.
// The tcp connection will be the one the inbound peer uses. This will
// be run as a goroutine.
listenFunc := func(l *net.TCPListener, errChan chan error,
listenChan chan struct{}) {
listenChan <- struct{}{}
conn, err := l.Accept()
if err != nil {
errChan <- err
return
}
in.AssociateConnection(conn)
errChan <- nil
}
// dialFunc is a function closure that initiates the tcp connection.
// The tcp connection will be the one the outbound peer uses.
dialFunc := func(addr *net.TCPAddr) error {
conn, err := net.Dial("tcp", addr.String())
if err != nil {
return err
}
out.AssociateConnection(conn)
return nil
}
listenAddr := "localhost:0"
addr, err := net.ResolveTCPAddr("tcp", listenAddr)
if err != nil {
return err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return err
}
errChan := make(chan error, 1)
listenChan := make(chan struct{}, 1)
go listenFunc(l, errChan, listenChan)
<-listenChan
if err := dialFunc(l.Addr().(*net.TCPAddr)); err != nil {
return err
}
select {
case err = <-errChan:
return err
case <-time.After(time.Second * 2):
return errors.New("failed to create connection")
}
}
// TestSendAddrV2Handshake tests that the version-verack handshake with the
// addition of the sendaddrv2 message works as expected.
func TestSendAddrV2Handshake(t *testing.T) {
verack := make(chan struct{}, 2)
sendaddr := make(chan struct{}, 2)
peer1Cfg := &peer.Config{
Listeners: peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
OnSendAddrV2: func(p *peer.Peer,
msg *wire.MsgSendAddrV2) {
sendaddr <- struct{}{}
},
},
AllowSelfConns: true,
ChainParams: &chaincfg.MainNetParams,
}
peer2Cfg := &peer.Config{
Listeners: peer1Cfg.Listeners,
AllowSelfConns: true,
ChainParams: &chaincfg.MainNetParams,
}
verackErr := errors.New("verack timeout")
tests := []struct {
name string
expectsV2 bool
setup func() (*peer.Peer, *peer.Peer, error)
}{
{
"successful sendaddrv2 handshake",
true,
func() (*peer.Peer, *peer.Peer, error) {
inPeer := peer.NewInboundPeer(peer1Cfg)
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}
for i := 0; i < 4; i++ {
select {
case <-sendaddr:
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}
return inPeer, outPeer, nil
},
},
{
"handshake with legacy inbound peer",
false,
func() (*peer.Peer, *peer.Peer, error) {
legacyVersion := wire.AddrV2Version - 1
peer1Cfg.ProtocolVersion = legacyVersion
inPeer := peer.NewInboundPeer(peer1Cfg)
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}
return inPeer, outPeer, nil
},
},
{
"handshake with legacy outbound peer",
false,
func() (*peer.Peer, *peer.Peer, error) {
inPeer := peer.NewInboundPeer(peer1Cfg)
legacyVersion := wire.AddrV2Version - 1
peer2Cfg.ProtocolVersion = legacyVersion
outPeer, err := peer.NewOutboundPeer(
peer2Cfg, "10.0.0.2:8333",
)
if err != nil {
return nil, nil, err
}
err = setupPeerConnection(inPeer, outPeer)
if err != nil {
return nil, nil, err
}
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 2):
return nil, nil, verackErr
}
}
return inPeer, outPeer, nil
},
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
inPeer, outPeer, err := test.setup()
if err != nil {
t.Fatalf("TestSendAddrV2Handshake setup #%d: "+
"unexpected err: %v", i, err)
}
if inPeer.WantsAddrV2() != test.expectsV2 {
t.Fatalf("TestSendAddrV2Handshake #%d expected "+
"wantsAddrV2 to be %v instead was %v", i,
test.expectsV2, inPeer.WantsAddrV2())
} else if outPeer.WantsAddrV2() != test.expectsV2 {
t.Fatalf("TestSendAddrV2Handshake #%d expected "+
"wantsAddrV2 to be %v instead was %v", i,
test.expectsV2, outPeer.WantsAddrV2())
}
inPeer.Disconnect()
outPeer.Disconnect()
inPeer.WaitForDisconnect()
outPeer.WaitForDisconnect()
}
}

View file

@ -344,6 +344,34 @@ func (sp *serverPeer) relayTxDisabled() bool {
// pushAddrMsg sends a legacy addr message to the connected peer using the
// provided addresses.
func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddressV2) {
if sp.WantsAddrV2() {
// If the peer supports addrv2, we'll be pushing an addrv2
// message instead. The logic is otherwise identical to the
// addr case below.
addrs := make([]*wire.NetAddressV2, 0, len(addresses))
for _, addr := range addresses {
// Filter addresses already known to the peer.
if sp.addressKnown(addr) {
continue
}
addrs = append(addrs, addr)
}
known, err := sp.PushAddrV2Msg(addrs)
if err != nil {
peerLog.Errorf("Can't push addrv2 message to %s: %v",
sp.Peer, err)
sp.Disconnect()
return
}
// Add the final set of addresses sent to the set the peer
// knows of.
sp.addKnownAddresses(known)
return
}
addrs := make([]*wire.NetAddress, 0, len(addresses))
for _, addr := range addresses {
// Filter addresses already known to the peer.
@ -1328,6 +1356,45 @@ func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
sp.server.addrManager.AddAddresses(addrs, sp.NA())
}
// OnAddrV2 is invoked when a peer receives an addrv2 bitcoin message and is
// used to notify the server about advertised addresses.
func (sp *serverPeer) OnAddrV2(_ *peer.Peer, msg *wire.MsgAddrV2) {
// Ignore if simnet for the same reasons as the regular addr message.
if cfg.SimNet {
return
}
// An empty AddrV2 message is invalid.
if len(msg.AddrList) == 0 {
peerLog.Errorf("Command [%s] from %s does not contain any "+
"addresses", msg.Command(), sp.Peer)
sp.Disconnect()
return
}
for _, na := range msg.AddrList {
// Don't add more to the set of known addresses if we're
// disconnecting.
if !sp.Connected() {
return
}
// Set the timestamp to 5 days ago if the timestamp received is
// more than 10 minutes in the future so this address is one of
// the first to be removed.
now := time.Now()
if na.Timestamp.After(now.Add(time.Minute * 10)) {
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
}
// Add to the set of known addresses.
sp.addKnownAddresses([]*wire.NetAddressV2{na})
}
// Add the addresses to the addrmanager.
sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
}
// OnRead is invoked when a peer receives a message and it is used to update
// the bytes received by the server.
func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {
@ -2074,6 +2141,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnFilterLoad: sp.OnFilterLoad,
OnGetAddr: sp.OnGetAddr,
OnAddr: sp.OnAddr,
OnAddrV2: sp.OnAddrV2,
OnRead: sp.OnRead,
OnWrite: sp.OnWrite,
OnNotFound: sp.OnNotFound,

View file

@ -32,6 +32,7 @@ const (
CmdVerAck = "verack"
CmdGetAddr = "getaddr"
CmdAddr = "addr"
CmdAddrV2 = "addrv2"
CmdGetBlocks = "getblocks"
CmdInv = "inv"
CmdGetData = "getdata"
@ -78,6 +79,13 @@ const (
// protocol.
var LatestEncoding = WitnessEncoding
// ErrUnknownMessage is the error returned when decoding an unknown message.
var ErrUnknownMessage = fmt.Errorf("received unknown message")
// ErrInvalidHandshake is the error returned when a peer sends us a known
// message that does not belong in the version-verack handshake.
var ErrInvalidHandshake = fmt.Errorf("invalid message during handshake")
// Message is an interface that describes a bitcoin message. A type that
// implements Message has complete control over the representation of its data
// and may therefore contain additional or fewer fields than those which
@ -109,6 +117,9 @@ func makeEmptyMessage(command string) (Message, error) {
case CmdAddr:
msg = &MsgAddr{}
case CmdAddrV2:
msg = &MsgAddrV2{}
case CmdGetBlocks:
msg = &MsgGetBlocks{}
@ -185,7 +196,7 @@ func makeEmptyMessage(command string) (Message, error) {
msg = &MsgCFCheckpt{}
default:
return nil, fmt.Errorf("unhandled command [%s]", command)
return nil, ErrUnknownMessage
}
return msg, nil
}
@ -378,9 +389,10 @@ func ReadMessageWithEncodingN(r io.Reader, pver uint32, btcnet BitcoinNet,
// Create struct of appropriate message type based on the command.
msg, err := makeEmptyMessage(command)
if err != nil {
// makeEmptyMessage can only return ErrUnknownMessage and it is
// important that we bubble it up to the caller.
discardInput(r, hdr.length)
return totalBytes, nil, nil, messageError("ReadMessage",
err.Error())
return totalBytes, nil, nil, err
}
// Check for maximum length based on the message type as a malicious client

View file

@ -295,7 +295,7 @@ func TestReadMessageWireErrors(t *testing.T) {
pver,
btcnet,
len(unsupportedCommandBytes),
&MessageError{},
ErrUnknownMessage,
24,
},
@ -345,7 +345,7 @@ func TestReadMessageWireErrors(t *testing.T) {
pver,
btcnet,
len(discardBytes),
&MessageError{},
ErrUnknownMessage,
24,
},
}

102
wire/msgaddrv2.go Normal file
View file

@ -0,0 +1,102 @@
package wire
import (
"fmt"
"io"
)
// MaxV2AddrPerMsg is the maximum number of version 2 addresses that will exist
// in a single addrv2 message (MsgAddrV2).
const MaxV2AddrPerMsg = 1000
// MsgAddrV2 implements the Message interface and represents a bitcoin addrv2
// message that can support longer-length addresses like torv3, cjdns, and i2p.
// It is used to gossip addresses on the network. Each message is limited to
// MaxV2AddrPerMsg addresses. This is the same limit as MsgAddr.
type MsgAddrV2 struct {
AddrList []*NetAddressV2
}
// BtcDecode decodes r using the bitcoin protocol into a MsgAddrV2.
func (m *MsgAddrV2) BtcDecode(r io.Reader, pver uint32,
enc MessageEncoding) error {
count, err := ReadVarInt(r, pver)
if err != nil {
return err
}
// Limit to max addresses per message.
if count > MaxV2AddrPerMsg {
str := fmt.Sprintf("too many addresses for message [count %v,"+
" max %v]", count, MaxV2AddrPerMsg)
return messageError("MsgAddrV2.BtcDecode", str)
}
addrList := make([]NetAddressV2, count)
m.AddrList = make([]*NetAddressV2, 0, count)
for i := uint64(0); i < count; i++ {
na := &addrList[i]
err := readNetAddressV2(r, pver, na)
switch err {
case ErrSkippedNetworkID:
// This may be a network ID we don't know of, but is
// still valid. We can safely skip those.
continue
case ErrInvalidAddressSize:
// The encoding used by the peer does not follow
// BIP-155 and we should stop processing this message.
return err
}
m.AddrList = append(m.AddrList, na)
}
return nil
}
// BtcEncode encodes the MsgAddrV2 into a writer w.
func (m *MsgAddrV2) BtcEncode(w io.Writer, pver uint32,
enc MessageEncoding) error {
count := len(m.AddrList)
if count > MaxV2AddrPerMsg {
str := fmt.Sprintf("too many addresses for message [count %v,"+
" max %v]", count, MaxV2AddrPerMsg)
return messageError("MsgAddrV2.BtcEncode", str)
}
err := WriteVarInt(w, pver, uint64(count))
if err != nil {
return err
}
for _, na := range m.AddrList {
err = writeNetAddressV2(w, pver, na)
if err != nil {
return err
}
}
return nil
}
// Command returns the protocol command string for MsgAddrV2.
func (m *MsgAddrV2) Command() string {
return CmdAddrV2
}
// MaxPayloadLength returns the maximum length payload possible for MsgAddrV2.
func (m *MsgAddrV2) MaxPayloadLength(pver uint32) uint32 {
// The varint that can store the maximum number of addresses is 3 bytes
// long. The maximum payload is then 3 + 1000 * maxNetAddressV2Payload.
return 3 + (MaxV2AddrPerMsg * maxNetAddressV2Payload())
}
// NewMsgAddrV2 returns a new bitcoin addrv2 message that conforms to the
// Message interface.
func NewMsgAddrV2() *MsgAddrV2 {
return &MsgAddrV2{
AddrList: make([]*NetAddressV2, 0, MaxV2AddrPerMsg),
}
}

73
wire/msgaddrv2_test.go Normal file
View file

@ -0,0 +1,73 @@
package wire
import (
"bytes"
"io"
"testing"
)
// TestAddrV2Decode checks that decoding an addrv2 message off the wire behaves
// as expected. This means ignoring certain addresses, and failing in certain
// failure scenarios.
func TestAddrV2Decode(t *testing.T) {
tests := []struct {
buf []byte
expectedError bool
expectedAddrs int
}{
// Exceeding max addresses.
{
[]byte{0xfd, 0xff, 0xff},
true,
0,
},
// Invalid address size.
{
[]byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x05},
true,
0,
},
// One valid address and one skipped address
{
[]byte{
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04,
0x7f, 0x00, 0x00, 0x01, 0x22, 0x22, 0x00, 0x00,
0x00, 0x00, 0x00, 0x02, 0x10, 0xfd, 0x87, 0xd8,
0x7e, 0xeb, 0x43, 0xff, 0xfe, 0xcc, 0x39, 0xa8,
0x73, 0x69, 0x15, 0xff, 0xff, 0x22, 0x22,
},
false,
1,
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
r := bytes.NewReader(test.buf)
m := &MsgAddrV2{}
err := m.BtcDecode(r, 0, LatestEncoding)
if test.expectedError {
if err == nil {
t.Errorf("Test #%d expected error", i)
}
continue
} else if err != nil {
t.Errorf("Test #%d unexpected error %v", i, err)
}
// Trying to read more should give EOF.
var b [1]byte
if _, err := r.Read(b[:]); err != io.EOF {
t.Errorf("Test #%d did not cleanly finish reading", i)
}
if len(m.AddrList) != test.expectedAddrs {
t.Errorf("Test #%d expected %d addrs, instead of %d",
i, test.expectedAddrs, len(m.AddrList))
}
}
}

View file

@ -13,7 +13,7 @@ import (
// XXX pedro: we will probably need to bump this.
const (
// ProtocolVersion is the latest protocol version this package supports.
ProtocolVersion uint32 = 70013
ProtocolVersion uint32 = 70016
// MultipleAddressVersion is the protocol version which added multiple
// addresses per message (pver >= MultipleAddressVersion).
@ -51,6 +51,13 @@ const (
// FeeFilterVersion is the protocol version which added a new
// feefilter message.
FeeFilterVersion uint32 = 70013
// AddrV2Version is the protocol version which added two new messages.
// sendaddrv2 is sent during the version-verack handshake and signals
// support for sending and receiving the addrv2 message. In the future,
// new messages that occur during the version-verack handshake will not
// come with a protocol version bump.
AddrV2Version uint32 = 70016
)
// ServiceFlag identifies services supported by a bitcoin peer.