diff --git a/src/com/google/bitcoin/core/DownloadListener.java b/src/com/google/bitcoin/core/DownloadListener.java new file mode 100644 index 000000000..b31957b6f --- /dev/null +++ b/src/com/google/bitcoin/core/DownloadListener.java @@ -0,0 +1,96 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +import com.google.bitcoin.core.AbstractPeerEventListener; +import com.google.bitcoin.core.Block; +import com.google.bitcoin.core.Peer; + +import java.util.concurrent.Semaphore; + +/** + * Listen to chain download events and print useful informational messages. + * + *

progress, startDownload, doneDownload maybe be overridden to change the way the user + * is notified. + * + *

Methods are called with the event listener object locked so your + * implementation does not have to be thread safe. + * + * @author miron@google.com (Miron Cuperman a.k.a. devrandom) + * + */ +public class DownloadListener extends AbstractPeerEventListener { + private int originalBlocksLeft = -1; + private int lastPercent = -1; + Semaphore done = new Semaphore(0); + + @Override + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { + if (blocksLeft == 0) { + doneDownload(); + done.release(); + } + + if (blocksLeft <= 0) + return; + + if (originalBlocksLeft < 0) { + startDownload(blocksLeft); + originalBlocksLeft = blocksLeft; + } + + double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft)); + if ((int)pct != lastPercent) { + progress(pct); + lastPercent = (int)pct; + } + } + + /** + * Called when download progress is made. + * + * @param pct the percentage of chain downloaded, estimated + */ + protected void progress(double pct) { + System.out.println(String.format("Chain download %d%% done", (int) pct)); + } + + /** + * Called when download is initiated. + * + * @param blocks the number of blocks to download, estimated + */ + protected void startDownload(int blocks) { + System.out.println("Downloading block chain of size " + blocks + ". " + + (lastPercent > 1000 ? "This may take a while." : "")); + } + + /** + * Called when we are done downloading the block chain. + */ + protected void doneDownload() { + System.out.println("Done downloading block chain"); + } + + /** + * Wait for the chain to be downloaded. + */ + public void await() throws InterruptedException { + done.acquire(); + } +} \ No newline at end of file diff --git a/src/com/google/bitcoin/core/NetworkConnection.java b/src/com/google/bitcoin/core/NetworkConnection.java index 2c0646f04..c52adcc31 100644 --- a/src/com/google/bitcoin/core/NetworkConnection.java +++ b/src/com/google/bitcoin/core/NetworkConnection.java @@ -53,7 +53,8 @@ public class NetworkConnection { * Connect to the given IP address using the port specified as part of the network parameters. Once construction * is complete a functioning network channel is set up and running. * - * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. + * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. If + * port is not positive the default port from params is used. * @param params Defines which network to connect to and details of the protocol. * @param bestHeight How many blocks are in our best chain * @param connectTimeout Timeout in milliseconds when initially connecting to peer @@ -106,6 +107,11 @@ public class NetworkConnection { // Handshake is done! } + public NetworkConnection(InetAddress inetAddress, NetworkParameters params, int bestHeight, int connectTimeout) + throws IOException, ProtocolException { + this(new PeerAddress(inetAddress), params, bestHeight, connectTimeout); + } + /** * Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much. * @throws IOException diff --git a/src/com/google/bitcoin/core/Peer.java b/src/com/google/bitcoin/core/Peer.java index bd1ea186d..26abb7304 100644 --- a/src/com/google/bitcoin/core/Peer.java +++ b/src/com/google/bitcoin/core/Peer.java @@ -57,6 +57,8 @@ public class Peer { /** * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that * communication won't occur until you call connect(). + * + * @param bestHeight our current best chain height, to facilitate downloading */ public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) { this.params = params; @@ -101,6 +103,7 @@ public class Peer { *

connect() must be called first */ public void run() { + // This should be called in the network loop thread for this peer if (conn == null) throw new RuntimeException("please call connect() first"); @@ -143,6 +146,7 @@ public class Peer { } private void processBlock(Block m) throws IOException { + // This should called in the network loop thread for this peer try { // Was this block requested by getBlock()? synchronized (pendingGetBlockFutures) { @@ -162,7 +166,9 @@ public class Peer { if (blockChain.add(m)) { // The block was successfully linked into the chain. Notify the user of our progress. for (PeerEventListener listener : eventListeners) { - listener.onBlocksDownloaded(this, getPeerBlocksToGet()); + synchronized (listener) { + listener.onBlocksDownloaded(this, m, getPeerBlocksToGet()); + } } } else { // This block is unconnected - we don't know how to get from it back to the genesis block yet. That @@ -176,14 +182,16 @@ public class Peer { } } catch (VerificationException e) { // We don't want verification failures to kill the thread. - log.warn("block verification failed", e); + log.warn("Block verification failed", e); } catch (ScriptException e) { // We don't want script failures to kill the thread. - log.warn("script exception", e); + log.warn("Script exception", e); } } private void processInv(InventoryMessage inv) throws IOException { + // This should be called in the network loop thread for this peer + // The peer told us about some blocks or transactions they have. For now we only care about blocks. // Note that as we don't actually want to store the entire block chain or even the headers of the block // chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen @@ -284,6 +292,7 @@ public class Peer { /** Called by the Peer when the result has arrived. Completes the task. */ void setResult(T result) { + // This should be called in the network loop thread for this peer this.result = result; // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes // a memory barrier. @@ -348,7 +357,9 @@ public class Peer { */ public void startBlockChainDownload() throws IOException { for (PeerEventListener listener : eventListeners) { - listener.onBlocksDownloaded(this, getPeerBlocksToGet()); + synchronized (listener) { + listener.onBlocksDownloaded(this, null, getPeerBlocksToGet()); + } } if (getPeerBlocksToGet() > 0) { diff --git a/src/com/google/bitcoin/core/PeerAddress.java b/src/com/google/bitcoin/core/PeerAddress.java index cc3a2e77f..491c4b759 100644 --- a/src/com/google/bitcoin/core/PeerAddress.java +++ b/src/com/google/bitcoin/core/PeerAddress.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Date; @@ -38,10 +39,16 @@ public class PeerAddress extends Message { BigInteger services; long time; + /** + * Construct a peer address from a serialized payload. + */ public PeerAddress(NetworkParameters params, byte[] payload, int offset, int protocolVersion) throws ProtocolException { super(params, payload, offset, protocolVersion); } + /** + * Construct a peer address from a memorized or hardcoded address. + */ public PeerAddress(InetAddress addr, int port, int protocolVersion) { this.addr = addr; this.port = port; @@ -57,6 +64,10 @@ public class PeerAddress extends Message { this(addr, 0); } + public PeerAddress(InetSocketAddress addr) { + this(addr.getAddress(), addr.getPort()); + } + @Override public void bitcoinSerializeToStream(OutputStream stream) throws IOException { if (protocolVersion >= 31402) { diff --git a/src/com/google/bitcoin/core/PeerEventListener.java b/src/com/google/bitcoin/core/PeerEventListener.java index 9921916cb..152fcb1e8 100644 --- a/src/com/google/bitcoin/core/PeerEventListener.java +++ b/src/com/google/bitcoin/core/PeerEventListener.java @@ -18,14 +18,24 @@ package com.google.bitcoin.core; /** * Implementing a PeerEventListener allows you to learn when significant Peer communication - * has occurred. + * has occurred. + * + *

Methods are called with the event listener object locked so your + * implementation does not have to be thread safe. + * + * @author miron@google.com (Miron Cuperman a.k.a devrandom) + * */ public interface PeerEventListener { /** - * This is called on a Peer thread when a block is received. + * This is called on a Peer thread when a block is received. It is also called when a download + * is started with the initial number of blocks to be downloaded. + * + *

The block may have transactions or may be a header only once getheaders is implemented * - * @param peer The peer receiving the block - * @param blocksLeft The number of blocks left to download + * @param peer the peer receiving the block + * @param block the downloaded block, or null if this is the initial callback + * @param blocksLeft the number of blocks left to download */ - public void onBlocksDownloaded(Peer peer, int blocksLeft); + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft); } diff --git a/src/com/google/bitcoin/core/PeerGroup.java b/src/com/google/bitcoin/core/PeerGroup.java index cc17b3543..ea6c42441 100644 --- a/src/com/google/bitcoin/core/PeerGroup.java +++ b/src/com/google/bitcoin/core/PeerGroup.java @@ -17,6 +17,8 @@ package com.google.bitcoin.core; +import com.google.bitcoin.discovery.PeerDiscovery; +import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; @@ -24,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -36,6 +39,18 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** + * Maintain a number of connections to peers. + * + *

PeerGroup tries to maintain a constant number of connections to a set of distinct peers. + * Each peer runs a network listener in its own thread. When a connection is lost, a new peer + * will be tried after a delay as long as the number of connections less than the maximum. + * + *

Connections are made to addresses from a provided list. When that list is exhausted, + * we start again from the head of the list. + * + *

The PeerGroup can broadcast a transaction to the currently connected set of peers. It can + * also handle download of the blockchain from peers, restarting the process when peers die. + * * @author miron@google.com (Miron Cuperman a.k.a devrandom) * */ @@ -48,21 +63,33 @@ public class PeerGroup { // Maximum number of connections this peerGroup will make private int maxConnections; + // Addresses to try to connect to, excluding active peers private BlockingQueue inactives; + // Connection initiation thread private Thread thread; + // True if the connection initiation thread should be running private boolean running; + // A pool of threads for peers, of size maxConnection private ThreadPoolExecutor executor; + // Currently active peers + private Set peers; + // The peer we are currently downloading the chain from + private Peer downloadPeer; + // Callback for events related to chain download + private PeerEventListener downloadListener; + private NetworkParameters params; private BlockStore blockStore; private BlockChain chain; - private Set peers; - private Peer downloadPeer; - - private PeerEventListener downloadListener; /** + * Create a PeerGroup + * + * @param maxConnections the maximum number of peer connections that this group will try to make. + * Depending on the environment, this is normally between 1 and 10. */ - public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params, BlockChain chain) { + public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params, + BlockChain chain) { this.maxConnections = maxConnections; this.blockStore = blockStore; this.params = params; @@ -79,9 +106,26 @@ public class PeerGroup { /** Add an address to the list of potential peers to connect to */ public void addAddress(PeerAddress peerAddress) { + // TODO(miron) consider deduplication inactives.add(peerAddress); } + /** Add addresses from a discovery source to the list of potential peers to connect to */ + public void addPeerDiscovery(PeerDiscovery peerDiscovery) { + // TODO(miron) consider remembering the discovery source and retrying occasionally + InetSocketAddress[] addresses; + try { + addresses = peerDiscovery.getPeers(); + } catch (PeerDiscoveryException e) { + log.error("Failed to discover peer addresses from discovery source", e); + return; + } + + for (int i = 0; i < addresses.length; i++) { + inactives.add(new PeerAddress(addresses[i])); + } + } + /** Starts the background thread that makes connections. */ public void start() { this.thread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); @@ -90,7 +134,10 @@ public class PeerGroup { } /** - * Stop this PeerGroup + * Stop this PeerGroup + * + *

The peer group will be asynchronously shut down. After it is shut down + * all peers will be disconnected and no threads will be running. */ public synchronized void stop() { if (running) { @@ -120,6 +167,10 @@ public class PeerGroup { /** * Repeatedly get the next peer address from the inactive queue * and try to connect. + * + *

We can be terminated with Thread.interrupt. When an interrupt is received, + * we will ask the executor to shutdown and ask each peer to disconnect. At that point + * no threads or network connections will be active. */ @Override public void run() { @@ -165,7 +216,8 @@ public class PeerGroup { peer.run(); } finally { - // In all cases, put the address back on the queue + // In all cases, put the address back on the queue. + // We will retry this peer after all other peers have been tried. inactives.add(address); peers.remove(peer); handlePeerDeath(peer); @@ -174,11 +226,13 @@ public class PeerGroup { }; executor.execute(command); break; - } - catch (RejectedExecutionException e) { + } catch (RejectedExecutionException e) { // Reached maxConnections, try again after a delay } catch (BlockStoreException e) { - log.error("block store corrupt?", e); + // Fatal error + log.error("Block store corrupt?", e); + running = false; + break; } // If we got here, we should retry this address because an error unrelated @@ -193,6 +247,8 @@ public class PeerGroup { * *

If no peers are currently connected, the download will be started * once a peer starts. If the peer dies, the download will resume with another peer. + * + * @param listener a listener for chain download events, may not be null */ public synchronized void startBlockChainDownload(PeerEventListener listener) { this.downloadListener = listener; @@ -213,7 +269,7 @@ public class PeerGroup { } } - private void startBlockChainDownloadFromPeer(Peer peer) { + private synchronized void startBlockChainDownloadFromPeer(Peer peer) { peer.addEventListener(downloadListener); try { peer.startBlockChainDownload(); @@ -231,9 +287,7 @@ public class PeerGroup { final String namePrefix; PeerGroupThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null)? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); + group = Thread.currentThread().getThreadGroup(); namePrefix = "PeerGroup-" + poolNumber.getAndIncrement() + "-thread-"; diff --git a/src/com/google/bitcoin/core/Wallet.java b/src/com/google/bitcoin/core/Wallet.java index fe5f945ef..18f5b7335 100644 --- a/src/com/google/bitcoin/core/Wallet.java +++ b/src/com/google/bitcoin/core/Wallet.java @@ -422,19 +422,45 @@ public class Wallet implements Serializable { } /** - * Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the wallet. + * Sends coins to the given address, via the given {@link PeerGroup}. + * Change is returned to the first key in the wallet. + * * @param to Which address to send coins to. * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @throws IOException if there was a problem broadcasting the transaction */ - public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException { + public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) + throws IOException { Transaction tx = createSend(to, nanocoins); if (tx == null) // Not enough money! :-( return null; - if (peerGroup.broadcastTransaction(tx)) { - confirmSend(tx); + if (!peerGroup.broadcastTransaction(tx)) { + throw new IOException("Failed to broadcast tx to all connected peers"); } + + // TODO - retry logic + confirmSend(tx); + return tx; + } + + /** + * Sends coins to the given address, via the given {@link Peer}. + * Change is returned to the first key in the wallet. + * + * @param to Which address to send coins to. + * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. + * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. + * @throws IOException if there was a problem broadcasting the transaction + */ + public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins) + throws IOException { + Transaction tx = createSend(to, nanocoins); + if (tx == null) // Not enough money! :-( + return null; + peer.broadcastTransaction(tx); + confirmSend(tx); + return tx; } diff --git a/src/com/google/bitcoin/examples/DownloadListener.java b/src/com/google/bitcoin/examples/DownloadListener.java deleted file mode 100644 index 2f631a9ef..000000000 --- a/src/com/google/bitcoin/examples/DownloadListener.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.bitcoin.examples; - -import com.google.bitcoin.core.Peer; -import com.google.bitcoin.core.PeerEventListener; - -import java.util.concurrent.Semaphore; - -class DownloadListener implements PeerEventListener { - private int originalBlocksLeft = -1; - private int lastPercent = -1; - Semaphore done = new Semaphore(0); - - @Override - public void onBlocksDownloaded(Peer peer, int blocksLeft) { - if (blocksLeft == 0) { - System.out.println("Done downloading block chain"); - done.release(); - } - - if (blocksLeft <= 0) - return; - - if (originalBlocksLeft < 0) { - System.out.println("Downloading block chain of size " + blocksLeft + ". " + - (lastPercent > 1000 ? "This may take a while." : "")); - originalBlocksLeft = blocksLeft; - } - - double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft)); - if ((int)pct != lastPercent) { - System.out.println(String.format("Chain download %d%% done", (int) pct)); - lastPercent = (int)pct; - } - } - - public void await() throws InterruptedException { - done.acquire(); - } -} \ No newline at end of file diff --git a/src/com/google/bitcoin/examples/PingService.java b/src/com/google/bitcoin/examples/PingService.java index 6a375d39b..2c9211635 100644 --- a/src/com/google/bitcoin/examples/PingService.java +++ b/src/com/google/bitcoin/examples/PingService.java @@ -18,6 +18,7 @@ package com.google.bitcoin.examples; import com.google.bitcoin.core.Address; import com.google.bitcoin.core.BlockChain; +import com.google.bitcoin.core.DownloadListener; import com.google.bitcoin.core.ECKey; import com.google.bitcoin.core.NetworkParameters; import com.google.bitcoin.core.PeerAddress; diff --git a/src/com/google/bitcoin/examples/RefreshWallet.java b/src/com/google/bitcoin/examples/RefreshWallet.java index a16908c10..e253bbfdd 100644 --- a/src/com/google/bitcoin/examples/RefreshWallet.java +++ b/src/com/google/bitcoin/examples/RefreshWallet.java @@ -17,7 +17,7 @@ package com.google.bitcoin.examples; import com.google.bitcoin.core.BlockChain; -import com.google.bitcoin.core.NetworkConnection; +import com.google.bitcoin.core.DownloadListener; import com.google.bitcoin.core.NetworkParameters; import com.google.bitcoin.core.PeerAddress; import com.google.bitcoin.core.PeerGroup;