lnd/server.go
Olaoluwa Osuntokun 913ae259de
lnwire: document commitment+HTLC update msgs
This commit adds some cursory documentation along wit minor field
modifications to all messages which deal with adding HTLC’s, or
updating remote commitment transactions.

The messages for dual funding of channel is left purposefully
undocumented as all initial negotiations will be single funder by
default.

A revamp of the testing infrastructure of lnwire will be committed in
the near future.
2016-05-23 13:54:41 -07:00

283 lines
5.6 KiB
Go

package main
import (
"encoding/hex"
"fmt"
"net"
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcwallet/waddrmgr"
)
// server...
type server struct {
started int32 // atomic
shutdown int32 // atomic
longTermPriv *btcec.PrivateKey
listeners []net.Listener
peers map[int32]*peer
rpcServer *rpcServer
lnwallet *lnwallet.LightningWallet
chanDB *channeldb.DB
newPeers chan *peer
donePeers chan *peer
queries chan interface{}
wg sync.WaitGroup
quit chan struct{}
}
// newServer...
func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
chanDB *channeldb.DB) (*server, error) {
privKey, err := getIdentityPrivKey(chanDB, wallet)
if err != nil {
return nil, err
}
listeners := make([]net.Listener, len(listenAddrs))
for i, addr := range listenAddrs {
listeners[i], err = lndc.NewListener(privKey, addr)
if err != nil {
return nil, err
}
}
s := &server{
chanDB: chanDB,
longTermPriv: privKey,
listeners: listeners,
peers: make(map[int32]*peer),
newPeers: make(chan *peer, 100),
donePeers: make(chan *peer, 100),
lnwallet: wallet,
queries: make(chan interface{}),
quit: make(chan struct{}),
}
s.rpcServer = newRpcServer(s)
return s, nil
}
// addPeer...
func (s *server) addPeer(p *peer) {
if p == nil {
return
}
// Ignore new peers if we're shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
p.Stop()
return
}
s.peers[p.peerId] = p
}
// removePeer...
func (s *server) removePeer(p *peer) {
}
// peerManager...
func (s *server) peerManager() {
out:
for {
select {
// New peers.
case p := <-s.newPeers:
s.addPeer(p)
// Finished peers.
case p := <-s.donePeers:
s.removePeer(p)
case <-s.quit:
break out
}
}
s.wg.Done()
}
// connectPeerMsg...
type connectPeerMsg struct {
addr *lndc.LNAdr
reply chan error
}
// queryHandler...
func (s *server) queryHandler() {
out:
for {
select {
case query := <-s.queries:
switch msg := query.(type) {
case *connectPeerMsg:
addr := msg.addr
// Ensure we're not already connected to this
// peer.
for _, peer := range s.peers {
if peer.lightningAddr.String() ==
addr.String() {
msg.reply <- fmt.Errorf(
"already connected to peer: %v",
peer.lightningAddr,
)
}
}
// Launch a goroutine to connect to the requested
// peer so we can continue to handle queries.
go func() {
// For the lndc crypto handshake, we
// either need a compressed pubkey, or a
// 20-byte pkh.
var remoteId []byte
if addr.PubKey == nil {
remoteId = addr.Base58Adr.ScriptAddress()
} else {
remoteId = addr.PubKey.SerializeCompressed()
}
// Attempt to connect to the remote
// node. If the we can't make the
// connection, or the crypto negotation
// breaks down, then return an error to the
// caller.
ipAddr := addr.NetAddr.String()
conn := lndc.NewConn(nil)
if err := conn.Dial(
s.longTermPriv, ipAddr, remoteId); err != nil {
msg.reply <- err
}
// Now that we've established a connection,
// create a peer, and it to the set of
// currently active peers.
peer := newPeer(conn, s)
s.newPeers <- peer
msg.reply <- nil
}()
}
case <-s.quit:
break out
}
}
s.wg.Done()
}
// ConnectToPeer...
func (s *server) ConnectToPeer(addr *lndc.LNAdr) error {
reply := make(chan error, 1)
s.queries <- &connectPeerMsg{addr, reply}
return <-reply
}
// AddPeer...
func (s *server) AddPeer(p *peer) {
s.newPeers <- p
}
// listener...
func (s *server) listener(l net.Listener) {
srvrLog.Infof("Server listening on %s", l.Addr())
for atomic.LoadInt32(&s.shutdown) == 0 {
conn, err := l.Accept()
if err != nil {
// Only log the error message if we aren't currently
// shutting down.
if atomic.LoadInt32(&s.shutdown) == 0 {
srvrLog.Errorf("Can't accept connection: %v", err)
}
continue
}
srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr())
peer := newPeer(conn, s)
peer.Start()
s.newPeers <- peer
}
s.wg.Done()
}
// Start...
func (s *server) Start() {
// Already running?
if atomic.AddInt32(&s.started, 1) != 1 {
return
}
// Start all the listeners.
for _, l := range s.listeners {
s.wg.Add(1)
go s.listener(l)
}
s.wg.Add(2)
go s.peerManager()
go s.queryHandler()
}
// Stop...
func (s *server) Stop() error {
// Bail if we're already shutting down.
if atomic.AddInt32(&s.shutdown, 1) != 1 {
return nil
}
// Stop all the listeners.
for _, listener := range s.listeners {
if err := listener.Close(); err != nil {
return err
}
}
s.rpcServer.Stop()
s.lnwallet.Shutdown()
// Signal all the lingering goroutines to quit.
close(s.quit)
return nil
}
// getIdentityPrivKey gets the identity private key out of the wallet DB.
func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) {
adr, err := c.GetIdAdr()
if err != nil {
return nil, err
}
ltndLog.Infof("got ID address: %s", adr.String())
adr2, err := w.Manager.Address(adr)
if err != nil {
return nil, err
}
ltndLog.Infof("pubkey: %v", hex.EncodeToString(adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed()))
priv, err := adr2.(waddrmgr.ManagedPubKeyAddress).PrivKey()
if err != nil {
return nil, err
}
return priv, nil
}
// WaitForShutdown blocks all goroutines have been stopped.
func (s *server) WaitForShutdown() {
s.wg.Wait()
}