diff --git a/src/com/google/bitcoin/core/NetworkConnection.java b/src/com/google/bitcoin/core/NetworkConnection.java index 54f649baf..2c0646f04 100644 --- a/src/com/google/bitcoin/core/NetworkConnection.java +++ b/src/com/google/bitcoin/core/NetworkConnection.java @@ -53,20 +53,21 @@ public class NetworkConnection { * Connect to the given IP address using the port specified as part of the network parameters. Once construction * is complete a functioning network channel is set up and running. * - * @param remoteIp IP address to connect to. IPv6 is not currently supported by BitCoin. + * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. * @param params Defines which network to connect to and details of the protocol. * @param bestHeight How many blocks are in our best chain * @param connectTimeout Timeout in milliseconds when initially connecting to peer * @throws IOException if there is a network related failure. * @throws ProtocolException if the version negotiation failed. */ - public NetworkConnection(InetAddress remoteIp, NetworkParameters params, int bestHeight, int connectTimeout) + public NetworkConnection(PeerAddress peerAddress, NetworkParameters params, int bestHeight, int connectTimeout) throws IOException, ProtocolException { this.params = params; - this.remoteIp = remoteIp; + this.remoteIp = peerAddress.addr; + int port = (peerAddress.port > 0) ? peerAddress.port : params.port; - InetSocketAddress address = new InetSocketAddress(remoteIp, params.port); + InetSocketAddress address = new InetSocketAddress(remoteIp, port); socket = new Socket(); socket.connect(address, connectTimeout); diff --git a/src/com/google/bitcoin/core/Peer.java b/src/com/google/bitcoin/core/Peer.java index a8a444174..bd1ea186d 100644 --- a/src/com/google/bitcoin/core/Peer.java +++ b/src/com/google/bitcoin/core/Peer.java @@ -21,63 +21,90 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * A Peer handles the high level communication with a BitCoin node. It requires a NetworkConnection to be set up for - * it. After that it takes ownership of the connection, creates and manages its own thread used for communication - * with the network. All these threads synchronize on the block chain. + * A Peer handles the high level communication with a BitCoin node. + * + *

After making the connection with connect(), call run() to start the message handling loop. */ public class Peer { - private static final Logger log = LoggerFactory.getLogger(Peer.class); + private static final Logger log = LoggerFactory.getLogger(Peer.class); - private final NetworkConnection conn; + private NetworkConnection conn; private final NetworkParameters params; - private Thread thread; - // Whether the peer thread is supposed to be running or not. Set to false during shutdown so the peer thread + // Whether the peer loop is supposed to be running or not. Set to false during shutdown so the peer loop // knows to quit when the socket goes away. private boolean running; private final BlockChain blockChain; - // Used to notify clients when the initial block chain download is finished. - private CountDownLatch chainCompletionLatch; // When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for // the response. Synchronized on itself. private final List> pendingGetBlockFutures; + private int bestHeight; + + private PeerAddress address; + + private List eventListeners; + /** * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that - * communication won't occur until you call start(). + * communication won't occur until you call connect(). */ - public Peer(NetworkParameters params, NetworkConnection conn, BlockChain blockChain) { - this.conn = conn; + public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) { this.params = params; + this.address = address; + this.bestHeight = bestHeight; this.blockChain = blockChain; this.pendingGetBlockFutures = new ArrayList>(); + this.eventListeners = new ArrayList(); } - /** Starts the background thread that processes messages. */ - public void start() { - this.thread = new Thread(new Runnable() { - public void run() { - Peer.this.run(); - } - }); - synchronized (this) { - running = true; - } - this.thread.setName("Bitcoin peer thread: " + conn.toString()); - this.thread.start(); + public synchronized void addEventListener(PeerEventListener listener) { + eventListeners.add(listener); + } + + public synchronized void removeEventListener(PeerEventListener listener) { + eventListeners.remove(listener); + } + + @Override + public String toString() { + return "Peer(" + address.addr + ":" + address.port + ")"; } /** - * Runs in the peers network thread and manages communication with the peer. + * Connects to the peer. */ - private void run() { - assert Thread.currentThread() == thread; + public void connect() { + try { + conn = new NetworkConnection(address, params, bestHeight, 60000); + } catch (IOException ex) { + log.error("while trying to open connection", ex); + throw new RuntimeException(ex); + } catch (ProtocolException ex) { + log.error("while trying to negotiate connection", ex); + throw new RuntimeException(ex); + } + } + + /** + * Runs in the peers network loop and manages communication with the peer. + * + *

connect() must be called first + */ + public void run() { + if (conn == null) + throw new RuntimeException("please call connect() first"); + + running = true; try { while (true) { Message m = conn.readMessage(); @@ -97,19 +124,25 @@ public class Peer { } catch (Exception e) { if (e instanceof IOException && !running) { // This exception was expected because we are tearing down the socket as part of quitting. - log.info("Shutting down peer thread"); + log.info("Shutting down peer loop"); } else { // We caught an unexpected exception. e.printStackTrace(); } } + + try { + conn.shutdown(); + } catch (IOException e) { + // Ignore exceptions on shutdown, socket might be dead + } + synchronized (this) { running = false; } } private void processBlock(Block m) throws IOException { - assert Thread.currentThread() == thread; try { // Was this block requested by getBlock()? synchronized (pendingGetBlockFutures) { @@ -128,12 +161,8 @@ public class Peer { // This call will synchronize on blockChain. if (blockChain.add(m)) { // The block was successfully linked into the chain. Notify the user of our progress. - if (chainCompletionLatch != null) { - chainCompletionLatch.countDown(); - if (chainCompletionLatch.getCount() == 0) { - // All blocks fetched, so we don't need this anymore. - chainCompletionLatch = null; - } + for (PeerEventListener listener : eventListeners) { + listener.onBlocksDownloaded(this, getPeerBlocksToGet()); } } else { // This block is unconnected - we don't know how to get from it back to the genesis block yet. That @@ -147,15 +176,14 @@ public class Peer { } } catch (VerificationException e) { // We don't want verification failures to kill the thread. - log.warn("block verification failed", e); + log.warn("block verification failed", e); } catch (ScriptException e) { // We don't want script failures to kill the thread. - log.warn("script exception", e); + log.warn("script exception", e); } } private void processInv(InventoryMessage inv) throws IOException { - assert Thread.currentThread() == thread; // The peer told us about some blocks or transactions they have. For now we only care about blocks. // Note that as we don't actually want to store the entire block chain or even the headers of the block // chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen @@ -256,7 +284,6 @@ public class Peer { /** Called by the Peer when the result has arrived. Completes the task. */ void setResult(T result) { - assert Thread.currentThread() == thread; // Called from peer thread. this.result = result; // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes // a memory barrier. @@ -318,35 +345,42 @@ public class Peer { /** * Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've * downloaded the same number of blocks that the peer advertised having in its version handshake message. - * - * @return a {@link CountDownLatch} that can be used to track progress and wait for completion. */ - public CountDownLatch startBlockChainDownload() throws IOException { + public void startBlockChainDownload() throws IOException { + for (PeerEventListener listener : eventListeners) { + listener.onBlocksDownloaded(this, getPeerBlocksToGet()); + } + + if (getPeerBlocksToGet() > 0) { + // When we just want as many blocks as possible, we can set the target hash to zero. + blockChainDownload(Sha256Hash.ZERO_HASH); + } + } + + /** + * @return the number of blocks to get, based on our chain height and the peer reported height + */ + private int getPeerBlocksToGet() { // Chain will overflow signed int blocks in ~41,000 years. int chainHeight = (int) conn.getVersionMessage().bestHeight; if (chainHeight <= 0) { // This should not happen because we shouldn't have given the user a Peer that is to another client-mode // node. If that happens it means the user overrode us somewhere. - throw new RuntimeException("Peer does not have block chain"); + throw new RuntimeException("Peer does not have block chain"); } int blocksToGet = chainHeight - blockChain.getChainHead().getHeight(); - chainCompletionLatch = new CountDownLatch(blocksToGet); - if (blocksToGet > 0) { - // When we just want as many blocks as possible, we can set the target hash to zero. - blockChainDownload(Sha256Hash.ZERO_HASH); - } - return chainCompletionLatch; + return blocksToGet; } /** - * Terminates the network connection and stops the background thread. + * Terminates the network connection and stops the message handling loop. */ public void disconnect() { synchronized (this) { running = false; } try { - // This will cause the background thread to die, but it's really ugly. We must do a better job of this. + // This is the correct way to stop an IO bound loop conn.shutdown(); } catch (IOException e) { // Don't care about this. diff --git a/src/com/google/bitcoin/core/PeerAddress.java b/src/com/google/bitcoin/core/PeerAddress.java index d83a825b6..cc3a2e77f 100644 --- a/src/com/google/bitcoin/core/PeerAddress.java +++ b/src/com/google/bitcoin/core/PeerAddress.java @@ -49,6 +49,15 @@ public class PeerAddress extends Message { this.services = BigInteger.ZERO; } + public PeerAddress(InetAddress addr, int port) { + this(addr, port, NetworkParameters.PROTOCOL_VERSION); + } + + public PeerAddress(InetAddress addr) { + this(addr, 0); + } + + @Override public void bitcoinSerializeToStream(OutputStream stream) throws IOException { if (protocolVersion >= 31402) { int secs = (int)(new Date().getTime() / 1000); @@ -71,7 +80,7 @@ public class PeerAddress extends Message { } @Override - protected void parse() throws ProtocolException { + protected void parse() { // Format of a serialized address: // uint32 timestamp // uint64 services (flags determining what the node can do) diff --git a/src/com/google/bitcoin/core/Wallet.java b/src/com/google/bitcoin/core/Wallet.java index fcdeece54..fe5f945ef 100644 --- a/src/com/google/bitcoin/core/Wallet.java +++ b/src/com/google/bitcoin/core/Wallet.java @@ -422,18 +422,19 @@ public class Wallet implements Serializable { } /** - * Sends coins to the given address, via the given {@link Peer}. Change is returned to the first key in the wallet. + * Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the wallet. * @param to Which address to send coins to. * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @throws IOException if there was a problem broadcasting the transaction */ - public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins) throws IOException { + public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException { Transaction tx = createSend(to, nanocoins); if (tx == null) // Not enough money! :-( return null; - peer.broadcastTransaction(tx); - confirmSend(tx); + if (peerGroup.broadcastTransaction(tx)) { + confirmSend(tx); + } return tx; } diff --git a/src/com/google/bitcoin/examples/PingService.java b/src/com/google/bitcoin/examples/PingService.java index f71ae6ef1..6a375d39b 100644 --- a/src/com/google/bitcoin/examples/PingService.java +++ b/src/com/google/bitcoin/examples/PingService.java @@ -16,7 +16,18 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.*; +import com.google.bitcoin.core.Address; +import com.google.bitcoin.core.BlockChain; +import com.google.bitcoin.core.ECKey; +import com.google.bitcoin.core.NetworkParameters; +import com.google.bitcoin.core.PeerAddress; +import com.google.bitcoin.core.PeerGroup; +import com.google.bitcoin.core.ScriptException; +import com.google.bitcoin.core.Transaction; +import com.google.bitcoin.core.TransactionInput; +import com.google.bitcoin.core.Utils; +import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.core.WalletEventListener; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BoundedOverheadBlockStore; @@ -24,8 +35,6 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.net.InetAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** *

@@ -75,14 +84,15 @@ public class PingService { // Connect to the localhost node. One minute timeout since we won't try any other peers System.out.println("Connecting ..."); - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, - blockStore.getChainHead().getHeight(), 60000); BlockChain chain = new BlockChain(params, wallet, blockStore); - final Peer peer = new Peer(params, conn, chain); - peer.start(); + + final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); // We want to know when the balance changes. wallet.addEventListener(new WalletEventListener() { + @Override public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { // Running on a peer thread. assert !newBalance.equals(BigInteger.ZERO); @@ -95,7 +105,7 @@ public class PingService { BigInteger value = tx.getValueSentToMe(w); System.out.println("Received " + Utils.bitcoinValueToFriendlyString(value) + " from " + from.toString()); // Now send the coins back! - Transaction sendTx = w.sendCoins(peer, from, value); + Transaction sendTx = w.sendCoins(peerGroup, from, value); assert sendTx != null; // We should never try to send more coins than we have! System.out.println("Sent coins back! Transaction hash is " + sendTx.getHashAsString()); w.saveToFile(walletFile); @@ -110,24 +120,11 @@ public class PingService { } }); - CountDownLatch progress = peer.startBlockChainDownload(); - long max = progress.getCount(); // Racy but no big deal. - if (max > 0) { - System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : "")); - long current = max; - int lastPercent = 0; - while (current > 0) { - double pct = 100.0 - (100.0 * (current / (double) max)); - if ((int)pct != lastPercent) { - System.out.println(String.format("Chain download %d%% done", (int) pct)); - lastPercent = (int) pct; - } - progress.await(1, TimeUnit.SECONDS); - current = progress.getCount(); - } - } + final DownloadListener listener = new DownloadListener(); + peerGroup.startBlockChainDownload(listener); + listener.await(); System.out.println("Send coins to: " + key.toAddress(params).toString()); System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit."); - // The peer thread keeps us alive until something kills the process. + // The PeerGroup thread keeps us alive until something kills the process. } } diff --git a/src/com/google/bitcoin/examples/PrivateKeys.java b/src/com/google/bitcoin/examples/PrivateKeys.java index 9bda78963..9ada27caa 100644 --- a/src/com/google/bitcoin/examples/PrivateKeys.java +++ b/src/com/google/bitcoin/examples/PrivateKeys.java @@ -55,18 +55,24 @@ public class PrivateKeys { wallet.addKey(key); // Find the transactions that involve those coins. - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, 0, 60000); - BlockChain chain = new BlockChain(params, wallet, new MemoryBlockStore(params)); - Peer peer = new Peer(params, conn, chain); - peer.start(); - peer.startBlockChainDownload().await(); + final MemoryBlockStore blockStore = new MemoryBlockStore(params); + BlockChain chain = new BlockChain(params, wallet, blockStore); + + final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); + DownloadListener listener = new DownloadListener(); + peerGroup.startBlockChainDownload(listener); + listener.await(); + + peerGroup.stop(); // And take them! System.out.println("Claiming " + Utils.bitcoinValueToFriendlyString(wallet.getBalance()) + " coins"); - wallet.sendCoins(peer, destination, wallet.getBalance()); + wallet.sendCoins(peerGroup, destination, wallet.getBalance()); // Wait a few seconds to let the packets flush out to the network (ugly). Thread.sleep(5000); - peer.disconnect(); + System.exit(0); } catch (ArrayIndexOutOfBoundsException e) { System.out.println("First arg should be private key in Base58 format. Second argument should be address " + "to send to."); diff --git a/src/com/google/bitcoin/examples/RefreshWallet.java b/src/com/google/bitcoin/examples/RefreshWallet.java index 52905f9af..a16908c10 100644 --- a/src/com/google/bitcoin/examples/RefreshWallet.java +++ b/src/com/google/bitcoin/examples/RefreshWallet.java @@ -16,14 +16,20 @@ package com.google.bitcoin.examples; -import com.google.bitcoin.core.*; -import com.google.bitcoin.store.*; +import com.google.bitcoin.core.BlockChain; +import com.google.bitcoin.core.NetworkConnection; +import com.google.bitcoin.core.NetworkParameters; +import com.google.bitcoin.core.PeerAddress; +import com.google.bitcoin.core.PeerGroup; +import com.google.bitcoin.core.Transaction; +import com.google.bitcoin.core.Wallet; +import com.google.bitcoin.core.WalletEventListener; +import com.google.bitcoin.store.BlockStore; +import com.google.bitcoin.store.MemoryBlockStore; import java.io.File; import java.math.BigInteger; import java.net.InetAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** * RefreshWallet loads a wallet, then processes the block chain to update the transaction pools within it. @@ -37,13 +43,14 @@ public class RefreshWallet { // Set up the components and link them together. final NetworkParameters params = NetworkParameters.testNet(); BlockStore blockStore = new MemoryBlockStore(params); - NetworkConnection conn = new NetworkConnection(InetAddress.getLocalHost(), params, - blockStore.getChainHead().getHeight(), 60000); BlockChain chain = new BlockChain(params, wallet, blockStore); - Peer peer = new Peer(params, conn, chain); - peer.start(); + + final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); + peerGroup.start(); wallet.addEventListener(new WalletEventListener() { + @Override public void onCoinsReceived(Wallet w, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { System.out.println("\nReceived tx " + tx.getHashAsString()); System.out.println(tx.toString()); @@ -51,19 +58,11 @@ public class RefreshWallet { }); // Now download and process the block chain. - CountDownLatch progress = peer.startBlockChainDownload(); - long max = progress.getCount(); // Racy but no big deal. - if (max > 0) { - System.out.println("Downloading block chain. " + (max > 1000 ? "This may take a while." : "")); - long current = max; - while (current > 0) { - double pct = 100.0 - (100.0 * (current / (double) max)); - System.out.println(String.format("Chain download %d%% done", (int) pct)); - progress.await(1, TimeUnit.SECONDS); - current = progress.getCount(); - } - } - peer.disconnect(); + DownloadListener listener = new DownloadListener(); + peerGroup.startBlockChainDownload(listener); + listener.await(); + + peerGroup.stop(); wallet.saveToFile(file); System.out.println("\nDone!\n"); System.out.println(wallet.toString());