From 73d353247c2e3ccff1fbe725bdc3580450ea12ac Mon Sep 17 00:00:00 2001 From: Jonathan Gillham Date: Wed, 3 Feb 2016 14:24:28 +0000 Subject: [PATCH] peer: Consolidate Connect, Disconnect, Start, Shutdown public methods. This commit does not change functionality. It makes the creation of inbound and outbound peers more homogeneous. As a result the Start method of peer was removed as it was found not to be necessary. This is the first of several pull requests/commits designed to make the peer public API and internals less complex. --- peer/doc.go | 17 +++++++---------- peer/example_test.go | 13 +++++++------ peer/peer.go | 42 ++++++++++++++---------------------------- peer/peer_test.go | 41 +++++++++++++++++------------------------ server.go | 44 ++++++++++++++++++++++++-------------------- 5 files changed, 69 insertions(+), 88 deletions(-) diff --git a/peer/doc.go b/peer/doc.go index 70bd3d98..bff4dda6 100644 --- a/peer/doc.go +++ b/peer/doc.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015 The btcsuite developers +// Copyright (c) 2015-2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -66,15 +66,12 @@ This provides high flexibility for things such as connecting via proxies, acting as a proxy, creating bridge peers, choosing whether to listen for inbound peers, etc. -For outgoing peers, the NewOutboundPeer function must be used to specify the -configuration followed by invoking Connect with the net.Conn instance. This - will start all async I/O goroutines and initiate the initial negotiation -process. Once that has been completed, the peer is fully functional. - -For inbound peers, the NewInboundPeer function must be used to specify the -configuration and net.Conn instance followed by invoking Start. This will start -all async I/O goroutines and listen for the initial negotiation process. Once -that has been completed, the peer is fully functional. +NewOutboundPeer and NewInboundPeer functions must be followed by calling Connect +with a net.Conn instance to the peer. This will start all async I/O goroutines +and initiate the protocol negotiation process. Once finished with the peer call +Disconnect to disconnect from the peer and clean up all resources. +WaitForDisconnect can be used to block until peer disconnection and resource +cleanup has completed. Callbacks diff --git a/peer/example_test.go b/peer/example_test.go index 2abfaf94..7ca8f74c 100644 --- a/peer/example_test.go +++ b/peer/example_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015 The btcsuite developers +// Copyright (c) 2015-2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -38,9 +38,9 @@ func mockRemotePeer() error { } // Create and start the inbound peer. - p := peer.NewInboundPeer(peerCfg, conn) - if err := p.Start(); err != nil { - fmt.Printf("Start: error %v\n", err) + p := peer.NewInboundPeer(peerCfg) + if err := p.Connect(conn); err != nil { + fmt.Printf("Connect: error %v\n", err) return } }() @@ -105,8 +105,9 @@ func Example_newOutboundPeer() { fmt.Printf("Example_peerConnection: verack timeout") } - // Shutdown the peer. - p.Shutdown() + // Disconnect the peer. + p.Disconnect() + p.WaitForDisconnect() // Output: // outbound: received version diff --git a/peer/peer.go b/peer/peer.go index 64ab4457..f0ed314c 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -387,8 +387,7 @@ type HostToNetAddrFunc func(host string, port uint16, // of specific types that typically require common special handling are // provided as a convenience. type Peer struct { - // The following variables must only be used atomically - started int32 + // The following variables must only be used atomically. connected int32 disconnect int32 bytesReceived uint64 @@ -1943,11 +1942,14 @@ func (p *Peer) Connect(conn net.Conn) error { return nil } + if p.inbound { + p.addr = conn.RemoteAddr().String() + } p.conn = conn p.timeConnected = time.Now() atomic.AddInt32(&p.connected, 1) - return p.Start() + return p.start() } // Connected returns whether or not the peer is currently connected. @@ -1975,18 +1977,12 @@ func (p *Peer) Disconnect() { // Start begins processing input and output messages. It also sends the initial // version message for outbound connections to start the negotiation process. -func (p *Peer) Start() error { - // Already started? - if atomic.AddInt32(&p.started, 1) != 1 { - return nil - } - +func (p *Peer) start() error { log.Tracef("Starting peer %s", p) // Send an initial version message if this is an outbound connection. if !p.inbound { - err := p.pushVersionMsg() - if err != nil { + if err := p.pushVersionMsg(); err != nil { log.Errorf("Can't send outbound version message %v", err) p.Disconnect() return err @@ -2002,16 +1998,11 @@ func (p *Peer) Start() error { return nil } -// Shutdown gracefully shuts down the peer by disconnecting it. -func (p *Peer) Shutdown() { - log.Tracef("Shutdown peer %s", p) - p.Disconnect() -} - -// WaitForShutdown waits until the peer has completely shutdown. This will -// happen if either the local or remote side has been disconnected or the peer -// is forcibly shutdown via Shutdown. -func (p *Peer) WaitForShutdown() { +// WaitForDisconnect waits until the peer has completely disconnected and all +// resources are cleaned up. This will happen if either the local or remote +// side has been disconnected or the peer is forcibly disconnected via +// Disconnect. +func (p *Peer) WaitForDisconnect() { <-p.quit } @@ -2052,13 +2043,8 @@ func newPeerBase(cfg *Config, inbound bool) *Peer { // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin // processing incoming and outgoing messages. -func NewInboundPeer(cfg *Config, conn net.Conn) *Peer { - p := newPeerBase(cfg, true) - p.conn = conn - p.addr = conn.RemoteAddr().String() - p.timeConnected = time.Now() - atomic.AddInt32(&p.connected, 1) - return p +func NewInboundPeer(cfg *Config) *Peer { + return newPeerBase(cfg, true) } // NewOutboundPeer returns a new outbound bitcoin peer. diff --git a/peer/peer_test.go b/peer/peer_test.go index 11bb4090..162dfebf 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015 The btcsuite developers +// Copyright (c) 2015-2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -244,9 +244,8 @@ func TestPeerConnection(t *testing.T) { &conn{raddr: "10.0.0.1:8333"}, &conn{raddr: "10.0.0.2:8333"}, ) - inPeer := peer.NewInboundPeer(peerCfg, inConn) - err := inPeer.Start() - if err != nil { + inPeer := peer.NewInboundPeer(peerCfg) + if err := inPeer.Connect(inConn); err != nil { return nil, nil, err } outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333") @@ -256,6 +255,7 @@ func TestPeerConnection(t *testing.T) { if err := outPeer.Connect(outConn); err != nil { return nil, nil, err } + for i := 0; i < 2; i++ { select { case <-verack: @@ -273,9 +273,8 @@ func TestPeerConnection(t *testing.T) { &conn{raddr: "10.0.0.1:8333", proxy: true}, &conn{raddr: "10.0.0.2:8333"}, ) - inPeer := peer.NewInboundPeer(peerCfg, inConn) - err := inPeer.Start() - if err != nil { + inPeer := peer.NewInboundPeer(peerCfg) + if err := inPeer.Connect(inConn); err != nil { return nil, nil, err } outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333") @@ -306,8 +305,8 @@ func TestPeerConnection(t *testing.T) { testPeer(t, inPeer, wantStats) testPeer(t, outPeer, wantStats) - inPeer.Shutdown() - outPeer.Shutdown() + inPeer.Disconnect() + outPeer.Disconnect() } } @@ -390,9 +389,8 @@ func TestPeerListeners(t *testing.T) { &conn{raddr: "10.0.0.1:8333"}, &conn{raddr: "10.0.0.2:8333"}, ) - inPeer := peer.NewInboundPeer(peerCfg, inConn) - err := inPeer.Start() - if err != nil { + inPeer := peer.NewInboundPeer(peerCfg) + if err := inPeer.Connect(inConn); err != nil { t.Errorf("TestPeerListeners: unexpected err %v\n", err) return } @@ -513,8 +511,8 @@ func TestPeerListeners(t *testing.T) { return } } - inPeer.Shutdown() - outPeer.Shutdown() + inPeer.Disconnect() + outPeer.Disconnect() } // TestOutboundPeer tests that the outbound peer works as expected. @@ -542,22 +540,17 @@ func TestOutboundPeer(t *testing.T) { return } - // Test Connect err wantErr := errBlockNotFound if err := p.Connect(c); err != wantErr { t.Errorf("Connect: expected err %v, got %v\n", wantErr, err) return } - // Test already connected + + // Test already connected. if err := p.Connect(c); err != nil { t.Errorf("Connect: unexpected err %v\n", err) return } - // Test already started - if err := p.Start(); err != nil { - t.Errorf("Start: unexpected err %v\n", err) - return - } // Test Queue Inv fakeBlockHash := &wire.ShaHash{0x00, 0x01} @@ -572,7 +565,7 @@ func TestOutboundPeer(t *testing.T) { done := make(chan struct{}) p.QueueMessage(fakeMsg, done) <-done - p.Shutdown() + p.Disconnect() // Test NewestBlock var newestBlock = func() (*wire.ShaHash, int32, error) { @@ -612,7 +605,7 @@ func TestOutboundPeer(t *testing.T) { // Test Queue Inv after connection p1.QueueInventory(fakeInv) - p1.Shutdown() + p1.Disconnect() // Test regression peerCfg.ChainParams = &chaincfg.RegressionNetParams @@ -657,7 +650,7 @@ func TestOutboundPeer(t *testing.T) { p2.QueueMessage(wire.NewMsgGetData(), nil) p2.QueueMessage(wire.NewMsgGetHeaders(), nil) - p2.Shutdown() + p2.Disconnect() } func init() { diff --git a/server.go b/server.go index 0bb6cfa5..dcbe15f2 100644 --- a/server.go +++ b/server.go @@ -1038,9 +1038,8 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { - srvrLog.Infof("New peer %s ignored - server is shutting "+ - "down", sp) - sp.Shutdown() + srvrLog.Infof("New peer %s ignored - server is shutting down", sp) + sp.Disconnect() return false } @@ -1048,14 +1047,14 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { host, _, err := net.SplitHostPort(sp.Addr()) if err != nil { srvrLog.Debugf("can't split hostport %v", err) - sp.Shutdown() + sp.Disconnect() return false } if banEnd, ok := state.banned[host]; ok { if time.Now().Before(banEnd) { - srvrLog.Debugf("Peer %s is banned for another %v - "+ - "disconnecting", host, banEnd.Sub(time.Now())) - sp.Shutdown() + srvrLog.Debugf("Peer %s is banned for another %v - disconnecting", + host, banEnd.Sub(time.Now())) + sp.Disconnect() return false } @@ -1070,16 +1069,16 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { if state.OutboundCount() >= state.maxOutboundPeers { srvrLog.Infof("Max outbound peers reached [%d] - disconnecting "+ "peer %s", state.maxOutboundPeers, sp) - sp.Shutdown() + sp.Disconnect() return false } } // Limit max number of total peers. if state.Count() >= cfg.MaxPeers { - srvrLog.Infof("Max peers reached [%d] - disconnecting "+ - "peer %s", cfg.MaxPeers, sp) - sp.Shutdown() + srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s", + cfg.MaxPeers, sp) + sp.Disconnect() // TODO(oga) how to handle permanent peers here? // they should be rescheduled. return false @@ -1415,15 +1414,19 @@ func (s *server) listenHandler(listener net.Listener) { if err != nil { // Only log the error if we're not forcibly shutting down. if atomic.LoadInt32(&s.shutdown) == 0 { - srvrLog.Errorf("can't accept connection: %v", - err) + srvrLog.Errorf("Can't accept connection: %v", err) } continue } sp := newServerPeer(s, false) - sp.Peer = peer.NewInboundPeer(newPeerConfig(sp), conn) - sp.Start() + sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) go s.peerDoneHandler(sp) + if err := sp.Connect(conn); err != nil { + if atomic.LoadInt32(&s.shutdown) == 0 { + srvrLog.Errorf("Can't accept connection: %v", err) + } + continue + } } s.wg.Done() srvrLog.Tracef("Listener handler done for %s", listener.Addr()) @@ -1502,7 +1505,7 @@ func (s *server) peerConnHandler(sp *serverPeer) { // peerDoneHandler handles peer disconnects by notifiying the server that it's // done. func (s *server) peerDoneHandler(sp *serverPeer) { - sp.WaitForShutdown() + sp.WaitForDisconnect() s.donePeers <- sp // Only tell block manager we are gone if we ever told it we existed. @@ -1639,11 +1642,11 @@ out: case qmsg := <-s.query: s.handleQuery(state, qmsg) - // Shutdown the peer handler. case <-s.quit: - // Shutdown peers. + // Disconnect all peers on server shutdown. state.forAllPeers(func(sp *serverPeer) { - sp.Shutdown() + srvrLog.Tracef("Shutdown peer %s", sp.Peer) + sp.Disconnect() }) break out } @@ -1660,7 +1663,8 @@ out: if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 || atomic.LoadInt32(&s.shutdown) != 0 { state.forPendingPeers(func(sp *serverPeer) { - sp.Shutdown() + srvrLog.Tracef("Shutdown peer %s", sp.Peer) + sp.Disconnect() }) continue }