PeerGroup first draft

This commit is contained in:
Miron Cuperman (devrandom) 2011-07-06 20:38:38 +00:00
parent 54b44be316
commit cd2f4c655b
7 changed files with 162 additions and 115 deletions

View File

@ -53,20 +53,21 @@ public class NetworkConnection {
* Connect to the given IP address using the port specified as part of the network parameters. Once construction * Connect to the given IP address using the port specified as part of the network parameters. Once construction
* is complete a functioning network channel is set up and running. * is complete a functioning network channel is set up and running.
* *
* @param remoteIp IP address to connect to. IPv6 is not currently supported by BitCoin. * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin.
* @param params Defines which network to connect to and details of the protocol. * @param params Defines which network to connect to and details of the protocol.
* @param bestHeight How many blocks are in our best chain * @param bestHeight How many blocks are in our best chain
* @param connectTimeout Timeout in milliseconds when initially connecting to peer * @param connectTimeout Timeout in milliseconds when initially connecting to peer
* @throws IOException if there is a network related failure. * @throws IOException if there is a network related failure.
* @throws ProtocolException if the version negotiation failed. * @throws ProtocolException if the version negotiation failed.
*/ */
public NetworkConnection(InetAddress remoteIp, NetworkParameters params, int bestHeight, int connectTimeout) public NetworkConnection(PeerAddress peerAddress, NetworkParameters params, int bestHeight, int connectTimeout)
throws IOException, ProtocolException { throws IOException, ProtocolException {
this.params = params; this.params = params;
this.remoteIp = remoteIp; this.remoteIp = peerAddress.addr;
int port = (peerAddress.port > 0) ? peerAddress.port : params.port;
InetSocketAddress address = new InetSocketAddress(remoteIp, params.port); InetSocketAddress address = new InetSocketAddress(remoteIp, port);
socket = new Socket(); socket = new Socket();
socket.connect(address, connectTimeout); socket.connect(address, connectTimeout);

View File

@ -21,63 +21,90 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* A Peer handles the high level communication with a BitCoin node. It requires a NetworkConnection to be set up for * A Peer handles the high level communication with a BitCoin node.
* it. After that it takes ownership of the connection, creates and manages its own thread used for communication *
* with the network. All these threads synchronize on the block chain. * <p>After making the connection with connect(), call run() to start the message handling loop.
*/ */
public class Peer { public class Peer {
private static final Logger log = LoggerFactory.getLogger(Peer.class); private static final Logger log = LoggerFactory.getLogger(Peer.class);
private final NetworkConnection conn; private NetworkConnection conn;
private final NetworkParameters params; private final NetworkParameters params;
private Thread thread; // Whether the peer loop is supposed to be running or not. Set to false during shutdown so the peer loop
// Whether the peer thread is supposed to be running or not. Set to false during shutdown so the peer thread
// knows to quit when the socket goes away. // knows to quit when the socket goes away.
private boolean running; private boolean running;
private final BlockChain blockChain; private final BlockChain blockChain;
// Used to notify clients when the initial block chain download is finished.
private CountDownLatch chainCompletionLatch;
// When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for // When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for
// the response. Synchronized on itself. // the response. Synchronized on itself.
private final List<GetDataFuture<Block>> pendingGetBlockFutures; private final List<GetDataFuture<Block>> pendingGetBlockFutures;
private int bestHeight;
private PeerAddress address;
private List<PeerEventListener> eventListeners;
/** /**
* Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that
* communication won't occur until you call start(). * communication won't occur until you call connect().
*/ */
public Peer(NetworkParameters params, NetworkConnection conn, BlockChain blockChain) { public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) {
this.conn = conn;
this.params = params; this.params = params;
this.address = address;
this.bestHeight = bestHeight;
this.blockChain = blockChain; this.blockChain = blockChain;
this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>(); this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>();
this.eventListeners = new ArrayList<PeerEventListener>();
} }
/** Starts the background thread that processes messages. */ public synchronized void addEventListener(PeerEventListener listener) {
public void start() { eventListeners.add(listener);
this.thread = new Thread(new Runnable() { }
public void run() {
Peer.this.run(); public synchronized void removeEventListener(PeerEventListener listener) {
} eventListeners.remove(listener);
}); }
synchronized (this) {
running = true; @Override
} public String toString() {
this.thread.setName("Bitcoin peer thread: " + conn.toString()); return "Peer(" + address.addr + ":" + address.port + ")";
this.thread.start();
} }
/** /**
* Runs in the peers network thread and manages communication with the peer. * Connects to the peer.
*/ */
private void run() { public void connect() {
assert Thread.currentThread() == thread; try {
conn = new NetworkConnection(address, params, bestHeight, 60000);
} catch (IOException ex) {
log.error("while trying to open connection", ex);
throw new RuntimeException(ex);
} catch (ProtocolException ex) {
log.error("while trying to negotiate connection", ex);
throw new RuntimeException(ex);
}
}
/**
* Runs in the peers network loop and manages communication with the peer.
*
* <p>connect() must be called first
*/
public void run() {
if (conn == null)
throw new RuntimeException("please call connect() first");
running = true;
try { try {
while (true) { while (true) {
Message m = conn.readMessage(); Message m = conn.readMessage();
@ -97,19 +124,25 @@ public class Peer {
} catch (Exception e) { } catch (Exception e) {
if (e instanceof IOException && !running) { if (e instanceof IOException && !running) {
// This exception was expected because we are tearing down the socket as part of quitting. // This exception was expected because we are tearing down the socket as part of quitting.
log.info("Shutting down peer thread"); log.info("Shutting down peer loop");
} else { } else {
// We caught an unexpected exception. // We caught an unexpected exception.
e.printStackTrace(); e.printStackTrace();
} }
} }
try {
conn.shutdown();
} catch (IOException e) {
// Ignore exceptions on shutdown, socket might be dead
}
synchronized (this) { synchronized (this) {
running = false; running = false;
} }
} }
private void processBlock(Block m) throws IOException { private void processBlock(Block m) throws IOException {
assert Thread.currentThread() == thread;
try { try {
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
synchronized (pendingGetBlockFutures) { synchronized (pendingGetBlockFutures) {
@ -128,12 +161,8 @@ public class Peer {
// This call will synchronize on blockChain. // This call will synchronize on blockChain.
if (blockChain.add(m)) { if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress. // The block was successfully linked into the chain. Notify the user of our progress.
if (chainCompletionLatch != null) { for (PeerEventListener listener : eventListeners) {
chainCompletionLatch.countDown(); listener.onBlocksDownloaded(this, getPeerBlocksToGet());
if (chainCompletionLatch.getCount() == 0) {
// All blocks fetched, so we don't need this anymore.
chainCompletionLatch = null;
}
} }
} else { } else {
// This block is unconnected - we don't know how to get from it back to the genesis block yet. That // This block is unconnected - we don't know how to get from it back to the genesis block yet. That
@ -147,15 +176,14 @@ public class Peer {
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
log.warn("block verification failed", e); log.warn("block verification failed", e);
} catch (ScriptException e) { } catch (ScriptException e) {
// We don't want script failures to kill the thread. // We don't want script failures to kill the thread.
log.warn("script exception", e); log.warn("script exception", e);
} }
} }
private void processInv(InventoryMessage inv) throws IOException { private void processInv(InventoryMessage inv) throws IOException {
assert Thread.currentThread() == thread;
// The peer told us about some blocks or transactions they have. For now we only care about blocks. // The peer told us about some blocks or transactions they have. For now we only care about blocks.
// Note that as we don't actually want to store the entire block chain or even the headers of the block // Note that as we don't actually want to store the entire block chain or even the headers of the block
// chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen // chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen
@ -256,7 +284,6 @@ public class Peer {
/** Called by the Peer when the result has arrived. Completes the task. */ /** Called by the Peer when the result has arrived. Completes the task. */
void setResult(T result) { void setResult(T result) {
assert Thread.currentThread() == thread; // Called from peer thread.
this.result = result; this.result = result;
// Now release the thread that is waiting. We don't need to synchronize here as the latch establishes // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes
// a memory barrier. // a memory barrier.
@ -318,35 +345,42 @@ public class Peer {
/** /**
* Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've * Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've
* downloaded the same number of blocks that the peer advertised having in its version handshake message. * downloaded the same number of blocks that the peer advertised having in its version handshake message.
*
* @return a {@link CountDownLatch} that can be used to track progress and wait for completion.
*/ */
public CountDownLatch startBlockChainDownload() throws IOException { public void startBlockChainDownload() throws IOException {
for (PeerEventListener listener : eventListeners) {
listener.onBlocksDownloaded(this, getPeerBlocksToGet());
}
if (getPeerBlocksToGet() > 0) {
// When we just want as many blocks as possible, we can set the target hash to zero.
blockChainDownload(Sha256Hash.ZERO_HASH);
}
}
/**
* @return the number of blocks to get, based on our chain height and the peer reported height
*/
private int getPeerBlocksToGet() {
// Chain will overflow signed int blocks in ~41,000 years. // Chain will overflow signed int blocks in ~41,000 years.
int chainHeight = (int) conn.getVersionMessage().bestHeight; int chainHeight = (int) conn.getVersionMessage().bestHeight;
if (chainHeight <= 0) { if (chainHeight <= 0) {
// This should not happen because we shouldn't have given the user a Peer that is to another client-mode // This should not happen because we shouldn't have given the user a Peer that is to another client-mode
// node. If that happens it means the user overrode us somewhere. // node. If that happens it means the user overrode us somewhere.
throw new RuntimeException("Peer does not have block chain"); throw new RuntimeException("Peer does not have block chain");
} }
int blocksToGet = chainHeight - blockChain.getChainHead().getHeight(); int blocksToGet = chainHeight - blockChain.getChainHead().getHeight();
chainCompletionLatch = new CountDownLatch(blocksToGet); return blocksToGet;
if (blocksToGet > 0) {
// When we just want as many blocks as possible, we can set the target hash to zero.
blockChainDownload(Sha256Hash.ZERO_HASH);
}
return chainCompletionLatch;
} }
/** /**
* Terminates the network connection and stops the background thread. * Terminates the network connection and stops the message handling loop.
*/ */
public void disconnect() { public void disconnect() {
synchronized (this) { synchronized (this) {
running = false; running = false;
} }
try { try {
// This will cause the background thread to die, but it's really ugly. We must do a better job of this. // This is the correct way to stop an IO bound loop
conn.shutdown(); conn.shutdown();
} catch (IOException e) { } catch (IOException e) {
// Don't care about this. // Don't care about this.

View File

@ -49,6 +49,15 @@ public class PeerAddress extends Message {
this.services = BigInteger.ZERO; this.services = BigInteger.ZERO;
} }
public PeerAddress(InetAddress addr, int port) {
this(addr, port, NetworkParameters.PROTOCOL_VERSION);
}
public PeerAddress(InetAddress addr) {
this(addr, 0);
}
@Override
public void bitcoinSerializeToStream(OutputStream stream) throws IOException { public void bitcoinSerializeToStream(OutputStream stream) throws IOException {
if (protocolVersion >= 31402) { if (protocolVersion >= 31402) {
int secs = (int)(new Date().getTime() / 1000); int secs = (int)(new Date().getTime() / 1000);
@ -71,7 +80,7 @@ public class PeerAddress extends Message {
} }
@Override @Override
protected void parse() throws ProtocolException { protected void parse() {
// Format of a serialized address: // Format of a serialized address:
// uint32 timestamp // uint32 timestamp
// uint64 services (flags determining what the node can do) // uint64 services (flags determining what the node can do)

View File

@ -422,18 +422,19 @@ public class Wallet implements Serializable {
} }
/** /**
* Sends coins to the given address, via the given {@link Peer}. Change is returned to the first key in the wallet. * Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the wallet.
* @param to Which address to send coins to. * @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins.
* @throws IOException if there was a problem broadcasting the transaction * @throws IOException if there was a problem broadcasting the transaction
*/ */
public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins) throws IOException { public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException {
Transaction tx = createSend(to, nanocoins); Transaction tx = createSend(to, nanocoins);
if (tx == null) // Not enough money! :-( if (tx == null) // Not enough money! :-(
return null; return null;
peer.broadcastTransaction(tx); if (peerGroup.broadcastTransaction(tx)) {
confirmSend(tx); confirmSend(tx);
}
return tx; return tx;
} }

View File

@ -16,7 +16,18 @@
package com.google.bitcoin.examples; package com.google.bitcoin.examples;
import com.google.bitcoin.core.*; import com.google.bitcoin.core.Address;
import com.google.bitcoin.core.BlockChain;
import com.google.bitcoin.core.ECKey;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.PeerAddress;
import com.google.bitcoin.core.PeerGroup;
import com.google.bitcoin.core.ScriptException;
import com.google.bitcoin.core.Transaction;
import com.google.bitcoin.core.TransactionInput;
import com.google.bitcoin.core.Utils;
import com.google.bitcoin.core.Wallet;
import com.google.bitcoin.core.WalletEventListener;
import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BoundedOverheadBlockStore; import com.google.bitcoin.store.BoundedOverheadBlockStore;
@ -24,8 +35,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* <p> * <p>
@ -75,14 +84,15 @@ public class PingService {
// Connect to the localhost node. One minute timeout since we won't try any other peers // Connect to the localhost node. One minute timeout since we won't try any other peers
System.out.println("Connecting ..."); System.out.println("Connecting ...");
NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params,
blockStore.getChainHead().getHeight(), 60000);
BlockChain chain = new BlockChain(params, wallet, blockStore); BlockChain chain = new BlockChain(params, wallet, blockStore);
final Peer peer = new Peer(params, conn, chain);
peer.start(); final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain);
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start();
// We want to know when the balance changes. // We want to know when the balance changes.
wallet.addEventListener(new WalletEventListener() { wallet.addEventListener(new WalletEventListener() {
@Override
public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
// Running on a peer thread. // Running on a peer thread.
assert !newBalance.equals(BigInteger.ZERO); assert !newBalance.equals(BigInteger.ZERO);
@ -95,7 +105,7 @@ public class PingService {
BigInteger value = tx.getValueSentToMe(w); BigInteger value = tx.getValueSentToMe(w);
System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString()); System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString());
// Now send the coins back! // Now send the coins back!
Transaction sendTx = w.sendCoins(peer, from, value); Transaction sendTx = w.sendCoins(peerGroup, from, value);
assert sendTx != null; // We should never try to send more coins than we have! assert sendTx != null; // We should never try to send more coins than we have!
System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString()); System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString());
w.saveToFile(walletFile); w.saveToFile(walletFile);
@ -110,24 +120,11 @@ public class PingService {
} }
}); });
CountDownLatch progress = peer.startBlockChainDownload(); final DownloadListener listener = new DownloadListener();
long max = progress.getCount(); // Racy but no big deal. peerGroup.startBlockChainDownload(listener);
if (max > 0) { listener.await();
System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : ""));
long current = max;
int lastPercent = 0;
while (current > 0) {
double pct = 100.0 - (100.0 * (current / (double) max));
if ((int)pct != lastPercent) {
System.out.println(String.format("Chain download %d%% done", (int) pct));
lastPercent = (int) pct;
}
progress.await(1, TimeUnit.SECONDS);
current = progress.getCount();
}
}
System.out.println("Send coins to: " + key.toAddress(params).toString()); System.out.println("Send coins to: " + key.toAddress(params).toString());
System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit."); System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit.");
// The peer thread keeps us alive until something kills the process. // The PeerGroup thread keeps us alive until something kills the process.
} }
} }

View File

@ -55,18 +55,24 @@ public class PrivateKeys {
wallet.addKey(key); wallet.addKey(key);
// Find the transactions that involve those coins. // Find the transactions that involve those coins.
NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, 0, 60000); final MemoryBlockStore blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, wallet, new MemoryBlockStore(params)); BlockChain chain = new BlockChain(params, wallet, blockStore);
Peer peer = new Peer(params, conn, chain);
peer.start(); final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain);
peer.startBlockChainDownload().await(); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start();
DownloadListener listener = new DownloadListener();
peerGroup.startBlockChainDownload(listener);
listener.await();
peerGroup.stop();
// And take them! // And take them!
System.out.println("Claiming " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()) + " coins"); System.out.println("Claiming " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()) + " coins");
wallet.sendCoins(peer, destination, wallet.getBalance()); wallet.sendCoins(peerGroup, destination, wallet.getBalance());
// Wait a few seconds to let the packets flush out to the network (ugly). // Wait a few seconds to let the packets flush out to the network (ugly).
Thread.sleep(5000); Thread.sleep(5000);
peer.disconnect(); System.exit(0);
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
System.out.println("First arg should be private key in Base58 format. Second argument should be address " + System.out.println("First arg should be private key in Base58 format. Second argument should be address " +
"to send to."); "to send to.");

View File

@ -16,14 +16,20 @@
package com.google.bitcoin.examples; package com.google.bitcoin.examples;
import com.google.bitcoin.core.*; import com.google.bitcoin.core.BlockChain;
import com.google.bitcoin.store.*; import com.google.bitcoin.core.NetworkConnection;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.PeerAddress;
import com.google.bitcoin.core.PeerGroup;
import com.google.bitcoin.core.Transaction;
import com.google.bitcoin.core.Wallet;
import com.google.bitcoin.core.WalletEventListener;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import java.io.File; import java.io.File;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/** /**
* RefreshWallet loads a wallet, then processes the block chain to update the transaction pools within it. * RefreshWallet loads a wallet, then processes the block chain to update the transaction pools within it.
@ -37,13 +43,14 @@ public class RefreshWallet {
// Set up the components and link them together. // Set up the components and link them together.
final NetworkParameters params = NetworkParameters.testNet(); final NetworkParameters params = NetworkParameters.testNet();
BlockStore blockStore = new MemoryBlockStore(params); BlockStore blockStore = new MemoryBlockStore(params);
NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params,
blockStore.getChainHead().getHeight(), 60000);
BlockChain chain = new BlockChain(params, wallet, blockStore); BlockChain chain = new BlockChain(params, wallet, blockStore);
Peer peer = new Peer(params, conn, chain);
peer.start(); final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain);
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start();
wallet.addEventListener(new WalletEventListener() { wallet.addEventListener(new WalletEventListener() {
@Override
public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
System.out.println("\nReceived tx " + tx.getHashAsString()); System.out.println("\nReceived tx " + tx.getHashAsString());
System.out.println(tx.toString()); System.out.println(tx.toString());
@ -51,19 +58,11 @@ public class RefreshWallet {
}); });
// Now download and process the block chain. // Now download and process the block chain.
CountDownLatch progress = peer.startBlockChainDownload(); DownloadListener listener = new DownloadListener();
long max = progress.getCount(); // Racy but no big deal. peerGroup.startBlockChainDownload(listener);
if (max > 0) { listener.await();
System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : ""));
long current = max; peerGroup.stop();
while (current > 0) {
double pct = 100.0 - (100.0 * (current / (double) max));
System.out.println(String.format("Chain download %d%% done", (int) pct));
progress.await(1, TimeUnit.SECONDS);
current = progress.getCount();
}
}
peer.disconnect();
wallet.saveToFile(file); wallet.saveToFile(file);
System.out.println("\nDone!\n"); System.out.println("\nDone!\n");
System.out.println(wallet.toString()); System.out.println(wallet.toString());