From 29d996b552c2e3243e1507e71ec981236ab2d1de Mon Sep 17 00:00:00 2001 From: "Miron Cuperman (devrandom)" Date: Fri, 15 Jul 2011 22:59:15 +0000 Subject: [PATCH] PeerGroup cleanup 2 --- .../core/AbstractPeerEventListener.java | 35 +++++++++++ .../google/bitcoin/core/DownloadListener.java | 27 ++++---- src/com/google/bitcoin/core/Peer.java | 2 +- .../bitcoin/core/PeerEventListener.java | 15 +++-- src/com/google/bitcoin/core/PeerGroup.java | 63 ++++++++++++++----- src/com/google/bitcoin/core/Wallet.java | 3 +- .../google/bitcoin/examples/PingService.java | 6 +- .../google/bitcoin/examples/PrivateKeys.java | 7 +-- .../bitcoin/examples/RefreshWallet.java | 7 +-- 9 files changed, 119 insertions(+), 46 deletions(-) create mode 100644 src/com/google/bitcoin/core/AbstractPeerEventListener.java diff --git a/src/com/google/bitcoin/core/AbstractPeerEventListener.java b/src/com/google/bitcoin/core/AbstractPeerEventListener.java new file mode 100644 index 000000000..8e4fda635 --- /dev/null +++ b/src/com/google/bitcoin/core/AbstractPeerEventListener.java @@ -0,0 +1,35 @@ +/** + * Copyright 2011 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +/** + * Convenience abstract class for implmenting a PeerEventListener. + * + *

The default method implementations do nothing. + * + * @author miron@google.com (Miron Cuperman) + * + */ +public class AbstractPeerEventListener extends Object implements PeerEventListener { + @Override + public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { + } + + @Override + public void onChainDownloadStarted(Peer peer, int blocksLeft) { + } +} diff --git a/src/com/google/bitcoin/core/DownloadListener.java b/src/com/google/bitcoin/core/DownloadListener.java index b31957b6f..a18482510 100644 --- a/src/com/google/bitcoin/core/DownloadListener.java +++ b/src/com/google/bitcoin/core/DownloadListener.java @@ -20,6 +20,8 @@ import com.google.bitcoin.core.AbstractPeerEventListener; import com.google.bitcoin.core.Block; import com.google.bitcoin.core.Peer; +import java.text.DateFormat; +import java.util.Date; import java.util.concurrent.Semaphore; /** @@ -36,9 +38,15 @@ import java.util.concurrent.Semaphore; */ public class DownloadListener extends AbstractPeerEventListener { private int originalBlocksLeft = -1; - private int lastPercent = -1; + private int lastPercent = 0; Semaphore done = new Semaphore(0); + @Override + public void onChainDownloadStarted(Peer peer, int blocksLeft) { + startDownload(blocksLeft); + originalBlocksLeft = blocksLeft; + } + @Override public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { if (blocksLeft == 0) { @@ -46,17 +54,12 @@ public class DownloadListener extends AbstractPeerEventListener { done.release(); } - if (blocksLeft <= 0) + if (blocksLeft < 0 || originalBlocksLeft <= 0) return; - if (originalBlocksLeft < 0) { - startDownload(blocksLeft); - originalBlocksLeft = blocksLeft; - } - double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft)); if ((int)pct != lastPercent) { - progress(pct); + progress(pct, new Date(block.getTime())); lastPercent = (int)pct; } } @@ -65,9 +68,11 @@ public class DownloadListener extends AbstractPeerEventListener { * Called when download progress is made. * * @param pct the percentage of chain downloaded, estimated + * @param date the date of the last block downloaded */ - protected void progress(double pct) { - System.out.println(String.format("Chain download %d%% done", (int) pct)); + protected void progress(double pct, Date date) { + System.out.println(String.format("Chain download %d%% done, block date %s", (int) pct, + DateFormat.getDateInstance().format(date))); } /** @@ -77,7 +82,7 @@ public class DownloadListener extends AbstractPeerEventListener { */ protected void startDownload(int blocks) { System.out.println("Downloading block chain of size " + blocks + ". " + - (lastPercent > 1000 ? "This may take a while." : "")); + (blocks > 1000 ? "This may take a while." : "")); } /** diff --git a/src/com/google/bitcoin/core/Peer.java b/src/com/google/bitcoin/core/Peer.java index 26abb7304..d17d972cc 100644 --- a/src/com/google/bitcoin/core/Peer.java +++ b/src/com/google/bitcoin/core/Peer.java @@ -358,7 +358,7 @@ public class Peer { public void startBlockChainDownload() throws IOException { for (PeerEventListener listener : eventListeners) { synchronized (listener) { - listener.onBlocksDownloaded(this, null, getPeerBlocksToGet()); + listener.onChainDownloadStarted(this, getPeerBlocksToGet()); } } diff --git a/src/com/google/bitcoin/core/PeerEventListener.java b/src/com/google/bitcoin/core/PeerEventListener.java index 152fcb1e8..7f6ecc876 100644 --- a/src/com/google/bitcoin/core/PeerEventListener.java +++ b/src/com/google/bitcoin/core/PeerEventListener.java @@ -28,14 +28,21 @@ package com.google.bitcoin.core; */ public interface PeerEventListener { /** - * This is called on a Peer thread when a block is received. It is also called when a download - * is started with the initial number of blocks to be downloaded. + * Called on a Peer thread when a block is received. * - *

The block may have transactions or may be a header only once getheaders is implemented + *

The block may have transactions or may be a header only once getheaders is implemented. * * @param peer the peer receiving the block - * @param block the downloaded block, or null if this is the initial callback + * @param block the downloaded block * @param blocksLeft the number of blocks left to download */ public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft); + + /** + * Called when a download is started with the initial number of blocks to be downloaded. + * + * @param peer the peer receiving the block + * @param blocksLeft the number of blocks left to download + */ + public void onChainDownloadStarted(Peer peer, int blocksLeft); } diff --git a/src/com/google/bitcoin/core/PeerGroup.java b/src/com/google/bitcoin/core/PeerGroup.java index ea6c42441..d3ce3380e 100644 --- a/src/com/google/bitcoin/core/PeerGroup.java +++ b/src/com/google/bitcoin/core/PeerGroup.java @@ -55,6 +55,8 @@ import java.util.concurrent.atomic.AtomicInteger; * */ public class PeerGroup { + private static final int DEFAULT_CONNECTIONS = 10; + private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final int CONNECTION_DELAY_MILLIS = 5 * 1000; @@ -66,11 +68,11 @@ public class PeerGroup { // Addresses to try to connect to, excluding active peers private BlockingQueue inactives; // Connection initiation thread - private Thread thread; + private Thread connectThread; // True if the connection initiation thread should be running private boolean running; // A pool of threads for peers, of size maxConnection - private ThreadPoolExecutor executor; + private ThreadPoolExecutor peerPool; // Currently active peers private Set peers; // The peer we are currently downloading the chain from @@ -84,13 +86,9 @@ public class PeerGroup { /** * Create a PeerGroup - * - * @param maxConnections the maximum number of peer connections that this group will try to make. - * Depending on the environment, this is normally between 1 and 10. */ - public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params, - BlockChain chain) { - this.maxConnections = maxConnections; + public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain) { + this.maxConnections = DEFAULT_CONNECTIONS; this.blockStore = blockStore; this.params = params; this.chain = chain; @@ -98,12 +96,25 @@ public class PeerGroup { inactives = new LinkedBlockingQueue(); peers = Collections.synchronizedSet(new HashSet()); - executor = new ThreadPoolExecutor(CORE_THREADS, this.maxConnections, + peerPool = new ThreadPoolExecutor(CORE_THREADS, this.maxConnections, THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue(1), new PeerGroupThreadFactory()); } + /** + * @param maxConnections the maximum number of peer connections that this group will try to make. + * + * Depending on the environment, this should normally be between 1 and 10, default is 10. + */ + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + public int getMaxConnections() { + return maxConnections; + } + /** Add an address to the list of potential peers to connect to */ public void addAddress(PeerAddress peerAddress) { // TODO(miron) consider deduplication @@ -128,9 +139,9 @@ public class PeerGroup { /** Starts the background thread that makes connections. */ public void start() { - this.thread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); + this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); running = true; - this.thread.start(); + this.connectThread.start(); } /** @@ -141,7 +152,7 @@ public class PeerGroup { */ public synchronized void stop() { if (running) { - thread.interrupt(); + connectThread.interrupt(); } } @@ -187,7 +198,7 @@ public class PeerGroup { } } - executor.shutdownNow(); + peerPool.shutdownNow(); for (Peer peer : peers) { peer.disconnect(); @@ -224,10 +235,15 @@ public class PeerGroup { } } }; - executor.execute(command); + peerPool.execute(command); break; } catch (RejectedExecutionException e) { // Reached maxConnections, try again after a delay + + // TODO - consider being smarter about retry. No need to retry + // if we reached maxConnections or if peer queue is empty. Also consider + // exponential backoff on peers and adjusting the sleep time according to the + // lowest backoff value in queue. } catch (BlockStoreException e) { // Fatal error log.error("Block store corrupt?", e); @@ -252,10 +268,29 @@ public class PeerGroup { */ public synchronized void startBlockChainDownload(PeerEventListener listener) { this.downloadListener = listener; + // TODO be more nuanced about which peer to download from. We can also try + // downloading from multiple peers and handle the case when a new peer comes along + // with a longer chain after we thought we were done. if (!peers.isEmpty()) startBlockChainDownloadFromPeer(peers.iterator().next()); } + /** + * Download the blockchain from peers. + * + *

This method wait until the download is complete. "Complete" is defined as downloading + * from at least one peer all the blocks that are in that peer's inventory. + */ + public void downloadBlockChain() { + DownloadListener listener = new DownloadListener(); + startBlockChainDownload(listener); + try { + listener.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + protected synchronized void handleNewPeer(Peer peer) { if (downloadListener != null && downloadPeer == null) startBlockChainDownloadFromPeer(peer); diff --git a/src/com/google/bitcoin/core/Wallet.java b/src/com/google/bitcoin/core/Wallet.java index 18f5b7335..d177aa154 100644 --- a/src/com/google/bitcoin/core/Wallet.java +++ b/src/com/google/bitcoin/core/Wallet.java @@ -430,8 +430,7 @@ public class Wallet implements Serializable { * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @throws IOException if there was a problem broadcasting the transaction */ - public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) - throws IOException { + public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException { Transaction tx = createSend(to, nanocoins); if (tx == null) // Not enough money! :-( return null; diff --git a/src/com/google/bitcoin/examples/PingService.java b/src/com/google/bitcoin/examples/PingService.java index 2c9211635..a30837879 100644 --- a/src/com/google/bitcoin/examples/PingService.java +++ b/src/com/google/bitcoin/examples/PingService.java @@ -87,7 +87,7 @@ public class PingService { System.out.println("Connecting ..."); BlockChain chain = new BlockChain(params, wallet, blockStore); - final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.start(); @@ -121,9 +121,7 @@ public class PingService { } }); - final DownloadListener listener = new DownloadListener(); - peerGroup.startBlockChainDownload(listener); - listener.await(); + peerGroup.downloadBlockChain(); System.out.println("Send coins to: " + key.toAddress(params).toString()); System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit."); // The PeerGroup thread keeps us alive until something kills the process. diff --git a/src/com/google/bitcoin/examples/PrivateKeys.java b/src/com/google/bitcoin/examples/PrivateKeys.java index 9ada27caa..de8789fcb 100644 --- a/src/com/google/bitcoin/examples/PrivateKeys.java +++ b/src/com/google/bitcoin/examples/PrivateKeys.java @@ -58,13 +58,10 @@ public class PrivateKeys { final MemoryBlockStore blockStore = new MemoryBlockStore(params); BlockChain chain = new BlockChain(params, wallet, blockStore); - final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.start(); - DownloadListener listener = new DownloadListener(); - peerGroup.startBlockChainDownload(listener); - listener.await(); - + peerGroup.downloadBlockChain(); peerGroup.stop(); // And take them! diff --git a/src/com/google/bitcoin/examples/RefreshWallet.java b/src/com/google/bitcoin/examples/RefreshWallet.java index e253bbfdd..d31c35d9b 100644 --- a/src/com/google/bitcoin/examples/RefreshWallet.java +++ b/src/com/google/bitcoin/examples/RefreshWallet.java @@ -45,7 +45,7 @@ public class RefreshWallet { BlockStore blockStore = new MemoryBlockStore(params); BlockChain chain = new BlockChain(params, wallet, blockStore); - final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); + final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.start(); @@ -58,10 +58,7 @@ public class RefreshWallet { }); // Now download and process the block chain. - DownloadListener listener = new DownloadListener(); - peerGroup.startBlockChainDownload(listener); - listener.await(); - + peerGroup.downloadBlockChain(); peerGroup.stop(); wallet.saveToFile(file); System.out.println("\nDone!\n");