diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..0d760cbb --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ +Copyright (c) 2013 Conformal Systems LLC. + +Permission to use, copy, modify, and distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index 0c901ca9..889faff6 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,18 @@ btcd ==== btcd is an alternative full node bitcoin implementation written in Go (golang). + +This project is currently under active development and is not production ready +yet. + +## TODO + +The following is a list of major items remaining before production release: + +- Implement multi-peer support +- Implement relay +- Complete address manager +- Rework the block syncing code to work with headers +- Documentation +- A lot of code cleanup +- Optimize diff --git a/btcd/addrmanager.go b/btcd/addrmanager.go new file mode 100644 index 00000000..394fd914 --- /dev/null +++ b/btcd/addrmanager.go @@ -0,0 +1,183 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "github.com/conformal/btcwire" + "net" + "strconv" + "sync" + "time" +) + +const ( + // maxAddresses identifies the maximum number of addresses that the + // address manager will track. + maxAddresses = 2500 + newAddressBufferSize = 50 + dumpAddressInterval = time.Minute * 2 +) + +// updateAddress is a helper function to either update an address already known +// to the address manager, or to add the address if not already known. +func updateAddress(a *AddrManager, netAddr *btcwire.NetAddress) { + // Protect concurrent access. + a.addrCacheLock.Lock() + defer a.addrCacheLock.Unlock() + + // Update address if it already exists. + addr := NetAddressKey(netAddr) + if na, ok := a.addrCache[addr]; ok { + // Update the last seen time. + if netAddr.Timestamp.After(na.Timestamp) { + na.Timestamp = netAddr.Timestamp + } + + // Update services. + na.AddService(na.Services) + + log.Tracef("[AMGR] Updated address manager address %s", addr) + return + } + + // Enforce max addresses. + if len(a.addrCache)+1 > maxAddresses { + log.Tracef("[AMGR] Max addresses of %d reached", maxAddresses) + return + } + + a.addrCache[addr] = netAddr + log.Tracef("[AMGR] Added new address %s for a total of %d addresses", + addr, len(a.addrCache)) +} + +// AddrManager provides a concurrency safe address manager for caching potential +// peers on the bitcoin network. +type AddrManager struct { + addrCache map[string]*btcwire.NetAddress + addrCacheLock sync.Mutex + started bool + shutdown bool + newAddresses chan []*btcwire.NetAddress + removeAddresses chan []*btcwire.NetAddress + wg sync.WaitGroup + quit chan bool +} + +// addressHandler is the main handler for the address manager. It must be run +// as a goroutine. +func (a *AddrManager) addressHandler() { + dumpAddressTicker := time.NewTicker(dumpAddressInterval) +out: + for !a.shutdown { + select { + case addrs := <-a.newAddresses: + for _, na := range addrs { + updateAddress(a, na) + } + + case <-dumpAddressTicker.C: + if !a.shutdown { + // TODO: Dump addresses to database. + } + + case <-a.quit: + // TODO: Dump addresses to database. + break out + } + } + dumpAddressTicker.Stop() + a.wg.Done() + log.Trace("[AMGR] Address handler done") +} + +// Start begins the core address handler which manages a pool of known +// addresses, timeouts, and interval based writes. +func (a *AddrManager) Start() { + // Already started? + if a.started { + return + } + + log.Trace("[AMGR] Starting address manager") + + go a.addressHandler() + a.wg.Add(1) + a.started = true +} + +// Stop gracefully shuts down the address manager by stopping the main handler. +func (a *AddrManager) Stop() error { + if a.shutdown { + log.Warnf("[AMGR] Address manager is already in the process of " + + "shutting down") + return nil + } + + log.Infof("[AMGR] Address manager shutting down") + a.shutdown = true + a.quit <- true + a.wg.Wait() + return nil +} + +// AddAddresses adds new addresses to the address manager. It enforces a max +// number of addresses and silently ignores duplicate addresses. It is +// safe for concurrent access. +func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress) { + a.newAddresses <- addrs +} + +// AddAddress adds a new address to the address manager. It enforces a max +// number of addresses and silently ignores duplicate addresses. It is +// safe for concurrent access. +func (a *AddrManager) AddAddress(addr *btcwire.NetAddress) { + addrs := []*btcwire.NetAddress{addr} + a.newAddresses <- addrs +} + +// NeedMoreAddresses returns whether or not the address manager needs more +// addresses. +func (a *AddrManager) NeedMoreAddresses() bool { + // Protect concurrent access. + a.addrCacheLock.Lock() + defer a.addrCacheLock.Unlock() + + return len(a.addrCache)+1 <= maxAddresses +} + +// NumAddresses returns the number of addresses known to the address manager. +func (a *AddrManager) NumAddresses() int { + // Protect concurrent access. + a.addrCacheLock.Lock() + defer a.addrCacheLock.Unlock() + + return len(a.addrCache) +} + +// AddressCache returns the current address cache. It must be treated as +// read-only. +func (a *AddrManager) AddressCache() map[string]*btcwire.NetAddress { + return a.addrCache +} + +// New returns a new bitcoin address manager. +// Use Start to begin processing asynchronous address updates. +func NewAddrManager() *AddrManager { + am := AddrManager{ + addrCache: make(map[string]*btcwire.NetAddress), + newAddresses: make(chan []*btcwire.NetAddress, newAddressBufferSize), + quit: make(chan bool), + } + return &am +} + +// NetAddressKey returns a string key in the form of ip:port for IPv4 addresses +// or [ip]:port for IPv6 addresses. +func NetAddressKey(na *btcwire.NetAddress) string { + port := strconv.FormatUint(uint64(na.Port), 10) + addr := net.JoinHostPort(na.IP.String(), port) + return addr +} diff --git a/btcd/addrmanager_test.go b/btcd/addrmanager_test.go new file mode 100644 index 00000000..a7503a89 --- /dev/null +++ b/btcd/addrmanager_test.go @@ -0,0 +1,110 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main_test + +import ( + "github.com/conformal/btcwire" + "net" + "opensource.conformal.com/go/btcd-internal/addrmgr" + "testing" + "time" +) + +// naTest is used to describe a test to be perfomed against the NetAddressKey +// method. +type naTest struct { + in btcwire.NetAddress + want string +} + +// naTests houses all of the tests to be performed against the NetAddressKey +// method. +var naTests = make([]naTest, 0) + +func addNaTest(ip string, port uint16, want string) { + nip := net.ParseIP(ip) + na := btcwire.NetAddress{ + Timestamp: time.Now(), + Services: btcwire.SFNodeNetwork, + IP: nip, + Port: port, + } + test := naTest{na, want} + naTests = append(naTests, test) +} + +// addNaTests +func addNaTests() { + // IPv4 + // Localhost + addNaTest("127.0.0.1", 8333, "127.0.0.1:8333") + addNaTest("127.0.0.1", 8334, "127.0.0.1:8334") + + // Class A + addNaTest("1.0.0.1", 8333, "1.0.0.1:8333") + addNaTest("2.2.2.2", 8334, "2.2.2.2:8334") + addNaTest("27.253.252.251", 8335, "27.253.252.251:8335") + addNaTest("123.3.2.1", 8336, "123.3.2.1:8336") + + // Private Class A + addNaTest("10.0.0.1", 8333, "10.0.0.1:8333") + addNaTest("10.1.1.1", 8334, "10.1.1.1:8334") + addNaTest("10.2.2.2", 8335, "10.2.2.2:8335") + addNaTest("10.10.10.10", 8336, "10.10.10.10:8336") + + // Class B + addNaTest("128.0.0.1", 8333, "128.0.0.1:8333") + addNaTest("129.1.1.1", 8334, "129.1.1.1:8334") + addNaTest("180.2.2.2", 8335, "180.2.2.2:8335") + addNaTest("191.10.10.10", 8336, "191.10.10.10:8336") + + // Private Class B + addNaTest("172.16.0.1", 8333, "172.16.0.1:8333") + addNaTest("172.16.1.1", 8334, "172.16.1.1:8334") + addNaTest("172.16.2.2", 8335, "172.16.2.2:8335") + addNaTest("172.16.172.172", 8336, "172.16.172.172:8336") + + // Class C + addNaTest("193.0.0.1", 8333, "193.0.0.1:8333") + addNaTest("200.1.1.1", 8334, "200.1.1.1:8334") + addNaTest("205.2.2.2", 8335, "205.2.2.2:8335") + addNaTest("223.10.10.10", 8336, "223.10.10.10:8336") + + // Private Class C + addNaTest("192.168.0.1", 8333, "192.168.0.1:8333") + addNaTest("192.168.1.1", 8334, "192.168.1.1:8334") + addNaTest("192.168.2.2", 8335, "192.168.2.2:8335") + addNaTest("192.168.192.192", 8336, "192.168.192.192:8336") + + // IPv6 + // Localhost + addNaTest("::1", 8333, "[::1]:8333") + addNaTest("fe80::1", 8334, "[fe80::1]:8334") + + // Link-local + addNaTest("fe80::1:1", 8333, "[fe80::1:1]:8333") + addNaTest("fe91::2:2", 8334, "[fe91::2:2]:8334") + addNaTest("fea2::3:3", 8335, "[fea2::3:3]:8335") + addNaTest("feb3::4:4", 8336, "[feb3::4:4]:8336") + + // Site-local + addNaTest("fec0::1:1", 8333, "[fec0::1:1]:8333") + addNaTest("fed1::2:2", 8334, "[fed1::2:2]:8334") + addNaTest("fee2::3:3", 8335, "[fee2::3:3]:8335") + addNaTest("fef3::4:4", 8336, "[fef3::4:4]:8336") +} + +func TestNetAddressKey(t *testing.T) { + addNaTests() + + t.Logf("Running %d tests", len(naTests)) + for i, test := range naTests { + key := addrmgr.NetAddressKey(&test.in) + if key != test.want { + t.Errorf("NetAddressKey #%d\n got: %s want: %s", i, key, test.want) + continue + } + } +} diff --git a/btcd/blockmanager.go b/btcd/blockmanager.go new file mode 100644 index 00000000..30ccfef1 --- /dev/null +++ b/btcd/blockmanager.go @@ -0,0 +1,449 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "container/list" + "github.com/conformal/btcchain" + "github.com/conformal/btcdb" + _ "github.com/conformal/btcdb/sqlite3" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + chanBufferSize = 50 +) + +// inventoryItem is used to track known and requested inventory items. +type inventoryItem struct { + invVect *btcwire.InvVect + peers []*peer +} + +// blockMsg packages a bitcoin block message and the peer it came from together +// so the block handler has access to that information. +type blockMsg struct { + block *btcutil.Block +} + +// invMsg packages a bitcoin inv message and the peer it came from together +// so the block handler has access to that information. +type invMsg struct { + msg *btcwire.MsgInv + peer *peer +} + +// txMsg packages a bitcoin tx message and the peer it came from together +// so the block handler has access to that information. +type txMsg struct { + msg *btcwire.MsgTx + peer *peer +} + +// blockManager provides a concurrency safe block manager for handling all +// incoming block inventory advertisement as well as issuing requests to +// download needed blocks of the block chain from other peers. It works by +// forcing all incoming block inventory advertisements through a single +// goroutine which then determines whether the block is needed and how the +// requests should be made amongst multiple peers. +type blockManager struct { + server *server + started bool + shutdown bool + blockChain *btcchain.BlockChain + requestQueue *list.List + requestMap map[string]*inventoryItem + outstandingBlocks int + receivedLogBlocks int64 + receivedLogTx int64 + lastBlockLogTime time.Time + processingReqs bool + newBlocks chan bool + blockQueue chan *blockMsg + invQueue chan *invMsg + chainNotify chan *btcchain.Notification + wg sync.WaitGroup + quit chan bool +} + +// logBlockHeight logs a new block height as an information message to show +// progress to the user. In order to prevent spam, it limits logging to one +// message every 10 seconds with duration and totals included. +func (b *blockManager) logBlockHeight(numTx, height int64) { + b.receivedLogBlocks++ + b.receivedLogTx += numTx + + now := time.Now() + duration := now.Sub(b.lastBlockLogTime) + if b.outstandingBlocks != 0 && duration < time.Second*10 { + return + } + + // Log information about new block height. + blockStr := "blocks" + if b.receivedLogBlocks == 1 { + blockStr = "block" + } + txStr := "transactions" + if b.receivedLogTx == 1 { + txStr = "transaction" + } + log.Infof("[BMGR] Processed %d %s (%d %s) in the last %s - Block "+ + "height %d", b.receivedLogBlocks, blockStr, b.receivedLogTx, + txStr, duration, height) + + b.receivedLogBlocks = 0 + b.receivedLogTx = 0 + b.lastBlockLogTime = now +} + +// handleInvMsg handles inventory messages for all peers. It adds blocks that +// we need along with which peers know about each block to a request queue +// based upon the advertised inventory. It also attempts to strike a balance +// between the number of in-flight blocks and keeping the request queue full +// by issuing more getblocks (MsgGetBlocks) requests as needed. +func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) { + // Find the last block in the inventory list. + invVects := msg.InvList + var lastHash *btcwire.ShaHash + for i := len(invVects) - 1; i >= 0; i-- { + if invVects[i].Type == btcwire.InvVect_Block { + lastHash = &invVects[i].Hash + break + } + } + + for _, iv := range invVects { + switch iv.Type { + case btcwire.InvVect_Block: + // Ignore this block if we already have it. + // TODO(davec): Need to check orphans too. + if b.server.db.ExistsSha(&iv.Hash) { + log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash) + continue + } + + // Add the peer to the list of peers which can serve the block if + // it's already queued to be fetched. + if item, ok := b.requestMap[iv.Hash.String()]; ok { + item.peers = append(item.peers, p) + continue + } + + // Add the item to the end of the request queue. + item := &inventoryItem{ + invVect: iv, + peers: []*peer{p}, + } + b.requestMap[item.invVect.Hash.String()] = item + b.requestQueue.PushBack(item) + b.outstandingBlocks++ + + case btcwire.InvVect_Tx: + // XXX: Handle transactions here. + } + } + + // Request more blocks if there aren't enough in-flight blocks. + if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 { + stopHash := btcwire.ShaHash{} + gbmsg := btcwire.NewMsgGetBlocks(&stopHash) + gbmsg.AddBlockLocatorHash(lastHash) + p.QueueMessage(gbmsg) + } +} + +// handleBlockMsg handles block messages from all peers. It is currently +// very simple. It doesn't validate the block or handle orphans and side +// chains. It simply inserts the block into the database after ensuring the +// previous block is already inserted. +func (b *blockManager) handleBlockMsg(block *btcutil.Block) { + b.outstandingBlocks-- + msg := block.MsgBlock() + + // Process the block to include validation, best chain selection, orphan + // handling, etc. + err := b.blockChain.ProcessBlock(block) + if err != nil { + blockSha, err2 := block.Sha() + if err2 != nil { + log.Errorf("[BMGR] %v", err2) + } + log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err) + return + } + + // Log info about the new block height. + _, height, err := b.server.db.NewestSha() + if err != nil { + log.Warnf("[BMGR] Failed to obtain latest sha - %v", err) + return + } + b.logBlockHeight(int64(len(msg.Transactions)), height) + + // Sync the db to disk when there are no more outstanding blocks. + // NOTE: Periodic syncs happen as new data is requested as well. + if b.outstandingBlocks <= 0 { + b.server.db.Sync() + } +} + +// blockHandler is the main handler for the block manager. It must be run as a +// goroutine. It processes block and inv messages in a separate goroutine from +// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled +// by a single thread without needing to lock memory data structures. This is +// important because the block manager controls which blocks are needed and how +// the fetching should proceed. +// +// NOTE: Tx messages need to be handled here too. +// (either that or block and tx need to be handled in separate threads) +func (b *blockManager) blockHandler() { +out: + for !b.shutdown { + select { + // Handle new block messages. + case msg := <-b.blockQueue: + b.handleBlockMsg(msg.block) + + // Handle new inventory messages. + case msg := <-b.invQueue: + b.handleInvMsg(msg.msg, msg.peer) + // Request the blocks. + if b.requestQueue.Len() > 0 && !b.processingReqs { + b.processingReqs = true + b.newBlocks <- true + } + + case <-b.newBlocks: + numRequested := 0 + gdmsg := btcwire.NewMsgGetData() + var p *peer + for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() { + item := e.Value.(*inventoryItem) + p = item.peers[0] + gdmsg.AddInvVect(item.invVect) + delete(b.requestMap, item.invVect.Hash.String()) + b.requestQueue.Remove(e) + + numRequested++ + if numRequested >= btcwire.MaxInvPerMsg { + break + } + } + b.server.db.Sync() + if len(gdmsg.InvList) > 0 && p != nil { + p.QueueMessage(gdmsg) + } + b.processingReqs = false + + case <-b.quit: + break out + } + } + b.wg.Done() + log.Trace("[BMGR] Block handler done") +} + +// handleNotifyMsg handles notifications from btcchain. Currently it doesn't +// respond to any notifications, but the idea is that it requests missing blocks +// in response to orphan notifications and updates the wallet for blocks +// connected and disconnected to the main chain. +func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { + switch notification.Type { + case btcchain.NTOrphanBlock: + // TODO(davec): Ask the peer to fill in the missing blocks for the + // orphan root if it's not nil. + orphanRoot := notification.Data.(*btcwire.ShaHash) + _ = orphanRoot + + case btcchain.NTBlockAccepted: + // TODO(davec): Relay inventory, but don't relay old inventory + // during initial block download. + } +} + +// chainNotificationHandler is the handler for asynchronous notifications from +// btcchain. It must be run as a goroutine. +func (b *blockManager) chainNotificationHandler() { +out: + for !b.shutdown { + select { + case notification := <-b.chainNotify: + b.handleNotifyMsg(notification) + + case <-b.quit: + break out + } + } + b.wg.Done() + log.Trace("[BMGR] Chain notification handler done") +} + +// QueueBlock adds the passed block message and peer to the block handling queue. +func (b *blockManager) QueueBlock(block *btcutil.Block) { + // Don't accept more blocks if we're shutting down. + if b.shutdown { + return + } + + bmsg := blockMsg{block: block} + b.blockQueue <- &bmsg +} + +// QueueInv adds the passed inventory message and peer to the inventory handling +// queue. +func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) { + // Don't accept more inventory if we're shutting down. + if b.shutdown { + return + } + + imsg := invMsg{msg: msg, peer: p} + b.invQueue <- &imsg +} + +// Start begins the core block handler which processes block and inv messages. +func (b *blockManager) Start() { + // Already started? + if b.started { + return + } + + log.Trace("[BMGR] Starting block manager") + go b.blockHandler() + go b.chainNotificationHandler() + b.wg.Add(2) + b.started = true +} + +// Stop gracefully shuts down the block manager by stopping all asynchronous +// handlers and waiting for them to finish. +func (b *blockManager) Stop() error { + if b.shutdown { + log.Warnf("[BMGR] Block manager is already in the process of " + + "shutting down") + return nil + } + + log.Infof("[BMGR] Block manager shutting down") + b.shutdown = true + close(b.quit) + b.wg.Wait() + return nil +} + +// AddBlockLocators adds block locators to a getblocks message starting with +// the passed hash back to the genesis block hash. In order to keep the list +// of locator hashes to a reasonable number of entries, first it adds the +// most recent 10 block hashes (starting with the passed hash), then doubles the +// step each loop iteration to exponentially decrease the number of hashes the +// further away from head and closer to the genesis block it gets. +func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error { + // XXX(davec): This is fetching the block data too. + block, err := b.server.db.FetchBlockBySha(hash) + if err != nil { + log.Warnf("[BMGR] Lookup of known valid index failed %v", hash) + return err + } + blockIndex := block.Height() + + // We want inventory after the passed hash. + msg.AddBlockLocatorHash(hash) + + // Generate the block locators according to the algorithm described in + // in the function comment and make sure to leave room for the already + // added hash and final genesis hash. + increment := int64(1) + for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ { + if i > 10 { + increment *= 2 + } + blockIndex -= increment + if blockIndex <= 1 { + break + } + + h, err := b.server.db.FetchBlockShaByHeight(blockIndex) + if err != nil { + // This shouldn't happen and it's ok to ignore, so just + // continue to the next. + log.Warnf("[BMGR] Lookup of known valid index failed %v", + blockIndex) + continue + } + msg.AddBlockLocatorHash(h) + } + msg.AddBlockLocatorHash(&btcwire.GenesisHash) + return nil +} + +// newBlockManager returns a new bitcoin block manager. +// Use Start to begin processing asynchronous block and inv updates. +func newBlockManager(s *server) *blockManager { + chainNotify := make(chan *btcchain.Notification, chanBufferSize) + bm := blockManager{ + server: s, + blockChain: btcchain.New(s.db, s.btcnet, chainNotify), + requestQueue: list.New(), + requestMap: make(map[string]*inventoryItem), + lastBlockLogTime: time.Now(), + newBlocks: make(chan bool, 1), + blockQueue: make(chan *blockMsg, chanBufferSize), + invQueue: make(chan *invMsg, chanBufferSize), + chainNotify: chainNotify, + quit: make(chan bool), + } + bm.blockChain.DisableVerify(cfg.VerifyDisabled) + return &bm +} + +// loadBlockDB opens the block database and returns a handle to it. +func loadBlockDB() (btcdb.Db, error) { + dbPath := filepath.Join(cfg.DbDir, activeNetParams.dbName) + log.Infof("[BMGR] Loading block database from '%s'", dbPath) + db, err := btcdb.OpenDB("sqlite", dbPath) + if err != nil { + // Return the error if it's not because the database doesn't + // exist. + if err != btcdb.DbDoesNotExist { + return nil, err + } + + // Create the db if it does not exist. + err = os.MkdirAll(cfg.DbDir, 0700) + if err != nil { + return nil, err + } + db, err = btcdb.CreateDB("sqlite", dbPath) + if err != nil { + return nil, err + } + + // Insert the appropriate genesis block for the bitcoin network + // being connected to. + genesis := btcutil.NewBlock(activeNetParams.genesisBlock) + _, err := db.InsertBlock(genesis) + if err != nil { + db.Close() + return nil, err + } + log.Infof("[BMGR] Inserted genesis block %v", + activeNetParams.genesisHash) + } + + // Get the latest block height from the database. + _, height, err := db.NewestSha() + if err != nil { + db.Close() + return nil, err + } + log.Infof("[BMGR] Block database loaded with block height %d", height) + return db, nil +} diff --git a/btcd/btcd.go b/btcd/btcd.go new file mode 100644 index 00000000..7cd814dd --- /dev/null +++ b/btcd/btcd.go @@ -0,0 +1,185 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "github.com/conformal/btcchain" + "github.com/conformal/btcdb" + "github.com/conformal/btcscript" + "github.com/conformal/btcwire" + "github.com/conformal/seelog" + "math/rand" + "net" + "os" + "runtime" + "strconv" + "time" +) + +const userAgent = "/btcd:0.0.1/" + +// used by the dns seed code to pick a random last seen time +const ( + secondsIn3Days int32 = 24 * 60 * 60 * 3 + secondsIn4Days int32 = 24 * 60 * 60 * 4 +) + +var ( + log seelog.LoggerInterface = seelog.Disabled + cfg *config +) + +// newLogger creates a new seelog logger using the provided logging level and +// log message prefix. +func newLogger(level string, prefix string) seelog.LoggerInterface { + fmtstring := ` + + + + + + + + ` + config := fmt.Sprintf(fmtstring, level, prefix) + + logger, err := seelog.LoggerFromConfigAsString(config) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create logger: %v", err) + os.Exit(1) + } + + return logger +} + +// useLogger sets the btcd logger to the passed logger. +func useLogger(logger seelog.LoggerInterface) { + log = logger +} + +// setLogLevel sets the log level for the logging system. It initialises a +// logger for each subsystem at the provided level. +func setLogLevel(logLevel string) []seelog.LoggerInterface { + var loggers []seelog.LoggerInterface + + // Define sub-systems. + subSystems := []struct { + level string + prefix string + useLogger func(seelog.LoggerInterface) + }{ + {logLevel, "BTCD", useLogger}, + {logLevel, "BCDB", btcdb.UseLogger}, + {logLevel, "CHAN", btcchain.UseLogger}, + {logLevel, "SCRP", btcscript.UseLogger}, + } + + // Configure all sub-systems with new loggers while keeping track of + // the created loggers to return so they can be flushed. + for _, s := range subSystems { + newLog := newLogger(s.level, s.prefix) + loggers = append(loggers, newLog) + s.useLogger(newLog) + } + + return loggers +} + +// btcdMain is the real main function for btcd. It is necessary to work around +// the fact that deferred functions do not run when os.Exit() is called. +func btcdMain() error { + // Initialize logging and setup deferred flushing to ensure all + // outstanding messages are written on shutdown. + loggers := setLogLevel(defaultLogLevel) + defer func() { + for _, logger := range loggers { + logger.Flush() + } + }() + + // Load configuration and parse command line. + tcfg, _, err := loadConfig() + if err != nil { + return err + } + cfg = tcfg + + // Change the logging level if needed. + if cfg.DebugLevel != defaultLogLevel { + loggers = setLogLevel(cfg.DebugLevel) + } + + // Load the block database. + db, err := loadBlockDB() + if err != nil { + log.Errorf("%v", err) + return err + } + defer db.Close() + + // Ensure the database is sync'd and closed on Ctrl+C. + addInterruptHandler(func() { + db.RollbackClose() + }) + + // Create server and start it. + listenAddr := net.JoinHostPort("", cfg.Port) + server, err := newServer(listenAddr, db, activeNetParams.btcnet) + if err != nil { + log.Errorf("Unable to start server on %v", listenAddr) + log.Errorf("%v", err) + return err + } + server.Start() + + // only ask dns for peers if we don't have a list of initial seeds. + if !cfg.DisableDNSSeed { + // XXX need a proxy config entry + seedpeers := dnsDiscover(activeNetParams.dnsSeeds, "") + addresses := make([]*btcwire.NetAddress, len(seedpeers)) + // if this errors then we have *real* problems + intPort, _ := strconv.Atoi(activeNetParams.peerPort) + for i, peer := range seedpeers { + addresses[i] = new(btcwire.NetAddress) + addresses[i].SetAddress(peer, uint16(intPort)) + // bitcoind seeds with addresses from + // a time randomly selected between 3 + // and 7 days ago. + addresses[i].Timestamp = time.Now().Add(-1 * + time.Second * time.Duration(secondsIn3Days+ + rand.Int31n(secondsIn4Days))) + } + + server.addrManager.AddAddresses(addresses) + // XXX if this is empty do we want to use hardcoded + // XXX peers like bitcoind does? + } + + peers := cfg.ConnectPeers + if len(peers) == 0 { + peers = cfg.AddPeers + } + // Connect to initial peers. + for _, addr := range peers { + // Connect to peer and add it to the server. + server.ConnectPeerAsync(addr, true) + } + + server.WaitForShutdown() + return nil +} + +func main() { + // Use all processor cores. + runtime.GOMAXPROCS(runtime.NumCPU()) + + // Work around defer not working after os.Exit() + err := btcdMain() + if err != nil { + os.Exit(1) + } +} diff --git a/btcd/config.go b/btcd/config.go new file mode 100644 index 00000000..e24cb528 --- /dev/null +++ b/btcd/config.go @@ -0,0 +1,271 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "errors" + "fmt" + "github.com/conformal/btcwire" + "net" + "opensource.conformal.com/go/go-flags" + "os" + "path/filepath" + "time" +) + +const ( + defaultConfigFilename = "btcd.conf" + defaultLogLevel = "info" + defaultBtcnet = btcwire.MainNet + defaultMaxPeers = 8 + defaultBanDuration = time.Hour * 24 + defaultVerifyEnabled = false +) + +var ( + defaultConfigFile = filepath.Join(btcdHomeDir(), defaultConfigFilename) + defaultDbDir = filepath.Join(btcdHomeDir(), "db") +) + +// config defines the configuration options for btcd. +// +// See loadConfig for details on the configuration load process. +type config struct { + DebugLevel string `short:"d" long:"debuglevel" description:"Logging level {trace, debug, info, warn, error, critical}"` + AddPeers []string `short:"a" long:"addpeer" description:"Add a peer to connect with at startup"` + ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"` + SeedPeer string `short:"s" long:"seedpeer" description:"Retrieve peer addresses from this peer and then disconnect"` + Port string `short:"p" long:"port" description:"Listen for connections on this port (default: 8333, testnet: 18333)"` + RpcPort string `short:"r" long:"rpcport" description:"Listen for json/rpc messages on this port"` + MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"` + BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"` + VerifyDisabled bool `long:"noverify" description:"Disable block/transaction verification -- WARNING: This option can be dangerous and is for development use only"` + ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"` + DbDir string `short:"b" long:"dbdir" description:"Directory to store database"` + RpcUser string `short:"u" long:"rpcuser" description:"Username for rpc connections"` + RpcPass string `short:"P" long:"rpcpass" description:"Password for rpc connections"` + DisableRpc bool `long:"norpc" description:"Disable built-in RPC server"` + DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"` + TestNet3 bool `long:"testnet" description:"Use the test network"` + RegressionTest bool `long:"regtest" description:"Use the regression test network"` +} + +// btcdHomeDir returns an OS appropriate home directory for btcd. +func btcdHomeDir() string { + // Search for Windows APPDATA first. This won't exist on POSIX OSes. + appData := os.Getenv("APPDATA") + if appData != "" { + return filepath.Join(appData, "btcd") + } + + // Fall back to standard HOME directory that works for most POSIX OSes. + home := os.Getenv("HOME") + if home != "" { + return filepath.Join(home, ".btcd") + } + + // In the worst case, use the current directory. + return "." +} + +// validLogLevel returns whether or not logLevel is a valid debug log level. +func validLogLevel(logLevel string) bool { + switch logLevel { + case "trace": + fallthrough + case "debug": + fallthrough + case "info": + fallthrough + case "warn": + fallthrough + case "error": + fallthrough + case "critical": + return true + } + return false +} + +// normalizePeerAddress returns addr with the default peer port appended if +// there is not already a port specified. +func normalizePeerAddress(addr string) string { + _, _, err := net.SplitHostPort(addr) + if err != nil { + return net.JoinHostPort(addr, activeNetParams.peerPort) + } + return addr +} + +// removeDuplicateAddresses returns a new slice with all duplicate entries in +// addrs removed. +func removeDuplicateAddresses(addrs []string) []string { + result := make([]string, 0) + seen := map[string]bool{} + for _, val := range addrs { + if _, ok := seen[val]; !ok { + result = append(result, val) + seen[val] = true + } + } + return result +} + +func normalizeAndRemoveDuplicateAddresses(addrs []string) []string { + for i, addr := range addrs { + addrs[i] = normalizePeerAddress(addr) + } + addrs = removeDuplicateAddresses(addrs) + + return addrs +} + +// updateConfigWithActiveParams update the passed config with parameters +// from the active net params if the relevant options in the passed config +// object are the default so options specified by the user on the command line +// are not overridden. +func updateConfigWithActiveParams(cfg *config) { + if cfg.Port == netParams(defaultBtcnet).listenPort { + cfg.Port = activeNetParams.listenPort + } + + if cfg.RpcPort == netParams(defaultBtcnet).rpcPort { + cfg.RpcPort = activeNetParams.rpcPort + } +} + +// filesExists reports whether the named file or directory exists. +func fileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} + +// loadConfig initializes and parses the config using a config file and command +// line options. +// +// The configuration proceeds as follows: +// 1) Start with a default config with sane settings +// 2) Pre-parse the command line to check for an alternative config file +// 3) Load configuration file overwriting defaults with any specified options +// 4) Parse CLI options and overwrite/add any specified options +// +// The above results in btcd functioning properly without any config settings +// while still allowing the user to override settings with config files and +// command line options. Command line options always take precedence. +func loadConfig() (*config, []string, error) { + // Default config. + cfg := config{ + DebugLevel: defaultLogLevel, + Port: netParams(defaultBtcnet).listenPort, + RpcPort: netParams(defaultBtcnet).rpcPort, + MaxPeers: defaultMaxPeers, + BanDuration: defaultBanDuration, + ConfigFile: defaultConfigFile, + DbDir: defaultDbDir, + } + + // A config file in the current directory takes precedence. + if fileExists(defaultConfigFilename) { + cfg.ConfigFile = defaultConfigFilename + } + + // Pre-parse the command line options to see if an alternative config + // file was specified. + preCfg := cfg + preParser := flags.NewParser(&preCfg, flags.Default) + _, err := preParser.Parse() + if err != nil { + if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp { + preParser.WriteHelp(os.Stderr) + } + return nil, nil, err + } + + // Load additional config from file. + parser := flags.NewParser(&cfg, flags.Default) + err = parser.ParseIniFile(preCfg.ConfigFile) + if err != nil { + if _, ok := err.(*os.PathError); !ok { + fmt.Fprintln(os.Stderr, err) + parser.WriteHelp(os.Stderr) + return nil, nil, err + } + log.Warnf("%v", err) + } + + // Parse command line options again to ensure they take precedence. + remainingArgs, err := parser.Parse() + if err != nil { + if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp { + parser.WriteHelp(os.Stderr) + } + return nil, nil, err + } + + // The two test networks can't be selected simultaneously. + if cfg.TestNet3 && cfg.RegressionTest { + str := "%s: The testnet and regtest params can't be used " + + "together -- choose one of the two" + err := errors.New(fmt.Sprintf(str, "loadConfig")) + fmt.Fprintln(os.Stderr, err) + parser.WriteHelp(os.Stderr) + return nil, nil, err + } + + // Choose the active network params based on the testnet and regression + // test net flags. + if cfg.TestNet3 { + activeNetParams = netParams(btcwire.TestNet3) + } else if cfg.RegressionTest { + activeNetParams = netParams(btcwire.TestNet) + } + updateConfigWithActiveParams(&cfg) + + // Validate debug log level. + if !validLogLevel(cfg.DebugLevel) { + str := "%s: The specified debug level is invalid -- parsed [%v]" + err := errors.New(fmt.Sprintf(str, "loadConfig", cfg.DebugLevel)) + fmt.Fprintln(os.Stderr, err) + parser.WriteHelp(os.Stderr) + return nil, nil, err + } + + // Don't allow ban durations that are too short. + if cfg.BanDuration < time.Duration(time.Second) { + str := "%s: The banduration option may not be less than 1s -- parsed [%v]" + err := errors.New(fmt.Sprintf(str, "loadConfig", cfg.BanDuration)) + fmt.Fprintln(os.Stderr, err) + parser.WriteHelp(os.Stderr) + return nil, nil, err + } + + // --addPeer and --connect do not mix. + if len(cfg.AddPeers) > 0 && len(cfg.ConnectPeers) > 0 { + str := "%s: the --addpeer and --connect options can not be " + + "mixed" + err := errors.New(fmt.Sprintf(str, "loadConfig")) + fmt.Fprintln(os.Stderr, err) + parser.WriteHelp(os.Stderr) + return nil, nil, err + } + + // Connect means no seeding or listening. + if len(cfg.ConnectPeers) > 0 { + cfg.DisableDNSSeed = true + // XXX turn off server listening. + } + + // Add default port to all added peer addresses if needed and remove + // duplicate addresses. + cfg.AddPeers = normalizeAndRemoveDuplicateAddresses(cfg.AddPeers) + cfg.ConnectPeers = + normalizeAndRemoveDuplicateAddresses(cfg.ConnectPeers) + + return &cfg, remainingArgs, nil +} diff --git a/btcd/discovery.go b/btcd/discovery.go new file mode 100644 index 00000000..e7f1741e --- /dev/null +++ b/btcd/discovery.go @@ -0,0 +1,171 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "encoding/binary" + "errors" + "fmt" + "net" +) + +const ( + torSucceeded = 0x00 + torGeneralError = 0x01 + torNotAllowed = 0x02 + torNetUnreachable = 0x03 + torHostUnreachable = 0x04 + torConnectionRefused = 0x05 + torTtlExpired = 0x06 + torCmdNotSupported = 0x07 + torAddrNotSupported = 0x08 +) + +var ( + ErrTorInvalidAddressResponse = errors.New("Invalid address response") + ErrTorInvalidProxyResponse = errors.New("Invalid proxy response") + ErrTorUnrecognizedAuthMethod = errors.New("Invalid proxy authentication method") + + torStatusErrors = map[byte]error{ + torSucceeded: errors.New("Tor succeeded"), + torGeneralError: errors.New("Tor general error"), + torNotAllowed: errors.New("Tor not allowed"), + torNetUnreachable: errors.New("Tor network is unreachable"), + torHostUnreachable: errors.New("Tor host is unreachable"), + torConnectionRefused: errors.New("Tor connection refused"), + torTtlExpired: errors.New("Tor ttl expired"), + torCmdNotSupported: errors.New("Tor command not supported"), + torAddrNotSupported: errors.New("Tor address type not supported"), + } +) + +// try individual DNS server return list of strings for responses. +func doDNSLookup(host, proxy string) ([]net.IP, error) { + var err error + var addrs []net.IP + + if proxy != "" { + addrs, err = torLookupIP(host, proxy) + } else { + addrs, err = net.LookupIP(host) + } + if err != nil { + return nil, err + } + + return addrs, nil +} + +// Use Tor to resolve DNS. +/* + TODO: + * this function must be documented internally + * this function does not handle IPv6 +*/ +func torLookupIP(host, proxy string) ([]net.IP, error) { + conn, err := net.Dial("tcp", proxy) + if err != nil { + return nil, err + } + defer conn.Close() + + buf := []byte{'\x05', '\x01', '\x00'} + _, err = conn.Write(buf) + if err != nil { + return nil, err + } + + buf = make([]byte, 2) + _, err = conn.Read(buf) + if err != nil { + return nil, err + } + if buf[0] != '\x05' { + return nil, ErrTorInvalidProxyResponse + } + if buf[1] != '\x00' { + return nil, ErrTorUnrecognizedAuthMethod + } + + buf = make([]byte, 7+len(host)) + buf[0] = 5 // protocol version + buf[1] = '\xF0' // Tor Resolve + buf[2] = 0 // reserved + buf[3] = 3 // Tor Resolve + buf[4] = byte(len(host)) + copy(buf[5:], host) + buf[5+len(host)] = 0 // Port 0 + + _, err = conn.Write(buf) + if err != nil { + return nil, err + } + + buf = make([]byte, 4) + _, err = conn.Read(buf) + if err != nil { + return nil, err + } + if buf[0] != 5 { + return nil, ErrTorInvalidProxyResponse + } + if buf[1] != 0 { + if int(buf[1]) > len(torStatusErrors) { + err = ErrTorInvalidProxyResponse + } else { + err := torStatusErrors[buf[1]] + if err == nil { + err = ErrTorInvalidProxyResponse + } + } + return nil, err + } + if buf[3] != 1 { + err := torStatusErrors[torGeneralError] + return nil, err + } + + buf = make([]byte, 4) + bytes, err := conn.Read(buf) + if err != nil { + return nil, err + } + if bytes != 4 { + return nil, ErrTorInvalidAddressResponse + } + + r := binary.BigEndian.Uint32(buf) + + addr := make([]net.IP, 1) + addr[0] = net.IPv4(byte(r>>24), byte(r>>16), byte(r>>8), byte(r)) + + return addr, nil +} + +// dnsDiscover looks up the list of peers resolved by dns for all hosts in +// seeders. If proxy is not "" then this is used as a tor proxy for the +// resolution. If any errors occur then that seeder that errored will not have +// any hosts in the list. Therefore if all hosts failed an empty slice of +// strings will be returned. +func dnsDiscover(seeders []string, proxy string) []net.IP { + peers := []net.IP{} + for _, seeder := range seeders { + log.Debugf("[DISC] Fetching list of seeds from %v", seeder) + newPeers, err := doDNSLookup(seeder, proxy) + if err != nil { + seederPlusProxy := seeder + if proxy != "" { + seederPlusProxy = fmt.Sprintf("%s (proxy %s)", + seeder, proxy) + } + log.Warnf("[DISC] Failed to fetch dns seeds "+ + "from %s: %v", seederPlusProxy, err) + continue + } + peers = append(peers, newPeers...) + } + + return peers +} diff --git a/btcd/params.go b/btcd/params.go new file mode 100644 index 00000000..4d5da2c6 --- /dev/null +++ b/btcd/params.go @@ -0,0 +1,100 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "github.com/conformal/btcchain" + "github.com/conformal/btcwire" + "math/big" +) + +// activeNetParams is a pointer to the parameters specific to the +// currently active bitcoin network. +var activeNetParams = netParams(defaultBtcnet) + +// params is used to group parameters for various networks such as the main +// network and test networks. +type params struct { + dbName string + btcnet btcwire.BitcoinNet + genesisBlock *btcwire.MsgBlock + genesisHash *btcwire.ShaHash + powLimit *big.Int + powLimitBits uint32 + peerPort string + listenPort string + rpcPort string + dnsSeeds []string +} + +// mainNetParams contains parameters specific to the main network +// (btcwire.MainNet). +var mainNetParams = params{ + dbName: "btcd.db", + btcnet: btcwire.MainNet, + genesisBlock: btcchain.ChainParams(btcwire.MainNet).GenesisBlock, + genesisHash: btcchain.ChainParams(btcwire.MainNet).GenesisHash, + powLimit: btcchain.ChainParams(btcwire.MainNet).PowLimit, + powLimitBits: btcchain.ChainParams(btcwire.MainNet).PowLimitBits, + listenPort: btcwire.MainPort, + peerPort: btcwire.MainPort, + rpcPort: "8332", + dnsSeeds: []string{ + "seed.bitcoin.sipa.be", + "dnsseed.bluematt.me", + "dnsseed.bitcoin.dashjr.org", + "bitseed.xf2.org", + }, +} + +// regressionParams contains parameters specific to the regression test network +// (btcwire.TestNet). +var regressionParams = params{ + dbName: "btcd_regtest.db", + btcnet: btcwire.TestNet, + genesisBlock: btcchain.ChainParams(btcwire.TestNet).GenesisBlock, + genesisHash: btcchain.ChainParams(btcwire.TestNet).GenesisHash, + powLimit: btcchain.ChainParams(btcwire.TestNet).PowLimit, + powLimitBits: btcchain.ChainParams(btcwire.TestNet).PowLimitBits, + listenPort: btcwire.RegressionTestPort, + peerPort: btcwire.TestNetPort, + rpcPort: "18332", + dnsSeeds: []string{}, +} + +// testNet3Params contains parameters specific to the test network (version 3) +// (btcwire.TestNet3). +var testNet3Params = params{ + dbName: "btcd_testnet.db", + btcnet: btcwire.TestNet3, + genesisBlock: btcchain.ChainParams(btcwire.TestNet3).GenesisBlock, + genesisHash: btcchain.ChainParams(btcwire.TestNet3).GenesisHash, + powLimit: btcchain.ChainParams(btcwire.TestNet3).PowLimit, + powLimitBits: btcchain.ChainParams(btcwire.TestNet3).PowLimitBits, + listenPort: btcwire.TestNetPort, + peerPort: btcwire.TestNetPort, + rpcPort: "18332", + dnsSeeds: []string{ + "testnet-seed.bitcoin.petertodd.org", + "testnet-seed.bluematt.me", + }, +} + +// netParams returns parameters specific to the passed bitcoin network. +func netParams(btcnet btcwire.BitcoinNet) *params { + switch btcnet { + case btcwire.TestNet: + return ®ressionParams + + case btcwire.TestNet3: + return &testNet3Params + + // Return main net by default. + case btcwire.MainNet: + fallthrough + default: + return &mainNetParams + } +} diff --git a/btcd/peer.go b/btcd/peer.go new file mode 100644 index 00000000..068ecf89 --- /dev/null +++ b/btcd/peer.go @@ -0,0 +1,752 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "bytes" + "errors" + "github.com/conformal/btcdb" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + "github.com/davecgh/go-spew/spew" + "net" + "opensource.conformal.com/go/btcd-internal/addrmgr" + "sync" + "time" +) + +const outputBufferSize = 50 + +// zeroHash is the zero value hash (all zeros). It is defined as a convenience. +var zeroHash btcwire.ShaHash + +// minUint32 is a helper function to return the minimum of two uint32s. +// This avoids a math import and the need to cast to floats. +func minUint32(a, b uint32) uint32 { + if a < b { + return a + } + return b +} + +// peer provides a bitcoin peer for handling bitcoin communications. +type peer struct { + server *server + protocolVersion uint32 + btcnet btcwire.BitcoinNet + services btcwire.ServiceFlag + started bool + conn net.Conn + timeConnected time.Time + inbound bool + disconnect bool + persistent bool + versionKnown bool + knownAddresses map[string]bool + lastBlock int32 + wg sync.WaitGroup + outputQueue chan btcwire.Message + quit chan bool +} + +// pushVersionMsg sends a version message to the connected peer using the +// current state. +func (p *peer) pushVersionMsg() error { + _, blockNum, err := p.server.db.NewestSha() + if err != nil { + return err + } + + msg, err := btcwire.NewMsgVersionFromConn(p.conn, p.server.nonce, + userAgent, int32(blockNum)) + if err != nil { + return err + } + + // XXX: bitcoind appears to always enable the full node services flag + // of the remote peer netaddress field in the version message regardless + // of whether it knows it supports it or not. Also, bitcoind sets + // the services field of the local peer to 0 regardless of support. + // + // Realistically, this should be set as follows: + // - For outgoing connections: + // - Set the local netaddress services to what the local peer + // actually supports + // - Set the remote netaddress services to 0 to indicate no services + // as they are still unknown + // - For incoming connections: + // - Set the local netaddress services to what the local peer + // actually supports + // - Set the remote netaddress services to the what was advertised by + // by the remote peer in its version message + msg.AddrYou.Services = btcwire.SFNodeNetwork + + // Advertise that we're a full node. + msg.Services = btcwire.SFNodeNetwork + + p.outputQueue <- msg + return nil +} + +// handleVersionMsg is invoked when a peer receives a version bitcoin message +// and is used to negotiate the protocol version details as well as kick start +// the communications. +func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { + // Detect self connections. + if msg.Nonce == p.server.nonce { + log.Debugf("[PEER] Disconnecting peer connected to self %s", + p.conn.RemoteAddr()) + p.disconnect = true + p.conn.Close() + return + } + + // Limit to one version message per peer. + if p.versionKnown { + log.Errorf("[PEER] Only one version message per peer is allowed %s.", + p.conn.RemoteAddr()) + p.disconnect = true + p.conn.Close() + return + } + + // Negotiate the protocol version. + p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) + p.versionKnown = true + log.Debugf("[PEER] Negotiated protocol version %d for peer %s", + p.protocolVersion, p.conn.RemoteAddr()) + p.lastBlock = msg.LastBlock + + // Inbound connections. + if p.inbound { + // Set the supported services for the peer to what the remote + // peer advertised. + p.services = msg.Services + + // Send version. + err := p.pushVersionMsg() + if err != nil { + log.Errorf("[PEER] %v", err) + p.disconnect = true + p.conn.Close() + return + } + + // Add inbound peer address to the server address manager. + na, err := btcwire.NewNetAddress(p.conn.RemoteAddr(), p.services) + if err != nil { + log.Errorf("[PEER] %v", err) + p.disconnect = true + p.conn.Close() + return + } + p.server.addrManager.AddAddress(na) + } + + // Send verack. + p.outputQueue <- btcwire.NewMsgVerAck() + + // Outbound connections. + if !p.inbound { + // TODO: Only do this if we're listening, not doing the initial + // block download, and are routable. + // Advertise the local address. + na, err := btcwire.NewNetAddress(p.conn.LocalAddr(), p.services) + if err != nil { + log.Errorf("[PEER] %v", err) + p.disconnect = true + p.conn.Close() + return + } + na.Services = p.services + addresses := map[string]*btcwire.NetAddress{ + addrmgr.NetAddressKey(na): na, + } + p.pushAddrMsg(addresses) + + // Request known addresses if the server address manager needs + // more and the peer has a protocol version new enough to + // include a timestamp with addresses. + hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion + if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { + p.outputQueue <- btcwire.NewMsgGetAddr() + } + } + + // Request latest blocks if the peer has blocks we're interested in. + // XXX: Ask block manager for latest so we get in-flight too... + sha, lastBlock, err := p.server.db.NewestSha() + if err != nil { + log.Errorf("[PEER] %v", err) + p.disconnect = true + p.conn.Close() + } + // If the peer has blocks we're interested in. + if p.lastBlock > int32(lastBlock) { + stopHash := btcwire.ShaHash{} + gbmsg := btcwire.NewMsgGetBlocks(&stopHash) + p.server.blockManager.AddBlockLocators(sha, gbmsg) + p.outputQueue <- gbmsg + } + + // TODO: Relay alerts. +} + +// pushTxMsg sends a tx message for the provided transaction hash to the +// connected peer. An error is returned if the transaction sha is not known. +func (p *peer) pushTxMsg(sha btcwire.ShaHash) error { + // We dont deal with these for now. + return errors.New("Tx fetching not implemented") +} + +// pushBlockMsg sends a block message for the provided block hash to the +// connected peer. An error is returned if the block hash is not known. +func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { + // What should this function do about the rate limiting the + // number of blocks queued for this peer? + // Current thought is have a counting mutex in the peer + // such that if > N Tx/Block requests are currently in + // the tx queue, wait until the mutex clears allowing more to be + // sent. This prevents 500 1+MB blocks from being loaded into + // memory and sit around until the output queue drains. + // Actually the outputQueue has a limit of 50 in its queue + // but still 50MB to 1.6GB(50 32MB blocks) just setting + // in memory waiting to be sent is pointless. + // I would recommend a getdata request limit of about 5 + // outstanding objects. + // Should the tx complete api be a mutex or channel? + + blk, err := p.server.db.FetchBlockBySha(&sha) + if err != nil { + log.Tracef("[PEER] Unable to fetch requested block sha %v: %v", + &sha, err) + return err + } + p.QueueMessage(blk.MsgBlock()) + return nil +} + +// handleGetData is invoked when a peer receives a getdata bitcoin message and +// is used to deliver block and transaction information. +func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { + notFound := btcwire.NewMsgNotFound() + +out: + for _, iv := range msg.InvList { + var err error + switch iv.Type { + case btcwire.InvVect_Tx: + err = p.pushTxMsg(iv.Hash) + case btcwire.InvVect_Block: + err = p.pushBlockMsg(iv.Hash) + default: + log.Warnf("[PEER] Unknown type in inventory request %d", + iv.Type) + break out + } + if err != nil { + notFound.AddInvVect(iv) + } + } + if len(notFound.InvList) != 0 { + p.QueueMessage(notFound) + } +} + +// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message. +func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { + var err error + startIdx := int64(0) + endIdx := btcdb.AllShas + + // Return all block hashes to the latest one (up to max per message) if + // no stop hash was specified. + // Attempt to find the ending index of the stop hash if specified. + if !msg.HashStop.IsEqual(&zeroHash) { + block, err := p.server.db.FetchBlockBySha(&msg.HashStop) + if err != nil { + // Fetch all if we dont recognize the stop hash. + endIdx = btcdb.AllShas + } + endIdx = block.Height() + } + + // TODO(davec): This should have some logic to utilize the additional + // locator hashes to ensure the proper chain. + for _, hash := range msg.BlockLocatorHashes { + // TODO(drahn) does using the caching interface make sense + // on index lookups ? + block, err := p.server.db.FetchBlockBySha(hash) + if err == nil { + // Start with the next hash since we know this one. + startIdx = block.Height() + 1 + break + } + } + + // Don't attempt to fetch more than we can put into a single message. + if endIdx-startIdx > btcwire.MaxInvPerMsg { + endIdx = startIdx + btcwire.MaxInvPerMsg + } + + // Fetch the inventory from the block database. + hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx) + if err != nil { + log.Warnf(" lookup returned %v ", err) + return + } + + // Nothing to send. + if len(hashList) == 0 { + return + } + + // Generate inventory vectors and push the inventory message. + inv := btcwire.NewMsgInv() + for _, hash := range hashList { + iv := btcwire.InvVect{Type: btcwire.InvVect_Block, Hash: hash} + inv.AddInvVect(&iv) + } + p.QueueMessage(inv) +} + +// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin +// message. +func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { + var err error + startIdx := int64(0) + endIdx := btcdb.AllShas + + // Return all block hashes to the latest one (up to max per message) if + // no stop hash was specified. + // Attempt to find the ending index of the stop hash if specified. + if !msg.HashStop.IsEqual(&zeroHash) { + block, err := p.server.db.FetchBlockBySha(&msg.HashStop) + if err != nil { + // Fetch all if we dont recognize the stop hash. + endIdx = btcdb.AllShas + } + endIdx = block.Height() + } + + // TODO(davec): This should have some logic to utilize the additional + // locator hashes to ensure the proper chain. + for _, hash := range msg.BlockLocatorHashes { + // TODO(drahn) does using the caching interface make sense + // on index lookups ? + block, err := p.server.db.FetchBlockBySha(hash) + if err == nil { + // Start with the next hash since we know this one. + startIdx = block.Height() + 1 + break + } + } + + // Don't attempt to fetch more than we can put into a single message. + if endIdx-startIdx > btcwire.MaxBlockHeadersPerMsg { + endIdx = startIdx + btcwire.MaxBlockHeadersPerMsg + } + + // Fetch the inventory from the block database. + hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx) + if err != nil { + log.Warnf("lookup returned %v ", err) + return + } + + // Nothing to send. + if len(hashList) == 0 { + return + } + + // Generate inventory vectors and push the inventory message. + headersMsg := btcwire.NewMsgHeaders() + for _, hash := range hashList { + block, err := p.server.db.FetchBlockBySha(&hash) + if err != nil { + log.Warnf("[PEER] badness %v", err) + } + hdr := block.MsgBlock().Header // copy + hdr.TxnCount = 0 + headersMsg.AddBlockHeader(&hdr) + } + p.QueueMessage(headersMsg) +} + +// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message +// and is used to provide the peer with known addresses from the address +// manager. +func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) { + // Get the current known addresses from the address manager. + addrCache := p.server.addrManager.AddressCache() + + // Push the addresses. + err := p.pushAddrMsg(addrCache) + if err != nil { + log.Errorf("[PEER] %v", err) + p.disconnect = true + p.conn.Close() + return + } +} + +// pushAddrMsg sends one, or more, addr message(s) to the connected peer using +// the provided addresses. +func (p *peer) pushAddrMsg(addresses map[string]*btcwire.NetAddress) error { + // Nothing to send. + if len(addresses) == 0 { + return nil + } + + numAdded := 0 + msg := btcwire.NewMsgAddr() + for _, na := range addresses { + // Filter addresses the peer already knows about. + if p.knownAddresses[addrmgr.NetAddressKey(na)] { + continue + } + + // Add the address to the message. + err := msg.AddAddress(na) + if err != nil { + return err + } + numAdded++ + + // Split into multiple messages as needed. + if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 { + p.outputQueue <- msg + msg.ClearAddresses() + } + } + + // Send message with remaining addresses if needed. + if numAdded%btcwire.MaxAddrPerMsg != 0 { + p.outputQueue <- msg + } + return nil +} + +// handleAddrMsg is invoked when a peer receives an addr bitcoin message and +// is used to notify the server about advertised addresses. +func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { + // Ignore old style addresses which don't include a timestamp. + if p.protocolVersion < btcwire.NetAddressTimeVersion { + return + } + + // A message that has no addresses is invalid. + if len(msg.AddrList) == 0 { + log.Errorf("[PEER] Command [%s] from %s does not contain any addresses", + msg.Command(), p.conn.RemoteAddr()) + p.disconnect = true + p.conn.Close() + return + } + + for _, na := range msg.AddrList { + // Don't add more address if we're disconnecting. + if p.disconnect { + return + } + + // Set the timestamp to 5 days ago if it's more than 24 hours + // in the future so this address is one of the first to be + // removed when space is needed. + now := time.Now() + if na.Timestamp.After(now.Add(time.Minute * 10)) { + na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) + } + + // Add address to known addresses for this peer. + p.knownAddresses[addrmgr.NetAddressKey(na)] = true + } + + // Add addresses to server address manager. The address manager handles + // the details of things such as preventing duplicate addresses, max + // addresses, and last seen updates. + p.server.addrManager.AddAddresses(msg.AddrList) +} + +// handlePingMsg is invoked when a peer receives a ping bitcoin message. For +// recent clients (protocol version > BIP0031Version), it replies with a pong +// message. For older clients, it does nothing and anything other than failure +// is considered a successful ping. +func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { + // Only Reply with pong is message comes from a new enough client. + if p.protocolVersion > btcwire.BIP0031Version { + // Include nonce from ping so pong can be identified. + p.outputQueue <- btcwire.NewMsgPong(msg.Nonce) + } +} + +// readMessage reads the next bitcoin message from the peer with logging. +func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) { + msg, buf, err = btcwire.ReadMessage(p.conn, p.protocolVersion, p.btcnet) + if err != nil { + return + } + log.Debugf("[PEER] Received command [%v] from %s", msg.Command(), + p.conn.RemoteAddr()) + + // Use closures to log expensive operations so they are only run when + // the logging level requires it. + log.Tracef("%v", newLogClosure(func() string { + return "[PEER] " + spew.Sdump(msg) + })) + log.Tracef("%v", newLogClosure(func() string { + return "[PEER] " + spew.Sdump(buf) + })) + + return +} + +// writeMessage sends a bitcoin Message to the peer with logging. +func (p *peer) writeMessage(msg btcwire.Message) error { + log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(), + p.conn.RemoteAddr()) + + // Use closures to log expensive operations so they are only run when the + // logging level requires it. + log.Tracef("%v", newLogClosure(func() string { + return "[PEER] msg" + spew.Sdump(msg) + })) + log.Tracef("%v", newLogClosure(func() string { + var buf bytes.Buffer + err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet) + if err != nil { + return err.Error() + } + return "[PEER] " + spew.Sdump(buf.Bytes()) + })) + + // Write the message to the peer. + err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet) + if err != nil { + return err + } + return nil +} + +// isAllowedByRegression returns whether or not the passed error is allowed by +// regression tests without disconnecting the peer. In particular, regression +// tests need to be allowed to send malformed messages without the peer being +// disconnected. +func (p *peer) isAllowedByRegression(err error) bool { + // Don't allow the error if it's not specifically a malformed message + // error. + if _, ok := err.(*btcwire.MessageError); !ok { + return false + } + + // Don't allow the error if it's not coming from localhost or the + // hostname can't be determined for some reason. + host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + return false + } + + if host != "127.0.0.1" && host != "localhost" { + return false + } + + // Allowed if all checks passed. + return true +} + +// inHandler handles all incoming messages for the peer. It must be run as a +// goroutine. +func (p *peer) inHandler() { +out: + for !p.disconnect { + rmsg, buf, err := p.readMessage() + if err != nil { + // In order to allow regression tests with malformed + // messages, don't disconnect the peer when we're in + // regression test mode and the error is one of the + // allowed errors. + if cfg.RegressionTest && p.isAllowedByRegression(err) { + log.Errorf("[PEER] %v", err) + continue + } + + // Only log the error if we're not forcibly disconnecting. + if !p.disconnect { + log.Errorf("[PEER] %v", err) + } + break out + } + + // Ensure version message comes first. + if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.versionKnown { + log.Errorf("[PEER] A version message must precede all others") + break out + } + + // Some messages are handled directly, while other messages + // are sent to a queue to be processed. Directly handling + // getdata and getblocks messages makes it impossible for a peer + // to spam with requests. However, it means that our getdata + // requests to it may not get prompt replies. + switch msg := rmsg.(type) { + case *btcwire.MsgVersion: + p.handleVersionMsg(msg) + + case *btcwire.MsgVerAck: + // Do nothing. + + case *btcwire.MsgGetAddr: + p.handleGetAddrMsg(msg) + + case *btcwire.MsgAddr: + p.handleAddrMsg(msg) + + case *btcwire.MsgPing: + p.handlePingMsg(msg) + + case *btcwire.MsgPong: + // Don't do anything, but could try to work out network + // timing or similar. + + case *btcwire.MsgAlert: + p.server.BroadcastMessage(msg, p) + + case *btcwire.MsgBlock: + block := btcutil.NewBlockFromBlockAndBytes(msg, buf) + p.server.blockManager.QueueBlock(block) + + case *btcwire.MsgInv: + p.server.blockManager.QueueInv(msg, p) + + case *btcwire.MsgGetData: + p.handleGetDataMsg(msg) + + case *btcwire.MsgGetBlocks: + p.handleGetBlocksMsg(msg) + + case *btcwire.MsgGetHeaders: + p.handleGetHeadersMsg(msg) + + default: + log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", + rmsg.Command()) + } + } + + // Ensure connection is closed and notify server that the peer is done. + p.disconnect = true + p.conn.Close() + p.server.donePeers <- p + p.quit <- true + + p.wg.Done() + log.Tracef("[PEER] Peer input handler done for %s", p.conn.RemoteAddr()) +} + +// outHandler handles all outgoing messages for the peer. It must be run as a +// goroutine. It uses a buffered channel to serialize output messages while +// allowing the sender to continue running asynchronously. +func (p *peer) outHandler() { +out: + for { + select { + case msg := <-p.outputQueue: + // Don't send anything if we're disconnected. + if p.disconnect { + continue + } + err := p.writeMessage(msg) + if err != nil { + p.disconnect = true + log.Errorf("[PEER] %v", err) + } + + case <-p.quit: + break out + } + } + p.wg.Done() + log.Tracef("[PEER] Peer output handler done for %s", p.conn.RemoteAddr()) +} + +// QueueMessage adds the passed bitcoin message to the peer send queue. It +// uses a buffered channel to communicate with the output handler goroutine so +// it is automatically rate limited and safe for concurrent access. +func (p *peer) QueueMessage(msg btcwire.Message) { + p.outputQueue <- msg +} + +// Start begins processing input and output messages. It also sends the initial +// version message for outbound connections to start the negotiation process. +func (p *peer) Start() error { + // Already started? + if p.started { + return nil + } + + log.Tracef("[PEER] Starting peer %s", p.conn.RemoteAddr()) + + // Send an initial version message if this is an outbound connection. + if !p.inbound { + err := p.pushVersionMsg() + if err != nil { + log.Errorf("[PEER] %v", err) + p.conn.Close() + return err + } + } + + // Start processing input and output. + go p.inHandler() + go p.outHandler() + p.wg.Add(2) + p.started = true + + // If server is shutting down, don't even start watchdog + if p.server.shutdown { + log.Debug("[PEER] server is shutting down") + return nil + } + + return nil +} + +// Shutdown gracefully shuts down the peer by signalling the async input and +// output handler and waiting for them to finish. +func (p *peer) Shutdown() { + log.Tracef("[PEER] Shutdown peer %s", p.conn.RemoteAddr()) + p.disconnect = true + p.conn.Close() + p.wg.Wait() +} + +// newPeer returns a new bitcoin peer for the provided server and connection. +// Use start to begin processing incoming and outgoing messages. +func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { + p := peer{ + server: s, + protocolVersion: btcwire.ProtocolVersion, + btcnet: s.btcnet, + services: btcwire.SFNodeNetwork, + conn: conn, + timeConnected: time.Now(), + inbound: inbound, + persistent: persistent, + knownAddresses: make(map[string]bool), + outputQueue: make(chan btcwire.Message, outputBufferSize), + quit: make(chan bool), + } + return &p +} + +type logClosure func() string + +func (c logClosure) String() string { + return c() +} + +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/btcd/rpcserver.go b/btcd/rpcserver.go new file mode 100644 index 00000000..cae61050 --- /dev/null +++ b/btcd/rpcserver.go @@ -0,0 +1,486 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "encoding/json" + "github.com/conformal/btcchain" + "github.com/conformal/btcjson" + "github.com/conformal/btcscript" + "github.com/conformal/btcwire" + "github.com/davecgh/go-spew/spew" + "math/big" + "net" + "net/http" + "strconv" + "strings" + "sync" +) + +// rpcServer holds the items the rpc server may need to access (config, +// shutdown, main server, etc.) +type rpcServer struct { + started bool + shutdown bool + server *server + wg sync.WaitGroup + rpcport string + username string + password string + listener net.Listener +} + +// Start is used by server.go to start the rpc listener. +func (s *rpcServer) Start() { + if s.started { + return + } + + log.Trace("[RPCS] Starting RPC server") + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + jsonRpcRead(w, r, s) + }) + listenAddr := net.JoinHostPort("", s.rpcport) + httpServer := &http.Server{Addr: listenAddr} + go func() { + log.Infof("[RPCS] RPC server listening on %s", s.listener.Addr()) + httpServer.Serve(s.listener) + s.wg.Done() + }() + s.wg.Add(1) + s.started = true +} + +// Stop is used by server.go to stop the rpc listener. +func (s *rpcServer) Stop() error { + if s.shutdown { + log.Infof("[RPCS] RPC server is already in the process of shutting down") + return nil + } + log.Warnf("[RPCS] RPC server shutting down") + err := s.listener.Close() + if err != nil { + log.Errorf("[RPCS] Problem shutting down rpc: %v", err) + return err + } + log.Infof("[RPCS] RPC server shutdown complete") + s.wg.Wait() + s.shutdown = true + return nil +} + +// newRpcServer returns a new instance of the rpcServer struct. +func newRpcServer(s *server) (*rpcServer, error) { + rpc := rpcServer{ + server: s, + } + // Get values from config + rpc.rpcport = cfg.RpcPort + rpc.username = cfg.RpcUser + rpc.password = cfg.RpcPass + + listenAddr := net.JoinHostPort("", rpc.rpcport) + listener, err := net.Listen("tcp", listenAddr) + if err != nil { + log.Errorf("[RPCS] Couldn't create listener: %v", err) + return nil, err + } + rpc.listener = listener + return &rpc, err +} + +// jsonRpcRead is the main function that handles reading messages, getting +// the data the message requests, and writing the reply. +func jsonRpcRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { + _ = spew.Dump + r.Close = true + if s.shutdown == true { + return + } + var rawReply btcjson.Reply + body, err := btcjson.GetRaw(r.Body) + if err != nil { + log.Errorf("[RPCS] Error getting json message: %v", err) + return + } + var message btcjson.Message + err = json.Unmarshal(body, &message) + if err != nil { + log.Errorf("[RPCS] Error unmarshalling json message: %v", err) + jsonError := btcjson.Error{ + Code: -32700, + Message: "Parse error", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: nil, + } + log.Tracef("[RPCS] reply: %v", rawReply) + msg, err := btcjson.MarshallAndSend(rawReply, w) + if err != nil { + log.Errorf(msg) + return + } + log.Debugf(msg) + return + + } + log.Tracef("[RPCS] received: %v", message) + + // Deal with commands + switch message.Method { + case "getblockcount": + _, maxidx, _ := s.server.db.NewestSha() + rawReply = btcjson.Reply{ + Result: maxidx, + Error: nil, + Id: &message.Id, + } + // btcd does not do mining so we can hardcode replies here. + case "getgenerate": + rawReply = btcjson.Reply{ + Result: false, + Error: nil, + Id: &message.Id, + } + case "setgenerate": + rawReply = btcjson.Reply{ + Result: nil, + Error: nil, + Id: &message.Id, + } + case "gethashespersec": + rawReply = btcjson.Reply{ + Result: 0, + Error: nil, + Id: &message.Id, + } + case "getblockhash": + var f interface{} + err = json.Unmarshal(body, &f) + m := f.(map[string]interface{}) + var idx float64 + for _, v := range m { + switch vv := v.(type) { + case []interface{}: + for _, u := range vv { + idx, _ = u.(float64) + } + default: + } + } + sha, err := s.server.db.FetchBlockShaByHeight(int64(idx)) + if err != nil { + log.Errorf("[RCPS] Error getting block: %v", err) + jsonError := btcjson.Error{ + Code: -1, + Message: "Block number out of range.", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + + } + + rawReply = btcjson.Reply{ + Result: sha.String(), + Error: nil, + Id: &message.Id, + } + case "getblock": + var f interface{} + err = json.Unmarshal(body, &f) + m := f.(map[string]interface{}) + var hash string + for _, v := range m { + switch vv := v.(type) { + case []interface{}: + for _, u := range vv { + hash, _ = u.(string) + } + default: + } + } + sha, err := btcwire.NewShaHashFromStr(hash) + if err != nil { + log.Errorf("[RPCS] Error generating sha: %v", err) + jsonError := btcjson.Error{ + Code: -5, + Message: "Block not found", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + } + blk, err := s.server.db.FetchBlockBySha(sha) + if err != nil { + log.Errorf("[RPCS] Error fetching sha: %v", err) + jsonError := btcjson.Error{ + Code: -5, + Message: "Block not found", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + } + idx := blk.Height() + buf, err := blk.Bytes() + if err != nil { + log.Errorf("[RPCS] Error fetching block: %v", err) + jsonError := btcjson.Error{ + Code: -5, + Message: "Block not found", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + } + + txList, _ := blk.TxShas() + + txNames := make([]string, len(txList)) + for i, v := range txList { + txNames[i] = v.String() + } + + _, maxidx, err := s.server.db.NewestSha() + if err != nil { + log.Errorf("[RPCS] Cannot get newest sha: %v", err) + return + } + + blockHeader := &blk.MsgBlock().Header + blockReply := btcjson.BlockResult{ + Hash: hash, + Version: blockHeader.Version, + MerkleRoot: blockHeader.MerkleRoot.String(), + PreviousHash: blockHeader.PrevBlock.String(), + Nonce: blockHeader.Nonce, + Time: blockHeader.Timestamp.Unix(), + Confirmations: uint64(1 + maxidx - idx), + Height: idx, + Tx: txNames, + Size: len(buf), + Bits: strconv.FormatInt(int64(blockHeader.Bits), 16), + Difficulty: getDifficultyRatio(blockHeader.Bits), + } + + // Get next block unless we are already at the top. + if idx < maxidx { + shaNext, err := s.server.db.FetchBlockShaByHeight(int64(idx + 1)) + if err != nil { + log.Errorf("[RPCS] No next block: %v", err) + } else { + blockReply.NextHash = shaNext.String() + } + } + + rawReply = btcjson.Reply{ + Result: blockReply, + Error: nil, + Id: &message.Id, + } + case "getrawtransaction": + var f interface{} + err = json.Unmarshal(body, &f) + m := f.(map[string]interface{}) + var tx string + var verbose float64 + for _, v := range m { + switch vv := v.(type) { + case []interface{}: + for _, u := range vv { + switch uu := u.(type) { + case string: + tx = uu + case float64: + verbose = uu + default: + } + } + default: + } + } + + if int(verbose) != 1 { + // Don't return details + // not used yet + } else { + txSha, _ := btcwire.NewShaHashFromStr(tx) + var txS *btcwire.MsgTx + txS, _, blksha, err := s.server.db.FetchTxBySha(txSha) + if err != nil { + log.Errorf("[RPCS] Error fetching tx: %v", err) + jsonError := btcjson.Error{ + Code: -5, + Message: "No information available about transaction", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + } + blk, err := s.server.db.FetchBlockBySha(blksha) + if err != nil { + log.Errorf("[RPCS] Error fetching sha: %v", err) + jsonError := btcjson.Error{ + Code: -5, + Message: "Block not found", + } + + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + log.Tracef("[RPCS] reply: %v", rawReply) + break + } + idx := blk.Height() + + txOutList := txS.TxOut + voutList := make([]btcjson.Vout, len(txOutList)) + + txInList := txS.TxIn + vinList := make([]btcjson.Vin, len(txInList)) + + for i, v := range txInList { + vinList[i].Sequence = float64(v.Sequence) + disbuf, _ := btcscript.DisasmString(v.SignatureScript) + vinList[i].ScriptSig.Asm = strings.Replace(disbuf, " ", "", -1) + vinList[i].Vout = i + 1 + log.Debugf(disbuf) + } + + for i, v := range txOutList { + voutList[i].N = i + voutList[i].Value = float64(v.Value) / 100000000 + isbuf, _ := btcscript.DisasmString(v.PkScript) + voutList[i].ScriptPubKey.Asm = isbuf + voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG") + _, addr, err := btcscript.ScriptToAddress(v.PkScript) + if err != nil { + log.Errorf("[RPCS] Error getting address for %v: %v", txSha, err) + } else { + addrList := make([]string, 1) + addrList[0] = addr + voutList[i].ScriptPubKey.Addresses = addrList + } + } + + _, maxidx, err := s.server.db.NewestSha() + if err != nil { + log.Errorf("[RPCS] Cannot get newest sha: %v", err) + return + } + confirmations := uint64(1 + maxidx - idx) + + blockHeader := &blk.MsgBlock().Header + txReply := btcjson.TxRawResult{ + Txid: tx, + Vout: voutList, + Vin: vinList, + Version: txS.Version, + LockTime: txS.LockTime, + // This is not a typo, they are identical in + // bitcoind as well. + Time: blockHeader.Timestamp.Unix(), + Blocktime: blockHeader.Timestamp.Unix(), + BlockHash: blksha.String(), + Confirmations: confirmations, + } + rawReply = btcjson.Reply{ + Result: txReply, + Error: nil, + Id: &message.Id, + } + } + case "decoderawtransaction": + var f interface{} + err = json.Unmarshal(body, &f) + m := f.(map[string]interface{}) + var hash string + for _, v := range m { + switch vv := v.(type) { + case []interface{}: + for _, u := range vv { + hash, _ = u.(string) + } + default: + } + } + spew.Dump(hash) + txReply := btcjson.TxRawDecodeResult{} + rawReply = btcjson.Reply{ + Result: txReply, + Error: nil, + Id: &message.Id, + } + default: + jsonError := btcjson.Error{ + Code: -32601, + Message: "Method not found", + } + rawReply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + } + + msg, err := btcjson.MarshallAndSend(rawReply, w) + if err != nil { + log.Errorf(msg) + return + } + log.Debugf(msg) + return +} + +// getDifficultyRatio returns the proof-of-work difficulty as a multiple of the +// minimum difficulty using the passed bits field from the header of a block. +func getDifficultyRatio(bits uint32) float64 { + // The minimum difficulty is the max possible proof-of-work limit bits + // converted back to a number. Note this is not the same as the the + // proof of work limit directly because the block difficulty is encoded + // in a block with the compact form which loses precision. + max := btcchain.CompactToBig(activeNetParams.powLimitBits) + target := btcchain.CompactToBig(bits) + + difficulty := new(big.Rat).SetFrac(max, target) + outString := difficulty.FloatString(2) + diff, err := strconv.ParseFloat(outString, 64) + if err != nil { + log.Errorf("[RPCS] Cannot get difficulty: %v", err) + return 0 + } + return diff +} diff --git a/btcd/sample-btcd.conf b/btcd/sample-btcd.conf new file mode 100644 index 00000000..9c6f00c9 --- /dev/null +++ b/btcd/sample-btcd.conf @@ -0,0 +1,22 @@ +[Application Options] +; Debug logging level. +; Valid options are {trace, debug, info, warn, error, critical} +; debuglevel=info + +; Use testnet. +; testnet=1 + +; Add as many specific space separated peers to connect to as desired. +; addpeer=192.168.1.1 +; addpeer=10.0.0.2:8333 +; addpeer=fe80::1 +; addpeer=[fe80::2]:8333 +; addpeer=192.168.1.1 10.0.0.2:8333 fe80::1 [fe80::2]:8333 + +; Maximum number of inbound and outbound peers. +; maxpeers=8 + +; How long to ban misbehaving peers. Valid time units are {s, m, h}. +; Minimum 1s. +; banduration=24h +; banduration=11h30m15s diff --git a/btcd/server.go b/btcd/server.go new file mode 100644 index 00000000..93e8d056 --- /dev/null +++ b/btcd/server.go @@ -0,0 +1,434 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "container/list" + "github.com/conformal/btcdb" + "github.com/conformal/btcwire" + "net" + "opensource.conformal.com/go/btcd-internal/addrmgr" + "sync" + "time" +) + +// supportedServices describes which services are supported by the server. +const supportedServices = btcwire.SFNodeNetwork + +// connectionRetryInterval is the amount of time to wait in between retries +// when connecting to persistent peers. +const connectionRetryInterval = time.Second * 10 + +// directionString is a helper function that returns a string that represents +// the direction of a connection (inbound or outbound). +func directionString(inbound bool) string { + if inbound { + return "inbound" + } + return "outbound" +} + +// broadcastMsg provides the ability to house a bitcoin message to be broadcast +// to all connected peers except specified excluded peers. +type broadcastMsg struct { + message btcwire.Message + excludePeers []*peer +} + +// server provides a bitcoin server for handling communications to and from +// bitcoin peers. +type server struct { + nonce uint64 + listener net.Listener + btcnet btcwire.BitcoinNet + started bool + shutdown bool + shutdownSched bool + addrManager *addrmgr.AddrManager + rpcServer *rpcServer + blockManager *blockManager + newPeers chan *peer + donePeers chan *peer + banPeers chan *peer + broadcast chan broadcastMsg + wg sync.WaitGroup + quit chan bool + db btcdb.Db +} + +// handleAddPeerMsg deals with adding new peers. It is invoked from the +// peerHandler goroutine. +func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) { + + // Ignore new peers if we're shutting down. + direction := directionString(p.inbound) + if s.shutdown { + log.Infof("[SRVR] New peer %s (%s) ignored - server is "+ + "shutting down", p.conn.RemoteAddr(), direction) + p.Shutdown() + return + } + + // Disconnect banned peers. + host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + log.Errorf("[SRVR] %v", err) + p.Shutdown() + return + } + if banEnd, ok := banned[host]; ok { + if time.Now().Before(banEnd) { + log.Debugf("[SRVR] Peer %s is banned for another %v - "+ + "disconnecting", host, banEnd.Sub(time.Now())) + p.Shutdown() + return + } + + log.Infof("[SRVR] Peer %s is no longer banned", host) + delete(banned, host) + } + + // TODO: Check for max peers from a single IP. + + // Limit max number of total peers. + if peers.Len() >= cfg.MaxPeers { + log.Infof("[SRVR] Max peers reached [%d] - disconnecting "+ + "peer %s (%s)", cfg.MaxPeers, p.conn.RemoteAddr(), + direction) + p.Shutdown() + return + } + + // Add the new peer and start it. + log.Infof("[SRVR] New peer %s (%s)", p.conn.RemoteAddr(), direction) + peers.PushBack(p) + p.Start() +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It is +// invoked from the peerHandler goroutine. +func (s *server) handleDonePeerMsg(peers *list.List, p *peer) { + direction := directionString(p.inbound) + for e := peers.Front(); e != nil; e = e.Next() { + if e.Value == p { + peers.Remove(e) + log.Infof("[SRVR] Removed peer %s (%s)", + p.conn.RemoteAddr(), direction) + + // Issue an asynchronous reconnect if the peer was a + // persistent outbound connection. + if !p.inbound && p.persistent { + addr := p.conn.RemoteAddr().String() + s.ConnectPeerAsync(addr, true) + } + return + } + } +} + +// handleBanPeerMsg deals with banning peers. It is invoked from the +// peerHandler goroutine. +func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { + host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + if err != nil { + log.Errorf("[SRVR] %v", err) + return + } + direction := directionString(p.inbound) + log.Infof("[SRVR] Banned peer %s (%s) for %v", host, direction, + cfg.BanDuration) + banned[host] = time.Now().Add(cfg.BanDuration) + +} + +// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked +// from the peerHandler goroutine. +func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { + for e := peers.Front(); e != nil; e = e.Next() { + excluded := false + for _, p := range bmsg.excludePeers { + if e.Value == p { + excluded = true + } + } + if !excluded { + p := e.Value.(*peer) + p.QueueMessage(bmsg.message) + } + } + +} + +// listenHandler is the main listener which accepts incoming connections for the +// server. It must be run as a goroutine. +func (s *server) listenHandler() { + log.Infof("[SRVR] Server listening on %s", s.listener.Addr()) + for !s.shutdown { + conn, err := s.listener.Accept() + if err != nil { + // Only log the error if we're not forcibly shutting down. + if !s.shutdown { + log.Errorf("[SRVR] %v", err) + } + continue + } + s.AddPeer(newPeer(s, conn, true, false)) + } + s.wg.Done() + log.Tracef("[SRVR] Listener handler done for %s", s.listener.Addr()) +} + +// peerHandler is used to handle peer operations such as adding and removing +// peers to and from the server, banning peers, and broadcasting messages to +// peers. It must be run a a goroutine. +func (s *server) peerHandler() { + log.Tracef("[SRVR] Starting peer handler for %s", s.listener.Addr()) + peers := list.New() + bannedPeers := make(map[string]time.Time) + + // Live while we're not shutting down or there are still connected peers. + for !s.shutdown || peers.Len() != 0 { + select { + + // New peers connected to the server. + case p := <-s.newPeers: + s.handleAddPeerMsg(peers, bannedPeers, p) + + // Disconnected peers. + case p := <-s.donePeers: + s.handleDonePeerMsg(peers, p) + + // Peer to ban. + case p := <-s.banPeers: + s.handleBanPeerMsg(bannedPeers, p) + + // Message to broadcast to all connected peers except those + // which are excluded by the message. + case bmsg := <-s.broadcast: + s.handleBroadcastMsg(peers, &bmsg) + + // Shutdown the peer handler. + case <-s.quit: + // Shutdown peers. + for e := peers.Front(); e != nil; e = e.Next() { + p := e.Value.(*peer) + p.Shutdown() + } + } + } + s.wg.Done() + log.Tracef("[SRVR] Peer handler done on %s", s.listener.Addr()) +} + +// AddPeer adds a new peer that has already been connected to the server. +func (s *server) AddPeer(p *peer) { + s.newPeers <- p +} + +// BanPeer bans a peer that has already been connected to the server by ip. +func (s *server) BanPeer(p *peer) { + s.banPeers <- p +} + +// BroadcastMessage sends msg to all peers currently connected to the server +// except those in the passed peers to exclude. +func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) { + // XXX: Need to determine if this is an alert that has already been + // broadcast and refrain from broadcasting again. + bmsg := broadcastMsg{message: msg, excludePeers: exclPeers} + s.broadcast <- bmsg +} + +// ConnectPeerAsync attempts to asynchronously connect to addr. If successful, +// a new peer is created and added to the server's peer list. +func (s *server) ConnectPeerAsync(addr string, persistent bool) { + // Don't try to connect to a peer if the server is shutting down. + if s.shutdown { + return + } + + go func() { + // Attempt to connect to the peer. If the connection fails and + // this is a persistent connection, retry after the retry + // interval. + for !s.shutdown { + log.Debugf("[SRVR] Attempting to connect to %s", addr) + conn, err := net.Dial("tcp", addr) + if err != nil { + log.Errorf("[SRVR] %v", err) + if !persistent { + return + } + log.Infof("[SRVR] Retrying connection to %s "+ + "in %s", addr, connectionRetryInterval) + time.Sleep(connectionRetryInterval) + continue + } + + // Connection was successful so log it and create a new + // peer. + log.Infof("[SRVR] Connected to %s", conn.RemoteAddr()) + s.AddPeer(newPeer(s, conn, false, persistent)) + return + } + }() +} + +// Start begins accepting connections from peers. +func (s *server) Start() { + // Already started? + if s.started { + return + } + + log.Trace("[SRVR] Starting server") + go s.listenHandler() + go s.peerHandler() + s.wg.Add(2) + s.addrManager.Start() + s.blockManager.Start() + if !cfg.DisableRpc { + s.rpcServer.Start() + } + + s.started = true +} + +// Stop gracefully shuts down the server by stopping and disconnecting all +// peers and the main listener. +func (s *server) Stop() error { + if s.shutdown { + log.Infof("[SRVR] Server is already in the process of shutting down") + return nil + } + + log.Warnf("[SRVR] Server shutting down") + s.shutdown = true + s.quit <- true + if !cfg.DisableRpc { + s.rpcServer.Stop() + } + s.addrManager.Stop() + s.blockManager.Stop() + err := s.listener.Close() + if err != nil { + return err + } + return nil +} + +// WaitForShutdown blocks until the main listener and peer handlers are stopped. +func (s *server) WaitForShutdown() { + s.wg.Wait() + log.Infof("[SRVR] Server shutdown complete") +} + +// ScheduleShutdown schedules a server shutdown after the specified duration. +// It also dynamically adjusts how often to warn the server is going down based +// on remaining duration. +func (s *server) ScheduleShutdown(duration time.Duration) { + // Don't schedule shutdown more than once. + if s.shutdownSched { + return + } + log.Warnf("[SRVR] Server shutdown in %v", duration) + go func() { + remaining := duration + tickDuration := dynamicTickDuration(remaining) + done := time.After(remaining) + ticker := time.NewTicker(tickDuration) + out: + for { + select { + case <-done: + ticker.Stop() + s.Stop() + break out + case <-ticker.C: + remaining = remaining - tickDuration + if remaining < time.Second { + continue + } + + // Change tick duration dynamically based on remaining time. + newDuration := dynamicTickDuration(remaining) + if tickDuration != newDuration { + tickDuration = newDuration + ticker.Stop() + ticker = time.NewTicker(tickDuration) + } + log.Warnf("[SRVR] Server shutdown in %v", remaining) + } + } + }() + s.shutdownSched = true +} + +// newServer returns a new btcd server configured to listen on addr for the +// bitcoin network type specified in btcnet. Use start to begin accepting +// connections from peers. +func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, error) { + nonce, err := btcwire.RandomUint64() + if err != nil { + return nil, err + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + s := server{ + nonce: nonce, + listener: listener, + btcnet: btcnet, + addrManager: addrmgr.New(), + newPeers: make(chan *peer, cfg.MaxPeers), + donePeers: make(chan *peer, cfg.MaxPeers), + banPeers: make(chan *peer, cfg.MaxPeers), + broadcast: make(chan broadcastMsg, cfg.MaxPeers), + quit: make(chan bool), + db: db, + } + s.blockManager = newBlockManager(&s) + + log.Infof("[BMGR] Generating initial block node index. This may " + + "take a while...") + err = s.blockManager.blockChain.GenerateInitialIndex() + if err != nil { + return nil, err + } + log.Infof("[BMGR] Block index generation complete") + + if !cfg.DisableRpc { + s.rpcServer, err = newRpcServer(&s) + if err != nil { + return nil, err + } + } + return &s, nil +} + +// dynamicTickDuration is a convenience function used to dynamically choose a +// tick duration based on remaining time. It is primarily used during +// server shutdown to make shutdown warnings more frequent as the shutdown time +// approaches. +func dynamicTickDuration(remaining time.Duration) time.Duration { + switch { + case remaining <= time.Second*5: + return time.Second + case remaining <= time.Second*15: + return time.Second * 5 + case remaining <= time.Minute: + return time.Second * 15 + case remaining <= time.Minute*5: + return time.Minute + case remaining <= time.Minute*15: + return time.Minute * 5 + case remaining <= time.Hour: + return time.Minute * 15 + } + return time.Hour +} diff --git a/btcd/signal.go b/btcd/signal.go new file mode 100644 index 00000000..e4a42011 --- /dev/null +++ b/btcd/signal.go @@ -0,0 +1,36 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "os" + "os/signal" +) + +// interruptChannel is used to receive SIGINT (Ctrl+C) signals. +var interruptChannel chan os.Signal + +// interruptCallbacks is a list of callbacks to invoke when a SIGINT (Ctrl+C) is +// received. +var interruptCallbacks []func() + +// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is +// received. +func addInterruptHandler(handler func()) { + // Create the channel and start the main interrupt handler which invokes + // all other callbacks and exits if not already done. + if interruptChannel == nil { + interruptChannel = make(chan os.Signal, 1) + signal.Notify(interruptChannel, os.Interrupt) + go func() { + <-interruptChannel + for _, callback := range interruptCallbacks { + callback() + } + os.Exit(0) + }() + } + interruptCallbacks = append(interruptCallbacks, handler) +}