btcd/addrmgr/addrmanager.go

1219 lines
31 KiB
Go

// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package addrmgr
import (
"container/list"
crand "crypto/rand" // for seeding
"encoding/base32"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
)
// AddrManager provides a concurrency safe address manager for caching potential
// peers on the bitcoin network.
type AddrManager struct {
mtx sync.RWMutex
peersFile string
lookupFunc func(string) ([]net.IP, error)
rand *rand.Rand
key [32]byte
addrIndex map[string]*KnownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*KnownAddress
addrTried [triedBucketCount]*list.List
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nTried int
nNew int
lamtx sync.Mutex
localAddresses map[string]*localAddress
version int
}
type serializedKnownAddress struct {
Addr string
Src string
Attempts int
TimeStamp int64
LastAttempt int64
LastSuccess int64
Services wire.ServiceFlag
SrcServices wire.ServiceFlag
// no refcount or tried, that is available from context.
}
type serializedAddrManager struct {
Version int
Key [32]byte
Addresses []*serializedKnownAddress
NewBuckets [newBucketCount][]string // string is NetAddressKey
TriedBuckets [triedBucketCount][]string
}
type localAddress struct {
na *wire.NetAddressV2
score AddressPriority
}
// AddressPriority type is used to describe the hierarchy of local address
// discovery methods.
type AddressPriority int
const (
// InterfacePrio signifies the address is on a local interface
InterfacePrio AddressPriority = iota
// BoundPrio signifies the address has been explicitly bounded to.
BoundPrio
// UpnpPrio signifies the address was obtained from UPnP.
UpnpPrio
// HTTPPrio signifies the address was obtained from an external HTTP service.
HTTPPrio
// ManualPrio signifies the address was provided by --externalip.
ManualPrio
)
const (
// needAddressThreshold is the number of addresses under which the
// address manager will claim to need more addresses.
needAddressThreshold = 1000
// dumpAddressInterval is the interval used to dump the address
// cache to disk for future use.
dumpAddressInterval = time.Minute * 10
// triedBucketSize is the maximum number of addresses in each
// tried address bucket.
triedBucketSize = 256
// triedBucketCount is the number of buckets we split tried
// addresses over.
triedBucketCount = 64
// newBucketSize is the maximum number of addresses in each new address
// bucket.
newBucketSize = 64
// newBucketCount is the number of buckets that we spread new addresses
// over.
newBucketCount = 1024
// triedBucketsPerGroup is the number of tried buckets over which an
// address group will be spread.
triedBucketsPerGroup = 8
// newBucketsPerGroup is the number of new buckets over which an
// source address group will be spread.
newBucketsPerGroup = 64
// newBucketsPerAddress is the number of buckets a frequently seen new
// address may end up in.
newBucketsPerAddress = 8
// numMissingDays is the number of days before which we assume an
// address has vanished if we have not seen it announced in that long.
numMissingDays = 30
// numRetries is the number of tried without a single success before
// we assume an address is bad.
numRetries = 3
// maxFailures is the maximum number of failures we will accept without
// a success before considering an address bad.
maxFailures = 10
// minBadDays is the number of days since the last success before we
// will consider evicting an address.
minBadDays = 7
// getAddrMax is the most addresses that we will send in response
// to a getAddr (in practise the most addresses we will return from a
// call to AddressCache()).
getAddrMax = 2500
// getAddrPercent is the percentage of total addresses known that we
// will share with a call to AddressCache.
getAddrPercent = 23
// serialisationVersion is the current version of the on-disk format.
serialisationVersion = 2
)
// updateAddress is a helper function to either update an address already known
// to the address manager, or to add the address if not already known.
func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddressV2) {
// Filter out non-routable addresses. Note that non-routable
// also includes invalid and local addresses.
if !IsRoutable(netAddr) {
return
}
addr := NetAddressKey(netAddr)
ka := a.find(netAddr)
if ka != nil {
// TODO: only update addresses periodically.
// Update the last seen time and services.
// note that to prevent causing excess garbage on getaddr
// messages the netaddresses in addrmanager are *immutable*,
// if we need to change them then we replace the pointer with a
// new copy so that we don't have to copy every na for getaddr.
if netAddr.Timestamp.After(ka.na.Timestamp) ||
(ka.na.Services&netAddr.Services) !=
netAddr.Services {
naCopy := *ka.na
naCopy.Timestamp = netAddr.Timestamp
naCopy.AddService(netAddr.Services)
ka.mtx.Lock()
ka.na = &naCopy
ka.mtx.Unlock()
}
// If already in tried, we have nothing to do here.
if ka.tried {
return
}
// Already at our max?
if ka.refs == newBucketsPerAddress {
return
}
// The more entries we have, the less likely we are to add more.
// likelihood is 2N.
factor := int32(2 * ka.refs)
if a.rand.Int31n(factor) != 0 {
return
}
} else {
// Make a copy of the net address to avoid races since it is
// updated elsewhere in the addrmanager code and would otherwise
// change the actual netaddress on the peer.
netAddrCopy := *netAddr
ka = &KnownAddress{na: &netAddrCopy, srcAddr: srcAddr}
a.addrIndex[addr] = ka
a.nNew++
// XXX time penalty?
}
bucket := a.getNewBucket(netAddr, srcAddr)
// Already exists?
if _, ok := a.addrNew[bucket][addr]; ok {
return
}
// Enforce max addresses.
if len(a.addrNew[bucket]) > newBucketSize {
log.Tracef("new bucket is full, expiring old")
a.expireNew(bucket)
}
// Add to new bucket.
ka.refs++
a.addrNew[bucket][addr] = ka
log.Tracef("Added new address %s for a total of %d addresses", addr,
a.nTried+a.nNew)
}
// expireNew makes space in the new buckets by expiring the really bad entries.
// If no bad entries are available we look at a few and remove the oldest.
func (a *AddrManager) expireNew(bucket int) {
// First see if there are any entries that are so bad we can just throw
// them away. otherwise we throw away the oldest entry in the cache.
// Bitcoind here chooses four random and just throws the oldest of
// those away, but we keep track of oldest in the initial traversal and
// use that information instead.
var oldest *KnownAddress
for k, v := range a.addrNew[bucket] {
if v.isBad() {
log.Tracef("expiring bad address %v", k)
delete(a.addrNew[bucket], k)
v.refs--
if v.refs == 0 {
a.nNew--
delete(a.addrIndex, k)
}
continue
}
if oldest == nil {
oldest = v
} else if !v.na.Timestamp.After(oldest.na.Timestamp) {
oldest = v
}
}
if oldest != nil {
key := NetAddressKey(oldest.na)
log.Tracef("expiring oldest address %v", key)
delete(a.addrNew[bucket], key)
oldest.refs--
if oldest.refs == 0 {
a.nNew--
delete(a.addrIndex, key)
}
}
}
// pickTried selects an address from the tried bucket to be evicted.
// We just choose the eldest. Bitcoind selects 4 random entries and throws away
// the older of them.
func (a *AddrManager) pickTried(bucket int) *list.Element {
var oldest *KnownAddress
var oldestElem *list.Element
for e := a.addrTried[bucket].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress)
if oldest == nil || oldest.na.Timestamp.After(ka.na.Timestamp) {
oldestElem = e
oldest = ka
}
}
return oldestElem
}
func (a *AddrManager) getNewBucket(netAddr, srcAddr *wire.NetAddressV2) int {
// bitcoind:
// doublesha256(key + sourcegroup + int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckets
data1 := []byte{}
data1 = append(data1, a.key[:]...)
data1 = append(data1, []byte(GroupKey(netAddr))...)
data1 = append(data1, []byte(GroupKey(srcAddr))...)
hash1 := chainhash.DoubleHashB(data1)
hash64 := binary.LittleEndian.Uint64(hash1)
hash64 %= newBucketsPerGroup
var hashbuf [8]byte
binary.LittleEndian.PutUint64(hashbuf[:], hash64)
data2 := []byte{}
data2 = append(data2, a.key[:]...)
data2 = append(data2, GroupKey(srcAddr)...)
data2 = append(data2, hashbuf[:]...)
hash2 := chainhash.DoubleHashB(data2)
return int(binary.LittleEndian.Uint64(hash2) % newBucketCount)
}
func (a *AddrManager) getTriedBucket(netAddr *wire.NetAddressV2) int {
// bitcoind hashes this as:
// doublesha256(key + group + truncate_to_64bits(doublesha256(key)) % buckets_per_group) % num_buckets
data1 := []byte{}
data1 = append(data1, a.key[:]...)
data1 = append(data1, []byte(NetAddressKey(netAddr))...)
hash1 := chainhash.DoubleHashB(data1)
hash64 := binary.LittleEndian.Uint64(hash1)
hash64 %= triedBucketsPerGroup
var hashbuf [8]byte
binary.LittleEndian.PutUint64(hashbuf[:], hash64)
data2 := []byte{}
data2 = append(data2, a.key[:]...)
data2 = append(data2, GroupKey(netAddr)...)
data2 = append(data2, hashbuf[:]...)
hash2 := chainhash.DoubleHashB(data2)
return int(binary.LittleEndian.Uint64(hash2) % triedBucketCount)
}
// addressHandler is the main handler for the address manager. It must be run
// as a goroutine.
func (a *AddrManager) addressHandler() {
dumpAddressTicker := time.NewTicker(dumpAddressInterval)
defer dumpAddressTicker.Stop()
out:
for {
select {
case <-dumpAddressTicker.C:
a.savePeers()
case <-a.quit:
break out
}
}
a.savePeers()
a.wg.Done()
log.Trace("Address handler done")
}
// savePeers saves all the known addresses to a file so they can be read back
// in at next run.
func (a *AddrManager) savePeers() {
a.mtx.Lock()
defer a.mtx.Unlock()
// First we make a serialisable datastructure so we can encode it to
// json.
sam := new(serializedAddrManager)
sam.Version = a.version
copy(sam.Key[:], a.key[:])
sam.Addresses = make([]*serializedKnownAddress, len(a.addrIndex))
i := 0
for k, v := range a.addrIndex {
ska := new(serializedKnownAddress)
ska.Addr = k
ska.TimeStamp = v.na.Timestamp.Unix()
ska.Src = NetAddressKey(v.srcAddr)
ska.Attempts = v.attempts
ska.LastAttempt = v.lastattempt.Unix()
ska.LastSuccess = v.lastsuccess.Unix()
if a.version > 1 {
ska.Services = v.na.Services
ska.SrcServices = v.srcAddr.Services
}
// Tried and refs are implicit in the rest of the structure
// and will be worked out from context on unserialisation.
sam.Addresses[i] = ska
i++
}
for i := range a.addrNew {
sam.NewBuckets[i] = make([]string, len(a.addrNew[i]))
j := 0
for k := range a.addrNew[i] {
sam.NewBuckets[i][j] = k
j++
}
}
for i := range a.addrTried {
sam.TriedBuckets[i] = make([]string, a.addrTried[i].Len())
j := 0
for e := a.addrTried[i].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress)
sam.TriedBuckets[i][j] = NetAddressKey(ka.na)
j++
}
}
w, err := os.Create(a.peersFile)
if err != nil {
log.Errorf("Error opening file %s: %v", a.peersFile, err)
return
}
enc := json.NewEncoder(w)
defer w.Close()
if err := enc.Encode(&sam); err != nil {
log.Errorf("Failed to encode file %s: %v", a.peersFile, err)
return
}
}
// loadPeers loads the known address from the saved file. If empty, missing, or
// malformed file, just don't load anything and start fresh
func (a *AddrManager) loadPeers() {
a.mtx.Lock()
defer a.mtx.Unlock()
err := a.deserializePeers(a.peersFile)
if err != nil {
log.Errorf("Failed to parse file %s: %v", a.peersFile, err)
// if it is invalid we nuke the old one unconditionally.
err = os.Remove(a.peersFile)
if err != nil {
log.Warnf("Failed to remove corrupt peers file %s: %v",
a.peersFile, err)
}
a.reset()
return
}
log.Infof("Loaded %d addresses from file '%s'", a.numAddresses(), a.peersFile)
}
func (a *AddrManager) deserializePeers(filePath string) error {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return nil
}
r, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("%s error opening file: %v", filePath, err)
}
defer r.Close()
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam)
if err != nil {
return fmt.Errorf("error reading %s: %v", filePath, err)
}
// Since decoding JSON is backwards compatible (i.e., only decodes
// fields it understands), we'll only return an error upon seeing a
// version past our latest supported version.
if sam.Version > serialisationVersion {
return fmt.Errorf("unknown version %v in serialized "+
"addrmanager", sam.Version)
}
copy(a.key[:], sam.Key[:])
for _, v := range sam.Addresses {
ka := new(KnownAddress)
// The first version of the serialized address manager was not
// aware of the service bits associated with this address, so
// we'll assign a default of SFNodeNetwork to it.
if sam.Version == 1 {
v.Services = wire.SFNodeNetwork
}
ka.na, err = a.DeserializeNetAddress(v.Addr, v.Services)
if err != nil {
return fmt.Errorf("failed to deserialize netaddress "+
"%s: %v", v.Addr, err)
}
// The first version of the serialized address manager was not
// aware of the service bits associated with the source address,
// so we'll assign a default of SFNodeNetwork to it.
if sam.Version == 1 {
v.SrcServices = wire.SFNodeNetwork
}
ka.srcAddr, err = a.DeserializeNetAddress(v.Src, v.SrcServices)
if err != nil {
return fmt.Errorf("failed to deserialize netaddress "+
"%s: %v", v.Src, err)
}
ka.attempts = v.Attempts
ka.lastattempt = time.Unix(v.LastAttempt, 0)
ka.lastsuccess = time.Unix(v.LastSuccess, 0)
a.addrIndex[NetAddressKey(ka.na)] = ka
}
for i := range sam.NewBuckets {
for _, val := range sam.NewBuckets[i] {
ka, ok := a.addrIndex[val]
if !ok {
return fmt.Errorf("newbucket contains %s but "+
"none in address list", val)
}
if ka.refs == 0 {
a.nNew++
}
ka.refs++
a.addrNew[i][val] = ka
}
}
for i := range sam.TriedBuckets {
for _, val := range sam.TriedBuckets[i] {
ka, ok := a.addrIndex[val]
if !ok {
return fmt.Errorf("Newbucket contains %s but "+
"none in address list", val)
}
ka.tried = true
a.nTried++
a.addrTried[i].PushBack(ka)
}
}
// Sanity checking.
for k, v := range a.addrIndex {
if v.refs == 0 && !v.tried {
return fmt.Errorf("address %s after serialisation "+
"with no references", k)
}
if v.refs > 0 && v.tried {
return fmt.Errorf("address %s after serialisation "+
"which is both new and tried!", k)
}
}
return nil
}
// DeserializeNetAddress converts a given address string to a *wire.NetAddress.
func (a *AddrManager) DeserializeNetAddress(addr string,
services wire.ServiceFlag) (*wire.NetAddressV2, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return nil, err
}
return a.HostToNetAddress(host, uint16(port), services)
}
// Start begins the core address handler which manages a pool of known
// addresses, timeouts, and interval based writes.
func (a *AddrManager) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}
log.Trace("Starting address manager")
// Load peers we already know about from file.
a.loadPeers()
// Start the address ticker to save addresses periodically.
a.wg.Add(1)
go a.addressHandler()
}
// Stop gracefully shuts down the address manager by stopping the main handler.
func (a *AddrManager) Stop() error {
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Warnf("Address manager is already in the process of " +
"shutting down")
return nil
}
log.Infof("Address manager shutting down")
close(a.quit)
a.wg.Wait()
return nil
}
// AddAddresses adds new addresses to the address manager. It enforces a max
// number of addresses and silently ignores duplicate addresses. It is
// safe for concurrent access.
func (a *AddrManager) AddAddresses(addrs []*wire.NetAddressV2, srcAddr *wire.NetAddressV2) {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, na := range addrs {
a.updateAddress(na, srcAddr)
}
}
// AddAddress adds a new address to the address manager. It enforces a max
// number of addresses and silently ignores duplicate addresses. It is
// safe for concurrent access.
func (a *AddrManager) AddAddress(addr, srcAddr *wire.NetAddressV2) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.updateAddress(addr, srcAddr)
}
// AddAddressByIP adds an address where we are given an ip:port and not a
// wire.NetAddress.
func (a *AddrManager) AddAddressByIP(addrIP string) error {
// Split IP and port
addr, portStr, err := net.SplitHostPort(addrIP)
if err != nil {
return err
}
// Put it in wire.Netaddress
ip := net.ParseIP(addr)
if ip == nil {
return fmt.Errorf("invalid ip address %s", addr)
}
port, err := strconv.ParseUint(portStr, 10, 0)
if err != nil {
return fmt.Errorf("invalid port %s: %v", portStr, err)
}
na := wire.NetAddressV2FromBytes(time.Now(), 0, ip, uint16(port))
a.AddAddress(na, na) // XXX use correct src address
return nil
}
// NumAddresses returns the number of addresses known to the address manager.
func (a *AddrManager) numAddresses() int {
return a.nTried + a.nNew
}
// NumAddresses returns the number of addresses known to the address manager.
func (a *AddrManager) NumAddresses() int {
a.mtx.RLock()
defer a.mtx.RUnlock()
return a.numAddresses()
}
// NeedMoreAddresses returns whether or not the address manager needs more
// addresses.
func (a *AddrManager) NeedMoreAddresses() bool {
a.mtx.RLock()
defer a.mtx.RUnlock()
return a.numAddresses() < needAddressThreshold
}
// AddressCache returns the current address cache. It must be treated as
// read-only (but since it is a copy now, this is not as dangerous).
func (a *AddrManager) AddressCache() []*wire.NetAddressV2 {
allAddr := a.getAddresses()
numAddresses := len(allAddr) * getAddrPercent / 100
if numAddresses > getAddrMax {
numAddresses = getAddrMax
}
// Fisher-Yates shuffle the array. We only need to do the first
// `numAddresses' since we are throwing the rest.
for i := 0; i < numAddresses; i++ {
// pick a number between current index and the end
j := rand.Intn(len(allAddr)-i) + i
allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
}
// slice off the limit we are willing to share.
return allAddr[0:numAddresses]
}
// getAddresses returns all of the addresses currently found within the
// manager's address cache.
func (a *AddrManager) getAddresses() []*wire.NetAddressV2 {
a.mtx.RLock()
defer a.mtx.RUnlock()
addrIndexLen := len(a.addrIndex)
if addrIndexLen == 0 {
return nil
}
addrs := make([]*wire.NetAddressV2, 0, addrIndexLen)
for _, v := range a.addrIndex {
addrs = append(addrs, v.na)
}
return addrs
}
// reset resets the address manager by reinitialising the random source
// and allocating fresh empty bucket storage.
func (a *AddrManager) reset() {
a.addrIndex = make(map[string]*KnownAddress)
// fill key with bytes from a good random source.
io.ReadFull(crand.Reader, a.key[:])
for i := range a.addrNew {
a.addrNew[i] = make(map[string]*KnownAddress)
}
for i := range a.addrTried {
a.addrTried[i] = list.New()
}
}
// HostToNetAddress returns a netaddress given a host address. If the address
// is a Tor .onion address this will be taken care of. Else if the host is
// not an IP address it will be resolved (via Tor if required).
func (a *AddrManager) HostToNetAddress(host string, port uint16,
services wire.ServiceFlag) (*wire.NetAddressV2, error) {
var (
na *wire.NetAddressV2
ip net.IP
)
// Tor v2 address is 16 char base32 + ".onion"
if len(host) == wire.TorV2EncodedSize && host[wire.TorV2EncodedSize-6:] == ".onion" {
// go base32 encoding uses capitals (as does the rfc
// but Tor and bitcoind tend to user lowercase, so we switch
// case here.
data, err := base32.StdEncoding.DecodeString(
strings.ToUpper(host[:wire.TorV2EncodedSize-6]))
if err != nil {
return nil, err
}
na = wire.NetAddressV2FromBytes(
time.Now(), services, data, port,
)
} else if len(host) == wire.TorV3EncodedSize && host[wire.TorV3EncodedSize-6:] == ".onion" {
// Tor v3 addresses are 56 base32 characters with the 6 byte
// onion suffix.
data, err := base32.StdEncoding.DecodeString(
strings.ToUpper(host[:wire.TorV3EncodedSize-6]),
)
if err != nil {
return nil, err
}
// The first 32 bytes is the ed25519 public key and is enough
// to reconstruct the .onion address.
na = wire.NetAddressV2FromBytes(
time.Now(), services, data[:wire.TorV3Size], port,
)
} else if ip = net.ParseIP(host); ip == nil {
ips, err := a.lookupFunc(host)
if err != nil {
return nil, err
}
if len(ips) == 0 {
return nil, fmt.Errorf("no addresses found for %s", host)
}
ip = ips[0]
na = wire.NetAddressV2FromBytes(time.Now(), services, ip, port)
} else {
// This is an non-nil IP address that was parsed in the else if
// above.
na = wire.NetAddressV2FromBytes(time.Now(), services, ip, port)
}
return na, nil
}
// NetAddressKey returns a string key in the form of ip:port for IPv4 addresses
// or [ip]:port for IPv6 addresses. It also handles onion v2 and v3 addresses.
func NetAddressKey(na *wire.NetAddressV2) string {
port := strconv.FormatUint(uint64(na.Port), 10)
return net.JoinHostPort(na.Addr.String(), port)
}
// GetAddress returns a single address that should be routable. It picks a
// random one from the possible addresses with preference given to ones that
// have not been used recently and should not pick 'close' addresses
// consecutively.
func (a *AddrManager) GetAddress() *KnownAddress {
// Protect concurrent access.
a.mtx.Lock()
defer a.mtx.Unlock()
if a.numAddresses() == 0 {
return nil
}
// Use a 50% chance for choosing between tried and new table entries.
if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) {
// Tried entry.
large := 1 << 30
factor := 1.0
for {
// pick a random bucket.
bucket := a.rand.Intn(len(a.addrTried))
if a.addrTried[bucket].Len() == 0 {
continue
}
// Pick a random entry in the list
e := a.addrTried[bucket].Front()
for i :=
a.rand.Int63n(int64(a.addrTried[bucket].Len())); i > 0; i-- {
e = e.Next()
}
ka := e.Value.(*KnownAddress)
randval := a.rand.Intn(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %v from tried bucket",
NetAddressKey(ka.na))
return ka
}
factor *= 1.2
}
} else {
// new node.
// XXX use a closure/function to avoid repeating this.
large := 1 << 30
factor := 1.0
for {
// Pick a random bucket.
bucket := a.rand.Intn(len(a.addrNew))
if len(a.addrNew[bucket]) == 0 {
continue
}
// Then, a random entry in it.
var ka *KnownAddress
nth := a.rand.Intn(len(a.addrNew[bucket]))
for _, value := range a.addrNew[bucket] {
if nth == 0 {
ka = value
}
nth--
}
randval := a.rand.Intn(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %v from new bucket",
NetAddressKey(ka.na))
return ka
}
factor *= 1.2
}
}
}
func (a *AddrManager) find(addr *wire.NetAddressV2) *KnownAddress {
return a.addrIndex[NetAddressKey(addr)]
}
// Attempt increases the given address' attempt counter and updates
// the last attempt time.
func (a *AddrManager) Attempt(addr *wire.NetAddressV2) {
a.mtx.Lock()
defer a.mtx.Unlock()
// find address.
// Surely address will be in tried by now?
ka := a.find(addr)
if ka == nil {
return
}
// set last tried time to now
now := time.Now()
ka.mtx.Lock()
ka.attempts++
ka.lastattempt = now
ka.mtx.Unlock()
}
// Connected Marks the given address as currently connected and working at the
// current time. The address must already be known to AddrManager else it will
// be ignored.
func (a *AddrManager) Connected(addr *wire.NetAddressV2) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.find(addr)
if ka == nil {
return
}
// Update the time as long as it has been 20 minutes since last we did
// so.
now := time.Now()
if now.After(ka.na.Timestamp.Add(time.Minute * 20)) {
// ka.na is immutable, so replace it.
naCopy := *ka.na
naCopy.Timestamp = time.Now()
ka.mtx.Lock()
ka.na = &naCopy
ka.mtx.Unlock()
}
}
// Good marks the given address as good. To be called after a successful
// connection and version exchange. If the address is unknown to the address
// manager it will be ignored.
func (a *AddrManager) Good(addr *wire.NetAddressV2) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.find(addr)
if ka == nil {
return
}
// ka.Timestamp is not updated here to avoid leaking information
// about currently connected peers.
now := time.Now()
ka.mtx.Lock()
ka.lastsuccess = now
ka.lastattempt = now
ka.attempts = 0
ka.mtx.Unlock() // tried and refs synchronized via a.mtx
// move to tried set, optionally evicting other addresses if need.
if ka.tried {
return
}
// ok, need to move it to tried.
// remove from all new buckets.
// record one of the buckets in question and call it the `first'
addrKey := NetAddressKey(addr)
oldBucket := -1
for i := range a.addrNew {
// we check for existence so we can record the first one
if _, ok := a.addrNew[i][addrKey]; ok {
delete(a.addrNew[i], addrKey)
ka.refs--
if oldBucket == -1 {
oldBucket = i
}
}
}
a.nNew--
if oldBucket == -1 {
// What? wasn't in a bucket after all.... Panic?
return
}
bucket := a.getTriedBucket(ka.na)
// Room in this tried bucket?
if a.addrTried[bucket].Len() < triedBucketSize {
ka.tried = true
a.addrTried[bucket].PushBack(ka)
a.nTried++
return
}
// No room, we have to evict something else.
entry := a.pickTried(bucket)
rmka := entry.Value.(*KnownAddress)
// First bucket it would have been put in.
newBucket := a.getNewBucket(rmka.na, rmka.srcAddr)
// If no room in the original bucket, we put it in a bucket we just
// freed up a space in.
if len(a.addrNew[newBucket]) >= newBucketSize {
newBucket = oldBucket
}
// replace with ka in list.
ka.tried = true
entry.Value = ka
rmka.tried = false
rmka.refs++
// We don't touch a.nTried here since the number of tried stays the same
// but we decemented new above, raise it again since we're putting
// something back.
a.nNew++
rmkey := NetAddressKey(rmka.na)
log.Tracef("Replacing %s with %s in tried", rmkey, addrKey)
// We made sure there is space here just above.
a.addrNew[newBucket][rmkey] = rmka
}
// SetServices sets the services for the giiven address to the provided value.
func (a *AddrManager) SetServices(addr *wire.NetAddressV2, services wire.ServiceFlag) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.find(addr)
if ka == nil {
return
}
// Update the services if needed.
if ka.na.Services != services {
// ka.na is immutable, so replace it.
naCopy := *ka.na
naCopy.Services = services
ka.mtx.Lock()
ka.na = &naCopy
ka.mtx.Unlock()
}
}
// AddLocalAddress adds na to the list of known local addresses to advertise
// with the given priority.
func (a *AddrManager) AddLocalAddress(na *wire.NetAddressV2, priority AddressPriority) error {
if !IsRoutable(na) {
return fmt.Errorf(
"address %s is not routable", na.Addr.String(),
)
}
a.lamtx.Lock()
defer a.lamtx.Unlock()
key := NetAddressKey(na)
la, ok := a.localAddresses[key]
if !ok || la.score < priority {
if ok {
la.score = priority + 1
} else {
a.localAddresses[key] = &localAddress{
na: na,
score: priority,
}
}
}
return nil
}
// getReachabilityFrom returns the relative reachability of the provided local
// address to the provided remote address.
func getReachabilityFrom(localAddr, remoteAddr *wire.NetAddressV2) int {
const (
Unreachable = 0
Default = iota
Teredo
Ipv6Weak
Ipv4
Ipv6Strong
Private
)
if !IsRoutable(remoteAddr) {
return Unreachable
}
if remoteAddr.IsTorV3() {
if localAddr.IsTorV3() {
return Private
}
lna := localAddr.ToLegacy()
if IsOnionCatTor(lna) {
// Modern v3 clients should not be able to connect to
// deprecated v2 hidden services.
return Unreachable
}
if IsRoutable(localAddr) && IsIPv4(lna) {
return Ipv4
}
return Default
}
// We can't be sure if the remote party can actually connect to this
// address or not.
if localAddr.IsTorV3() {
return Default
}
// Convert the V2 addresses into legacy to access the network
// functions.
remoteLna := remoteAddr.ToLegacy()
localLna := localAddr.ToLegacy()
if IsOnionCatTor(remoteLna) {
if IsOnionCatTor(localLna) {
return Private
}
if IsRoutable(localAddr) && IsIPv4(localLna) {
return Ipv4
}
return Default
}
if IsRFC4380(remoteLna) {
if !IsRoutable(localAddr) {
return Default
}
if IsRFC4380(localLna) {
return Teredo
}
if IsIPv4(localLna) {
return Ipv4
}
return Ipv6Weak
}
if IsIPv4(remoteLna) {
if IsRoutable(localAddr) && IsIPv4(localLna) {
return Ipv4
}
return Unreachable
}
/* ipv6 */
var tunnelled bool
// Is our v6 is tunnelled?
if IsRFC3964(localLna) || IsRFC6052(localLna) || IsRFC6145(localLna) {
tunnelled = true
}
if !IsRoutable(localAddr) {
return Default
}
if IsRFC4380(localLna) {
return Teredo
}
if IsIPv4(localLna) {
return Ipv4
}
if tunnelled {
// only prioritise ipv6 if we aren't tunnelling it.
return Ipv6Weak
}
return Ipv6Strong
}
// GetBestLocalAddress returns the most appropriate local address to use
// for the given remote address.
func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddressV2) *wire.NetAddressV2 {
a.lamtx.Lock()
defer a.lamtx.Unlock()
bestreach := 0
var bestscore AddressPriority
var bestAddress *wire.NetAddressV2
for _, la := range a.localAddresses {
reach := getReachabilityFrom(la.na, remoteAddr)
if reach > bestreach ||
(reach == bestreach && la.score > bestscore) {
bestreach = reach
bestscore = la.score
bestAddress = la.na
}
}
if bestAddress != nil {
log.Debugf("Suggesting address %s:%d for %s:%d",
bestAddress.Addr.String(), bestAddress.Port,
remoteAddr.Addr.String(), remoteAddr.Port)
} else {
log.Debugf("No worthy address for %s:%d",
remoteAddr.Addr.String(), remoteAddr.Port)
// Send something unroutable if nothing suitable.
var ip net.IP
if remoteAddr.IsTorV3() {
ip = net.IPv4zero
} else {
remoteLna := remoteAddr.ToLegacy()
if !IsIPv4(remoteLna) && !IsOnionCatTor(remoteLna) {
ip = net.IPv6zero
} else {
ip = net.IPv4zero
}
}
services := wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeBloom
bestAddress = wire.NetAddressV2FromBytes(
time.Now(), services, ip, 0,
)
}
return bestAddress
}
// New returns a new bitcoin address manager.
// Use Start to begin processing asynchronous address updates.
func New(dataDir string, lookupFunc func(string) ([]net.IP, error)) *AddrManager {
am := AddrManager{
peersFile: filepath.Join(dataDir, "peers.json"),
lookupFunc: lookupFunc,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress),
version: serialisationVersion,
}
am.reset()
return &am
}