From d98430d8cac454205e100a313727751ee00b148b Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 3 Nov 2016 22:51:07 -0500 Subject: [PATCH] connmgr: Implement inbound connection handling. This modifies the connection manager to provide support for accepting inbound connections on a caller-provided set of listeners and notify the caller via a callback. This is only the minimum work necessary to get inbound support into the connection manager. The intention for future commits is to move more connection-related logic such as limiting the maximum number of overall connections and banned peer tracking into the connection manager. --- connmgr/connmanager.go | 61 ++++++++++++++++++++++++++++++++++++++++++ server.go | 49 ++++++++------------------------- 2 files changed, 72 insertions(+), 38 deletions(-) diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 614185cc..33133734 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -96,6 +96,29 @@ func (c *ConnReq) String() string { // Config holds the configuration options related to the connection manager. type Config struct { + // Listeners defines a slice of listeners for which the connection + // manager will take ownership of and accept connections. When a + // connection is accepted, the OnAccept handler will be invoked with the + // connection. Since the connection manager takes ownership of these + // listeners, they will be closed when the connection manager is + // stopped. + // + // This field will not have any effect if the OnAccept field is not + // also specified. It may be nil if the caller does not wish to listen + // for incoming connections. + Listeners []net.Listener + + // OnAccept is a callback that is fired when an inbound connection is + // accepted. It is the caller's responsibility to close the connection. + // Failure to close the connection will result in the connection manager + // believing the connection is still active and thus have undesirable + // side effects such as still counting toward maximum connection limits. + // + // This field will not have any effect if the Listeners field is not + // also specified since there couldn't possibly be any accepted + // connections in that case. + OnAccept func(net.Conn) + // TargetOutbound is the number of outbound network connections to // maintain. Defaults to 8. TargetOutbound uint32 @@ -306,6 +329,26 @@ func (cm *ConnManager) Remove(id uint64) { cm.requests <- handleDisconnected{id, false} } +// listenHandler accepts incoming connections on a given listener. It must be +// run as a goroutine. +func (cm *ConnManager) listenHandler(listener net.Listener) { + log.Infof("Server listening on %s", listener.Addr()) + for atomic.LoadInt32(&cm.stop) == 0 { + conn, err := listener.Accept() + if err != nil { + // Only log the error if not forcibly shutting down. + if atomic.LoadInt32(&cm.stop) == 0 { + log.Errorf("Can't accept connection: %v", err) + } + continue + } + go cm.cfg.OnAccept(conn) + } + + cm.wg.Done() + log.Tracef("Listener handler done for %s", listener.Addr()) +} + // Start launches the connection manager and begins connecting to the network. func (cm *ConnManager) Start() { // Already started? @@ -317,6 +360,15 @@ func (cm *ConnManager) Start() { cm.wg.Add(1) go cm.connHandler() + // Start all the listeners so long as the caller requested them and + // provided a callback to be invoked when connections are accepted. + if cm.cfg.OnAccept != nil { + for _, listner := range cm.cfg.Listeners { + cm.wg.Add(1) + go cm.listenHandler(listner) + } + } + for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ { go cm.NewConnReq() } @@ -333,6 +385,15 @@ func (cm *ConnManager) Stop() { log.Warnf("Connection manager already stopped") return } + + // Stop all the listeners. There will not be any listeners if + // listening is disabled. + for _, listener := range cm.cfg.Listeners { + // Ignore the error since this is shutdown and there is no way + // to recover anyways. + _ = listener.Close() + } + close(cm.quit) log.Trace("Connection manager stopped") } diff --git a/server.go b/server.go index 69c32a2a..aaf7bff7 100644 --- a/server.go +++ b/server.go @@ -145,7 +145,6 @@ type server struct { shutdown int32 shutdownSched int32 - listeners []net.Listener chainParams *chaincfg.Params addrManager *addrmgr.AddrManager connManager *connmgr.ConnManager @@ -1582,26 +1581,15 @@ func newPeerConfig(sp *serverPeer) *peer.Config { } } -// listenHandler is the main listener which accepts incoming connections for the -// server. It must be run as a goroutine. -func (s *server) listenHandler(listener net.Listener) { - srvrLog.Infof("Server listening on %s", listener.Addr()) - for atomic.LoadInt32(&s.shutdown) == 0 { - conn, err := listener.Accept() - if err != nil { - // Only log the error if we're not forcibly shutting down. - if atomic.LoadInt32(&s.shutdown) == 0 { - srvrLog.Errorf("Can't accept connection: %v", err) - } - continue - } - sp := newServerPeer(s, false) - sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) - sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) - } - s.wg.Done() - srvrLog.Tracef("Listener handler done for %s", listener.Addr()) +// inboundPeerConnected is invoked by the connection manager when a new inbound +// connection is established. It initializes a new inbound server peer +// instance, associates it with the connection, and starts a goroutine to wait +// for disconnection. +func (s *server) inboundPeerConnected(conn net.Conn) { + sp := newServerPeer(s, false) + sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) + sp.AssociateConnection(conn) + go s.peerDoneHandler(sp) } // outboundPeerConnected is invoked by the connection manager when a new @@ -1966,13 +1954,6 @@ func (s *server) Start() { srvrLog.Trace("Starting server") - // Start all the listeners. There will not be any if listening is - // disabled. - for _, listener := range s.listeners { - s.wg.Add(1) - go s.listenHandler(listener) - } - // Start the peer handler which in turn starts the address and block // managers. s.wg.Add(1) @@ -2010,15 +1991,6 @@ func (s *server) Stop() error { srvrLog.Warnf("Server shutting down") - // Stop all the listeners. There will not be any listeners if - // listening is disabled. - for _, listener := range s.listeners { - err := listener.Close() - if err != nil { - return err - } - } - // Stop the CPU miner if needed s.cpuMiner.Stop() @@ -2317,7 +2289,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } s := server{ - listeners: listeners, chainParams: chainParams, addrManager: amgr, newPeers: make(chan *serverPeer, cfg.MaxPeers), @@ -2469,6 +2440,8 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param targetOutbound = cfg.MaxPeers } cmgr, err := connmgr.New(&connmgr.Config{ + Listeners: listeners, + OnAccept: s.inboundPeerConnected, RetryDuration: connectionRetryInterval, TargetOutbound: uint32(targetOutbound), Dial: btcdDial,