Some changes to PeerGroup and how we manage the download process:

- Have a dominant peer that is responsible for all data downloads. This eliminates the case of multiple threads fighting over download of the block chain and wasting time/bandwidth duplicating work.
- Make NetworkConnection an interface with two implementations: {TCP,Mock}NetworkConnection
- Rewrite the Peer/PeerGroup tests to use the mock connection. This simplifies testing of multiple independent peer threads within the same group.
- Switch off the MOBILE_OPTIMIZED mode as it's no longer required. It may still be useful for the multiplexing proxy project.
This commit is contained in:
Mike Hearn 2011-10-21 13:13:33 +00:00
parent 0c5408e7c6
commit 0cec27e5a7
8 changed files with 763 additions and 430 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -16,147 +16,33 @@
package com.google.bitcoin.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Date;
import java.util.LinkedHashMap;
/**
* A NetworkConnection handles talking to a remote BitCoin peer at a low level. It understands how to read and write
* messages off the network, but doesn't asynchronously communicate with the peer or handle the higher level details
* messages, but doesn't asynchronously communicate with the peer or handle the higher level details
* of the protocol. After constructing a NetworkConnection, use a {@link Peer} to hand off communication to a
* background thread.<p>
*
* Multiple NetworkConnections will, by default, wait if another NetworkConnection instance is deserializing a
* message and discard duplicates before reading them. This is intended to avoid memory usage spikes in constrained
* environments like Android where deserializing a large message (like a block) on multiple threads simultaneously is
* both wasteful and can cause OOM failures.<p>
* NetworkConnection is an interface in order to support multiple low level protocols. You likely want a
* {@link TCPNetworkConnection} as it's currently the only NetworkConnection implementation. In future there may be
* others that support connections over Bluetooth, NFC, UNIX domain sockets and so on.<p>
*
* Construction is blocking whilst the protocol version is negotiated.
*/
public class NetworkConnection {
private static final Logger log = LoggerFactory.getLogger(NetworkConnection.class);
private final Socket socket;
private final OutputStream out;
private final InputStream in;
// The IP address to which we are connecting.
private final InetAddress remoteIp;
private final NetworkParameters params;
private final VersionMessage versionMessage;
// Given to the BitcoinSerializer to de-duplicate messages.
private static final LinkedHashMap<Sha256Hash, Integer> dedupeList = BitcoinSerializer.createDedupeList();
private BitcoinSerializer serializer = null;
/**
* Connect to the given IP address using the port specified as part of the network parameters. Once construction
* is complete a functioning network channel is set up and running.
*
* @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. If
* port is not positive the default port from params is used.
* @param params Defines which network to connect to and details of the protocol.
* @param bestHeight How many blocks are in our best chain
* @param connectTimeout Timeout in milliseconds when initially connecting to peer
* @param dedupe Whether to avoid parsing duplicate messages from the network (ie from other peers).
* @throws IOException if there is a network related failure.
* @throws ProtocolException if the version negotiation failed.
*/
public NetworkConnection(PeerAddress peerAddress, NetworkParameters params,
int bestHeight, int connectTimeout, boolean dedupe)
throws IOException, ProtocolException {
this.params = params;
this.remoteIp = peerAddress.getAddr();
int port = (peerAddress.getPort() > 0) ? peerAddress.getPort() : params.port;
InetSocketAddress address = new InetSocketAddress(remoteIp, port);
socket = new Socket();
socket.connect(address, connectTimeout);
out = socket.getOutputStream();
in = socket.getInputStream();
// The version message never uses checksumming. Update checkumming property after version is read.
this.serializer = new BitcoinSerializer(params, false, dedupe ? dedupeList : null);
// Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
// from us until they send their version message back.
writeMessage(new VersionMessage(params, bestHeight));
// When connecting, the remote peer sends us a version message with various bits of
// useful data in it. We need to know the peer protocol version before we can talk to it.
Message m = readMessage();
if (!(m instanceof VersionMessage)) {
// Bad peers might not follow the protocol. This has been seen in the wild (issue 81).
throw new ProtocolException("First message received was not a version message but rather " + m);
}
versionMessage = (VersionMessage) m;
// Now it's our turn ...
// Send an ACK message stating we accept the peers protocol version.
writeMessage(new VersionAck());
// And get one back ...
readMessage();
// Switch to the new protocol version.
int peerVersion = versionMessage.clientVersion;
log.info("Connected to peer: version={}, subVer='{}', services=0x{}, time={}, blocks={}", new Object[]{
peerVersion,
versionMessage.subVer,
versionMessage.localServices,
new Date(versionMessage.time * 1000),
versionMessage.bestHeight
});
// BitCoinJ is a client mode implementation. That means there's not much point in us talking to other client
// mode nodes because we can't download the data from them we need to find/verify transactions.
if (!versionMessage.hasBlockChain()) {
// Shut down the socket
try {
shutdown();
} catch (IOException ex) {
// ignore exceptions while aborting
}
throw new ProtocolException("Peer does not have a copy of the block chain.");
}
// newer clients use checksumming
serializer.setUseChecksumming(peerVersion >= 209);
// Handshake is done!
}
public NetworkConnection(InetAddress inetAddress, NetworkParameters params, int bestHeight, int connectTimeout)
throws IOException, ProtocolException {
this(new PeerAddress(inetAddress), params, bestHeight, connectTimeout, true);
}
public interface NetworkConnection {
/**
* Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much.
*
* @throws IOException
*/
public void ping() throws IOException {
writeMessage(new Ping());
}
void ping() throws IOException;
/**
* Shuts down the network socket. Note that there's no way to wait for a socket to be fully flushed out to the
* wire, so if you call this immediately after sending a message it might not get sent.
*/
public void shutdown() throws IOException {
socket.shutdownOutput();
socket.shutdownInput();
socket.close();
}
@Override
public String toString() {
return "[" + remoteIp.getHostAddress() + "]:" + params.port + " (" + (socket.isConnected() ? "connected" :
"disconnected") + ")";
}
void shutdown() throws IOException;
/**
* Reads a network message from the wire, blocking until the message is fully received.
@ -164,15 +50,7 @@ public class NetworkConnection {
* @return An instance of a Message subclass
* @throws ProtocolException if the message is badly formatted, failed checksum or there was a TCP failure.
*/
public Message readMessage() throws IOException, ProtocolException {
Message message;
do {
message = serializer.deserialize(in);
// If message is null, it means deduping was enabled, we read a duplicated message and skipped parsing to
// avoid doing redundant work. So go around and wait for another message.
} while (message == null);
return message;
}
Message readMessage() throws IOException, ProtocolException;
/**
* Writes the given message out over the network using the protocol tag. For a Transaction
@ -181,16 +59,8 @@ public class NetworkConnection {
*
* @throws IOException
*/
public void writeMessage(Message message) throws IOException {
synchronized (out) {
serializer.serialize(message, out);
}
}
void writeMessage(Message message) throws IOException;
/**
* Returns the version message received from the other end of the connection during the handshake.
*/
public VersionMessage getVersionMessage() {
return versionMessage;
}
/** Returns the version message received from the other end of the connection during the handshake. */
VersionMessage getVersionMessage();
}

View File

@ -39,26 +39,27 @@ public class Peer {
// knows to quit when the socket goes away.
private boolean running;
private final BlockChain blockChain;
// When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for
// the response. Synchronized on itself.
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
// whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself.
private final List<GetDataFuture<Block>> pendingGetBlockFutures;
// Height of the chain advertised in the peers version message.
private int bestHeight;
private PeerAddress address;
private List<PeerEventListener> eventListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
// in parallel.
private boolean downloadData = true;
/**
* If true, we do some things that may only make sense on constrained devices like Android phones. Currently this
* only controls message deduplication.
*/
public static boolean MOBILE_OPTIMIZED = true;
public static boolean MOBILE_OPTIMIZED = false;
/**
* Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that
* communication won't occur until you call connect().
* Construct a peer that reads/writes from the given block chain. Note that communication won't occur until
* you call connect(), which will set up a new NetworkConnection.
*
* @param bestHeight our current best chain height, to facilitate downloading
*/
@ -72,13 +73,21 @@ public class Peer {
}
/**
* Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that
* communication won't occur until you call connect().
* Construct a peer that reads/writes from the given block chain. Note that communication won't occur until
* you call connect(), which will set up a new NetworkConnection.
*/
public Peer(NetworkParameters params, PeerAddress address, BlockChain blockChain) {
this(params, address, 0, blockChain);
}
/**
* Construct a peer that uses the given, already connected network connection object.
*/
public Peer(NetworkParameters params, BlockChain blockChain, NetworkConnection connection) {
this(params, null, 0, blockChain);
this.conn = connection;
}
public synchronized void addEventListener(PeerEventListener listener) {
eventListeners.add(listener);
}
@ -89,7 +98,12 @@ public class Peer {
@Override
public String toString() {
return "Peer(" + address.getAddr() + ":" + address.getPort() + ")";
if (address == null) {
// User-provided NetworkConnection object.
return "Peer(NetworkConnection:" + conn + ")";
} else {
return "Peer(" + address.getAddr() + ":" + address.getPort() + ")";
}
}
/**
@ -99,7 +113,7 @@ public class Peer {
*/
public synchronized void connect() throws PeerException {
try {
conn = new NetworkConnection(address, params, bestHeight, 60000, MOBILE_OPTIMIZED);
conn = new TCPNetworkConnection(address, params, bestHeight, 60000, MOBILE_OPTIMIZED);
} catch (IOException ex) {
throw new PeerException(ex);
} catch (ProtocolException ex) {
@ -207,17 +221,18 @@ public class Peer {
}
private void processInv(InventoryMessage inv) throws IOException {
// This should be called in the network loop thread for this peer
// This should be called in the network loop thread for this peer.
// If this peer isn't responsible for downloading stuff, ignore inv messages.
// TODO: In future, we should not ignore but count them. This allows a guesstimate of trustworthyness.
if (!downloadData)
return;
// The peer told us about some blocks or transactions they have. For now we only care about blocks.
// Note that as we don't actually want to store the entire block chain or even the headers of the block
// chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen
// enough to be a problem.
Block topBlock = blockChain.getUnconnectedBlock();
Sha256Hash topHash = (topBlock != null ? topBlock.getHash() : null);
List<InventoryItem> items = inv.getItems();
if (items.size() == 1 && items.get(0).type == InventoryItem.Type.Block && topHash != null &&
items.get(0).hash.equals(topHash)) {
if (isNewBlockTickle(topHash, items)) {
// An inv with a single hash containing our most recent unconnected block is a special inv,
// it's kind of like a tickle from the peer telling us that it's time to download more blocks to catch up to
// the block chain. We could just ignore this and treat it as a regular inv but then we'd download the head
@ -240,6 +255,14 @@ public class Peer {
conn.writeMessage(getdata);
}
/** A new block tickle is an inv with a hash containing the topmost block. */
private boolean isNewBlockTickle(Sha256Hash topHash, List<InventoryItem> items) {
return items.size() == 1 &&
items.get(0).type == InventoryItem.Type.Block &&
topHash != null &&
items.get(0).hash.equals(topHash);
}
/**
* Asks the connected peer for the block of the given hash, and returns a Future representing the answer.
* If you want the block right away and don't mind waiting for it, just call .get() on the result. Your thread
@ -386,6 +409,7 @@ public class Peer {
* downloaded the same number of blocks that the peer advertised having in its version handshake message.
*/
public void startBlockChainDownload() throws IOException {
setDownloadData(true);
// TODO: peer might still have blocks that we don't have, and even have a heavier
// chain even if the chain block count is lower.
if (getPeerBlocksToGet() > 0) {
@ -428,4 +452,20 @@ public class Peer {
// Don't care about this.
}
}
/**
* Returns true if this peer will try and download things it is sent in "inv" messages. Normally you only need
* one peer to be downloading data. Defaults to true.
*/
public boolean getDownloadData() {
return downloadData;
}
/**
* If set to false, the peer won't try and fetch blocks and transactions it hears about. Normally, only one
* peer should download missing blocks. Defaults to true.
*/
public void setDownloadData(boolean downloadData) {
this.downloadData = downloadData;
}
}

View File

@ -55,7 +55,6 @@ public class PeerGroup {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
public static final int DEFAULT_CONNECTION_DELAY_MILLIS = 5 * 1000;
private static final int CORE_THREADS = 1;
private static final int THREAD_KEEP_ALIVE_SECONDS = 1;
// Addresses to try to connect to, excluding active peers
@ -103,7 +102,9 @@ public class PeerGroup {
peers = Collections.synchronizedSet(new HashSet<Peer>());
peerEventListeners = Collections.synchronizedSet(new HashSet<PeerEventListener>());
peerDiscoverers = Collections.synchronizedSet(new HashSet<PeerDiscovery>());
peerPool = new ThreadPoolExecutor(CORE_THREADS, DEFAULT_CONNECTIONS,
peerPool = new ThreadPoolExecutor(
DEFAULT_CONNECTIONS,
DEFAULT_CONNECTIONS,
THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1),
new PeerGroupThreadFactory());
@ -121,16 +122,39 @@ public class PeerGroup {
return peerEventListeners.remove(listener);
}
/**
* Use this to directly add an already initialized and connected {@link Peer} object. Normally, you would prefer
* to use {@link PeerGroup#addAddress(PeerAddress)} and let this object handle construction of the peer for you.
* This method is useful when you are working closely with the network code (and in unit tests).<p>
*
* Note that if this peer group already has the maximum number of peers running (see {@link PeerGroup#DEFAULT_CONNECTIONS})
* then this method will block until other peers are disconnected.<p>
*
* Calling this will result in calls to any registered {@link PeerEventListener}s. Block chain download may occur.
*/
public synchronized void addPeer(Peer peer) {
// TODO: Reconsider the behavior of this method. It probably should never block.
if (!running)
throw new IllegalStateException("Must call start() before adding peers.");
log.info("Adding directly to group: {}", peer);
// Set download mode to be false whilst we spin up the peer thread. Otherwise there is a race.
peer.setDownloadData(false);
// This starts the peer thread running.
executePeer(null, peer, false);
// This ensures downloadPeer is set up correctly and triggers block chain retrieval if necesssary.
handleNewPeer(peer);
}
/**
* Depending on the environment, this should normally be between 1 and 10, default is 4.
*
* @param maxConnections the maximum number of peer connections that this group will try to make.
*/
public void setMaxConnections(int maxConnections) {
public synchronized void setMaxConnections(int maxConnections) {
peerPool.setMaximumPoolSize(maxConnections);
}
public int getMaxConnections() {
public synchronized int getMaxConnections() {
return peerPool.getMaximumPoolSize();
}
@ -152,7 +176,7 @@ public class PeerGroup {
/**
* Starts the background thread that makes connections.
*/
public void start() {
public synchronized void start() {
this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread");
running = true;
this.connectThread.start();
@ -191,10 +215,9 @@ public class PeerGroup {
}
private final class PeerExecutionRunnable implements Runnable {
/**
* Repeatedly get the next peer address from the inactive queue
* and try to connect.
* <p/>
/*
* Repeatedly get the next peer address from the inactive queue and try to connect.
*
* <p>We can be terminated with Thread.interrupt. When an interrupt is received,
* we will ask the executor to shutdown and ask each peer to disconnect. At that point
* no threads or network connections will be active.
@ -250,40 +273,8 @@ public class PeerGroup {
final PeerAddress address = inactives.take();
while (true) {
try {
final Peer peer = new Peer(params, address, blockStore.getChainHead().getHeight(), chain);
Runnable command = new Runnable() {
public void run() {
try {
log.info("Connecting to " + peer);
peer.connect();
peers.add(peer);
handleNewPeer(peer);
peer.run();
} catch (PeerException ex) {
// Do not propagate PeerException - log and try next peer. Suppress stack traces for
// exceptions we expect as part of normal network behaviour.
final Throwable cause = ex.getCause();
if (cause instanceof SocketTimeoutException) {
log.info("Timeout talking to " + peer + ": " + cause.getMessage());
} else if (cause instanceof ConnectException) {
log.info("Could not connect to " + peer + ": " + cause.getMessage());
} else if (cause instanceof IOException) {
log.info("Error talking to " + peer + ": " + cause.getMessage());
} else {
log.error("Unexpected exception whilst talking to " + peer, ex);
}
} finally {
// In all cases, disconnect and put the address back on the queue.
// We will retry this peer after all other peers have been tried.
peer.disconnect();
inactives.add(address);
if (peers.remove(peer))
handlePeerDeath(peer);
}
}
};
peerPool.execute(command);
Peer peer = new Peer(params, address, blockStore.getChainHead().getHeight(), chain);
executePeer(address, peer, true);
break;
} catch (RejectedExecutionException e) {
// Reached maxConnections, try again after a delay
@ -298,7 +289,7 @@ public class PeerGroup {
running = false;
throw new RuntimeException(e);
}
// If we got here, we should retry this address because an error unrelated
// to the peer has occurred.
Thread.sleep(connectionDelayMillis);
@ -306,6 +297,45 @@ public class PeerGroup {
}
}
private void executePeer(final PeerAddress address, final Peer peer, final boolean shouldConnect) {
peerPool.execute(new Runnable() {
public void run() {
try {
if (shouldConnect) {
log.info("Connecting to " + peer);
peer.connect();
}
peers.add(peer);
handleNewPeer(peer);
peer.run();
} catch (PeerException ex) {
// Do not propagate PeerException - log and try next peer. Suppress stack traces for
// exceptions we expect as part of normal network behaviour.
final Throwable cause = ex.getCause();
if (cause instanceof SocketTimeoutException) {
log.info("Timeout talking to " + peer + ": " + cause.getMessage());
} else if (cause instanceof ConnectException) {
log.info("Could not connect to " + peer + ": " + cause.getMessage());
} else if (cause instanceof IOException) {
log.info("Error talking to " + peer + ": " + cause.getMessage());
} else {
log.error("Unexpected exception whilst talking to " + peer, ex);
}
} finally {
// In all cases, disconnect and put the address back on the queue.
// We will retry this peer after all other peers have been tried.
peer.disconnect();
// We may not know the address if the peer was added directly.
if (address != null)
inactives.add(address);
if (peers.remove(peer))
handlePeerDeath(peer);
}
}
});
}
/**
* Start downloading the blockchain from the first available peer.
* <p/>
@ -316,7 +346,7 @@ public class PeerGroup {
*/
public synchronized void startBlockChainDownload(PeerEventListener listener) {
this.downloadListener = listener;
// TODO be more nuanced about which peer to download from. We can also try
// TODO: be more nuanced about which peer to download from. We can also try
// downloading from multiple peers and handle the case when a new peer comes along
// with a longer chain after we thought we were done.
synchronized (peers) {
@ -343,8 +373,14 @@ public class PeerGroup {
}
protected synchronized void handleNewPeer(Peer peer) {
if (downloadListener != null && downloadPeer == null)
log.info("Handling new {}", peer);
// If we want to download the chain, and we aren't currently doing so, do so now.
if (downloadListener != null && downloadPeer == null) {
log.info(" starting block chain download");
startBlockChainDownloadFromPeer(peer);
} else if (downloadPeer == null) {
setDownloadPeer(peer);
}
synchronized (peerEventListeners) {
for (PeerEventListener listener : peerEventListeners) {
synchronized (listener) {
@ -354,12 +390,29 @@ public class PeerGroup {
}
}
private void setDownloadPeer(Peer peer) {
if (downloadPeer != null) {
downloadPeer.setDownloadData(false);
}
downloadPeer = peer;
if (downloadPeer != null) {
downloadPeer.setDownloadData(true);
}
}
protected synchronized void handlePeerDeath(Peer peer) {
assert !peers.contains(peer);
if (peer == downloadPeer) {
downloadPeer = null;
log.info("Download peer died. Picking a new one.");
setDownloadPeer(null);
// Pick a new one and possibly tell it to download the chain.
synchronized (peers) {
if (downloadListener != null && !peers.isEmpty()) {
startBlockChainDownloadFromPeer(peers.iterator().next());
if (!peers.isEmpty()) {
Peer next = peers.iterator().next();
setDownloadPeer(next);
if (downloadListener != null) {
startBlockChainDownloadFromPeer(next);
}
}
}
}
@ -374,14 +427,15 @@ public class PeerGroup {
}
private synchronized void startBlockChainDownloadFromPeer(Peer peer) {
peer.addEventListener(downloadListener);
try {
peer.addEventListener(downloadListener);
setDownloadPeer(peer);
// startBlockChainDownload will setDownloadData(true) on itself automatically.
peer.startBlockChainDownload();
} catch (IOException e) {
log.error("failed to start block chain download from " + peer, e);
return;
}
downloadPeer = peer;
}
static class PeerGroupThreadFactory implements ThreadFactory {

View File

@ -0,0 +1,167 @@
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Date;
import java.util.LinkedHashMap;
/**
* A {@code TCPNetworkConnection} is used for connecting to a Bitcoin node over the standard TCP/IP protocol.<p>
*
* Multiple {@code TCPNetworkConnection}s can wait if another NetworkConnection instance is deserializing a
* message and discard duplicates before reading them. This is intended to avoid memory usage spikes in constrained
* environments like Android where deserializing a large message (like a block) on multiple threads simultaneously is
* both wasteful and can cause OOM failures. This feature is controlled at construction time.
*/
public class TCPNetworkConnection implements NetworkConnection {
private static final Logger log = LoggerFactory.getLogger(TCPNetworkConnection.class);
private final Socket socket;
private final OutputStream out;
private final InputStream in;
// The IP address to which we are connecting.
private final InetAddress remoteIp;
private final NetworkParameters params;
private final VersionMessage versionMessage;
// Given to the BitcoinSerializer to de-duplicate messages.
private static final LinkedHashMap<Sha256Hash, Integer> dedupeList = BitcoinSerializer.createDedupeList();
private BitcoinSerializer serializer = null;
/**
* Connect to the given IP address using the port specified as part of the network parameters. Once construction
* is complete a functioning network channel is set up and running.
*
* @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. If
* port is not positive the default port from params is used.
* @param params Defines which network to connect to and details of the protocol.
* @param bestHeight How many blocks are in our best chain
* @param connectTimeout Timeout in milliseconds when initially connecting to peer
* @param dedupe Whether to avoid parsing duplicate messages from the network (ie from other peers).
* @throws IOException if there is a network related failure.
* @throws ProtocolException if the version negotiation failed.
*/
public TCPNetworkConnection(PeerAddress peerAddress, NetworkParameters params,
int bestHeight, int connectTimeout, boolean dedupe)
throws IOException, ProtocolException {
this.params = params;
this.remoteIp = peerAddress.getAddr();
int port = (peerAddress.getPort() > 0) ? peerAddress.getPort() : params.port;
InetSocketAddress address = new InetSocketAddress(remoteIp, port);
socket = new Socket();
socket.connect(address, connectTimeout);
out = socket.getOutputStream();
in = socket.getInputStream();
// The version message never uses checksumming. Update checkumming property after version is read.
this.serializer = new BitcoinSerializer(params, false, dedupe ? dedupeList : null);
// Announce ourselves. This has to come first to connect to clients beyond v0.30.20.2 which wait to hear
// from us until they send their version message back.
writeMessage(new VersionMessage(params, bestHeight));
// When connecting, the remote peer sends us a version message with various bits of
// useful data in it. We need to know the peer protocol version before we can talk to it.
Message m = readMessage();
if (!(m instanceof VersionMessage)) {
// Bad peers might not follow the protocol. This has been seen in the wild (issue 81).
throw new ProtocolException("First message received was not a version message but rather " + m);
}
versionMessage = (VersionMessage) m;
// Now it's our turn ...
// Send an ACK message stating we accept the peers protocol version.
writeMessage(new VersionAck());
// And get one back ...
readMessage();
// Switch to the new protocol version.
int peerVersion = versionMessage.clientVersion;
log.info("Connected to peer: version={}, subVer='{}', services=0x{}, time={}, blocks={}", new Object[] {
peerVersion,
versionMessage.subVer,
versionMessage.localServices,
new Date(versionMessage.time * 1000),
versionMessage.bestHeight
});
// BitCoinJ is a client mode implementation. That means there's not much point in us talking to other client
// mode nodes because we can't download the data from them we need to find/verify transactions.
if (!versionMessage.hasBlockChain()) {
// Shut down the socket
try {
shutdown();
} catch (IOException ex) {
// ignore exceptions while aborting
}
throw new ProtocolException("Peer does not have a copy of the block chain.");
}
// newer clients use checksumming
serializer.setUseChecksumming(peerVersion >= 209);
// Handshake is done!
}
public TCPNetworkConnection(InetAddress inetAddress, NetworkParameters params, int bestHeight, int connectTimeout)
throws IOException, ProtocolException {
this(new PeerAddress(inetAddress), params, bestHeight, connectTimeout, true);
}
public void ping() throws IOException {
writeMessage(new Ping());
}
public void shutdown() throws IOException {
socket.shutdownOutput();
socket.shutdownInput();
socket.close();
}
@Override
public String toString() {
return "[" + remoteIp.getHostAddress() + "]:" + params.port + " (" + (socket.isConnected() ? "connected" :
"disconnected") + ")";
}
public Message readMessage() throws IOException, ProtocolException {
Message message;
do {
message = serializer.deserialize(in);
// If message is null, it means deduping was enabled, we read a duplicated message and skipped parsing to
// avoid doing redundant work. So go around and wait for another message.
} while (message == null);
return message;
}
public void writeMessage(Message message) throws IOException {
synchronized (out) {
serializer.serialize(message, out);
}
}
public VersionMessage getVersionMessage() {
return versionMessage;
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/** Allows messages to be inserted and removed in a thread-safe manner. */
public class MockNetworkConnection implements NetworkConnection {
private BlockingQueue<Object> inboundMessageQ;
private BlockingQueue<Message> outboundMessageQ;
private boolean waitingToRead;
// Not used for anything except marking the shutdown point in the inbound queue.
private Object disconnectMarker = new Object();
private VersionMessage versionMessage;
public MockNetworkConnection() {
inboundMessageQ = new ArrayBlockingQueue<Object>(10);
outboundMessageQ = new ArrayBlockingQueue<Message>(10);
}
public void ping() throws IOException {
}
public void shutdown() throws IOException {
}
public synchronized void disconnect() throws IOException {
inboundMessageQ.add(disconnectMarker);
}
public void exceptionOnRead(Exception e) {
inboundMessageQ.add(e);
}
public Message readMessage() throws IOException, ProtocolException {
try {
// Notify popOutbound() that the network thread is now waiting to receive input. This is needed because
// otherwise it's impossible to tell apart "thread decided to not write any message" from "thread is still
// working on it" in a race-free manner.
synchronized (this) {
waitingToRead = true;
notifyAll();
}
Object o = inboundMessageQ.take();
if (o instanceof IOException) {
throw (IOException) o;
} else if (o instanceof ProtocolException) {
throw (ProtocolException) o;
} else if (o instanceof Message) {
return (Message) o;
} else if (o == disconnectMarker) {
throw new IOException("done");
} else {
throw new RuntimeException("Unknown object in inbound queue.");
}
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
synchronized (this) {
waitingToRead = false;
}
}
}
public void writeMessage(Message message) throws IOException {
try {
outboundMessageQ.put(message);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public void setVersionMessage(VersionMessage msg) {
this.versionMessage = msg;
}
public void setVersionMessageForHeight(NetworkParameters params, int chainHeight) {
versionMessage = new VersionMessage(params, chainHeight);
}
public VersionMessage getVersionMessage() {
if (versionMessage == null) throw new RuntimeException("Need to call setVersionMessage first");
return versionMessage;
}
/** Call this to add a message which will be received by the NetworkConnection user. Wakes up the network thread. */
public void inbound(Message m) {
try {
inboundMessageQ.put(m);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Returns a message that has been written with writeMessage. Waits until the peer thread is sitting inside
* readMessage() and has no further inbound messages to process. If at that point there is a message in the outbound
* queue, takes and returns it. Otherwise returns null. Use popOutbound() for when there is no other thread.
*/
public Message outbound() throws InterruptedException {
synchronized (this) {
while (!waitingToRead || inboundMessageQ.size() > 0) {
wait();
}
}
return popOutbound();
}
/**
* Takes the most recently received message or returns NULL if there are none waiting.
*/
public Message popOutbound() throws InterruptedException {
if (outboundMessageQ.peek() != null)
return outboundMessageQ.take();
else
return null;
}
}

View File

@ -17,32 +17,50 @@
package com.google.bitcoin.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
public class PeerGroupTest {
public class PeerGroupTest extends TestWithNetworkConnections {
static final NetworkParameters params = NetworkParameters.unitTests();
private Wallet wallet;
private BlockStore blockStore;
private PeerGroup peerGroup;
private final BlockingQueue<Peer> disconnectedPeers = new LinkedBlockingQueue<Peer>();
@Override
@Before
public void setUp() throws Exception {
super.setUp();
wallet = new Wallet(params);
blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, wallet, blockStore);
peerGroup = new PeerGroup(blockStore, params, chain, 1000);
// Support for testing disconnect events in a non-racy manner.
peerGroup.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerDisconnected(Peer peer, int peerCount) {
super.onPeerDisconnected(peer, peerCount);
try {
disconnectedPeers.put(peer);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
@Test
@ -79,4 +97,87 @@ public class PeerGroupTest {
assertTrue(result[0]);
peerGroup.stop();
}
@Test
public void singleDownloadPeer1() throws Exception {
// Check that we don't attempt to retrieve blocks on multiple peers.
// Create a couple of peers.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
peerGroup.addPeer(p2);
// Set up a little block chain. We heard about b1 but not b2 (it is pending download). b3 is solved whilst we
// are downloading the chain.
Block b1 = TestUtils.createFakeBlock(params, blockStore).block;
blockChain.add(b1);
Block b2 = TestUtils.makeSolvedTestBlock(params, b1);
Block b3 = TestUtils.makeSolvedTestBlock(params, b2);
// Peer 1 and 2 receives an inv advertising a newly solved block.
InventoryMessage inv = new InventoryMessage(params);
inv.addItem(new InventoryItem(InventoryItem.Type.Block, b3.getHash()));
n1.inbound(inv);
n2.inbound(inv);
// Only peer 1 tries to download it.
assertTrue(n1.outbound() instanceof GetDataMessage);
assertNull(n2.outbound());
// Peer 1 goes away.
disconnectAndWait(n1);
// Peer 2 fetches it next time it hears an inv (should it fetch immediately?).
n2.inbound(inv);
assertTrue(n2.outbound() instanceof GetDataMessage);
peerGroup.stop();
}
@Test
public void singleDownloadPeer2() throws Exception {
// Check that we don't attempt multiple simultaneous block chain downloads, when adding a new peer in the
// middle of an existing chain download.
// Create a couple of peers.
MockNetworkConnection n1 = createMockNetworkConnection();
Peer p1 = new Peer(params, blockChain, n1);
MockNetworkConnection n2 = createMockNetworkConnection();
Peer p2 = new Peer(params, blockChain, n2);
peerGroup.start();
peerGroup.addPeer(p1);
// Set up a little block chain.
Block b1 = TestUtils.createFakeBlock(params, blockStore).block;
Block b2 = TestUtils.makeSolvedTestBlock(params, b1);
Block b3 = TestUtils.makeSolvedTestBlock(params, b2);
n1.setVersionMessageForHeight(params, 3);
n2.setVersionMessageForHeight(params, 3);
// Expect a zero hash getblocks on p1. This is how the process starts.
peerGroup.startBlockChainDownload(new AbstractPeerEventListener() {
});
GetBlocksMessage getblocks = (GetBlocksMessage) n1.outbound();
assertEquals(Sha256Hash.ZERO_HASH, getblocks.getStopHash());
// We give back an inv with some blocks in it.
InventoryMessage inv = new InventoryMessage(params);
inv.addItem(new InventoryItem(InventoryItem.Type.Block, b1.getHash()));
inv.addItem(new InventoryItem(InventoryItem.Type.Block, b2.getHash()));
inv.addItem(new InventoryItem(InventoryItem.Type.Block, b3.getHash()));
n1.inbound(inv);
// Peer creates a getdata message.
GetDataMessage getdata = (GetDataMessage) n1.outbound();
// We hand back the first block.
n1.inbound(b1);
// Now we successfully connect to another peer. There should be no messages sent.
peerGroup.addPeer(p2);
Message message = n2.outbound();
assertNull(message == null ? "" : message.toString(), message);
}
private void disconnectAndWait(MockNetworkConnection conn) throws IOException, InterruptedException {
conn.disconnect();
disconnectedPeers.take();
}
}

View File

@ -19,49 +19,26 @@ package com.google.bitcoin.core;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import static org.easymock.EasyMock.*;
import org.easymock.Capture;
import org.easymock.IAnswer;
import org.easymock.IMocksControl;
import org.junit.Before;
import org.junit.Test;
import com.google.bitcoin.store.MemoryBlockStore;
public class PeerTest {
public class PeerTest extends TestWithNetworkConnections {
private Peer peer;
private IMocksControl control;
private NetworkConnection conn;
private NetworkParameters unitTestParams;
private MemoryBlockStore blockStore;
private BlockChain blockChain;
private MockNetworkConnection conn;
@Override
@Before
public void setUp() throws Exception {
control = createStrictControl();
control.checkOrder(true);
unitTestParams = NetworkParameters.unitTests();
blockStore = new MemoryBlockStore(unitTestParams);
blockChain = new BlockChain(unitTestParams, new Wallet(unitTestParams), blockStore);
PeerAddress address = new PeerAddress(InetAddress.getLocalHost());
super.setUp();
conn = createMockBuilder(NetworkConnection.class)
.addMockedMethod("getVersionMessage")
.addMockedMethod("readMessage")
.addMockedMethod("writeMessage")
.addMockedMethod("shutdown")
.addMockedMethod("toString")
.createMock(control);
peer = new Peer(unitTestParams, address, blockChain);
peer.setConnection(conn);
conn = createMockNetworkConnection();
peer = new Peer(unitTestParams, blockChain, conn);
}
@Test
@ -72,32 +49,17 @@ public class PeerTest {
assertFalse(peer.removeEventListener(listener));
}
// Check that the connection is shut down if there's a read error and the exception is propagated.
// Check that the connection is shut down if there's a read error and that the exception is propagated.
@Test
public void testRun_exception() throws Exception {
expect(conn.readMessage()).andThrow(new IOException("done"));
conn.shutdown();
expectLastCall();
control.replay();
conn.exceptionOnRead(new IOException("done"));
try {
peer.run();
fail("did not throw");
} catch (PeerException e) {
// expected
assertTrue(e.getCause() instanceof IOException);
}
control.verify();
control.reset();
expect(conn.readMessage()).andThrow(new ProtocolException("proto"));
conn.shutdown();
expectLastCall();
control.replay();
conn.exceptionOnRead(new ProtocolException("proto"));
try {
peer.run();
fail("did not throw");
@ -105,228 +67,161 @@ public class PeerTest {
// expected
assertTrue(e.toString(), e.getCause() instanceof ProtocolException);
}
control.verify();
}
// Check that it runs through the event loop and shut down correctly
@Test
public void testRun_normal() throws Exception {
runPeerAndVerify();
public void shutdown() throws Exception {
runPeer(peer, conn);
}
// Check that when we receive a block that does not connect to our chain, we send a
// getblocks to fetch the intermediates.
// Check that when we receive a block that does not connect to our chain, we send a getblocks to fetch
// the intermediates.
@Test
public void testRun_unconnected_block() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
public void unconnectedBlock() throws Exception {
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block prev = TestUtils.makeSolvedTestBlock(unitTestParams, blockStore);
final Block block = TestUtils.makeSolvedTestBlock(unitTestParams, prev);
expect(conn.readMessage()).andReturn(block);
Capture<GetBlocksMessage> message = captureGetBlocksMessage();
runPeerAndVerify();
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
Block b3 = TestUtils.makeSolvedTestBlock(unitTestParams, b2);
conn.inbound(b3);
runPeer(peer, conn);
GetBlocksMessage getblocks = (GetBlocksMessage) conn.popOutbound();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(b1.getPrevBlockHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
assertEquals(message.getValue().getLocator(), expectedLocator);
assertEquals(message.getValue().getStopHash(), block.getHash());
assertEquals(getblocks.getLocator(), expectedLocator);
assertEquals(getblocks.getStopHash(), b3.getHash());
}
// Check that an inventory tickle is processed correctly
// Check that an inventory tickle is processed correctly when downloading missing blocks is active.
@Test
public void testRun_inv_tickle() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
public void invTickle() throws Exception {
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block prev = TestUtils.makeSolvedTestBlock(unitTestParams, blockStore);
final Block block = TestUtils.makeSolvedTestBlock(unitTestParams, prev);
expect(conn.readMessage()).andReturn(block);
conn.writeMessage(anyObject(Message.class));
expectLastCall();
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, block.getHash());
inv.addItem(item);
expect(conn.readMessage()).andReturn(inv);
Capture<GetBlocksMessage> message = captureGetBlocksMessage();
runPeerAndVerify();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(b1.getPrevBlockHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
assertEquals(message.getValue().getLocator(), expectedLocator);
assertEquals(message.getValue().getStopHash(), block.getHash());
}
// Check that inventory message containing a block is processed correctly
@Test
public void testRun_inv_block() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block prev = TestUtils.makeSolvedTestBlock(unitTestParams, blockStore);
final Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, prev);
final Block b3 = TestUtils.makeSolvedTestBlock(unitTestParams, b2);
expect(conn.readMessage()).andReturn(b2);
conn.writeMessage(anyObject(Message.class));
expectLastCall();
// Make a missing block.
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
Block b3 = TestUtils.makeSolvedTestBlock(unitTestParams, b2);
conn.inbound(b3);
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b3.getHash());
inv.addItem(item);
expect(conn.readMessage()).andReturn(inv);
Capture<GetDataMessage> message = captureGetDataMessage();
runPeerAndVerify();
List<InventoryItem> items = message.getValue().getItems();
assertEquals(1, items.size());
assertEquals(b3.getHash(), items.get(0).hash);
assertEquals(InventoryItem.Type.Block, items.get(0).type);
}
// Check that it starts downloading the block chain correctly
@Test
public void testStartBlockChainDownload() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
expect(conn.getVersionMessage()).andStubReturn(new VersionMessage(unitTestParams, 100));
listener.onChainDownloadStarted(peer, 99);
expectLastCall();
Capture<GetBlocksMessage> message = captureGetBlocksMessage();
control.replay();
peer.startBlockChainDownload();
control.verify();
conn.inbound(inv);
runPeer(peer, conn);
GetBlocksMessage getblocks = (GetBlocksMessage) conn.popOutbound();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(b1.getPrevBlockHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
assertEquals(message.getValue().getLocator(), expectedLocator);
assertEquals(message.getValue().getStopHash(), Sha256Hash.ZERO_HASH);
assertEquals(getblocks.getLocator(), expectedLocator);
assertEquals(getblocks.getStopHash(), b3.getHash());
}
// Check that an inv to a peer that is not set to download missing blocks does nothing.
@Test
public void testGetBlock() throws Exception {
public void invNoDownload() throws Exception {
// Don't download missing blocks.
peer.setDownloadData(false);
// Make a missing block that we receive.
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
// Receive an inv.
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b2.getHash());
inv.addItem(item);
conn.inbound(inv);
// Peer does nothing with it.
runPeer(peer, conn);
Message message = conn.popOutbound();
assertNull(message != null ? message.toString() : "", message);
}
// Check that inventory message containing blocks we want is processed correctly.
@Test
public void newBlock() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
Block b3 = TestUtils.makeSolvedTestBlock(unitTestParams, b2);
conn.setVersionMessageForHeight(unitTestParams, 100);
// Receive notification of a new block.
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Block, b2.getHash());
inv.addItem(item);
conn.inbound(inv);
// Response to the getdata message.
conn.inbound(b2);
Block prev = TestUtils.makeSolvedTestBlock(unitTestParams, blockStore);
final Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, prev);
expect(conn.getVersionMessage()).andStubReturn(new VersionMessage(unitTestParams, 100));
Capture<GetDataMessage> message = captureGetDataMessage();
expect(conn.readMessage()).andReturn(b2);
expectPeerDisconnect();
listener.onBlocksDownloaded(eq(peer), anyObject(Block.class), eq(98));
expectLastCall();
control.replay();
Future<Block> resultFuture = peer.getBlock(b2.getHash());
peer.run();
assertEquals(b2.getHash(), resultFuture.get().getHash());
runPeer(peer, conn);
control.verify();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
List<InventoryItem> items = message.getValue().getItems();
GetDataMessage getdata = (GetDataMessage) conn.popOutbound();
List<InventoryItem> items = getdata.getItems();
assertEquals(1, items.size());
assertEquals(b2.getHash(), items.get(0).hash);
assertEquals(InventoryItem.Type.Block, items.get(0).type);
}
// Check that the next block on the chain is processed correctly and that the listener is notified
// Check that it starts downloading the block chain correctly on request.
@Test
public void testRun_new_block() throws Exception {
public void startBlockChainDownload() throws Exception {
PeerEventListener listener = control.createMock(PeerEventListener.class);
peer.addEventListener(listener);
expect(conn.readMessage()).andReturn(TestUtils.makeSolvedTestBlock(unitTestParams, blockStore));
expect(conn.getVersionMessage()).andReturn(new VersionMessage(unitTestParams, 100));
listener.onBlocksDownloaded(eq(peer), anyObject(Block.class), eq(99));
expectLastCall();
runPeerAndVerify();
}
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
blockChain.add(b2);
conn.setVersionMessageForHeight(unitTestParams, 100);
private Capture<GetBlocksMessage> captureGetBlocksMessage() throws IOException {
Capture<GetBlocksMessage> message = new Capture<GetBlocksMessage>();
conn.writeMessage(capture(message));
listener.onChainDownloadStarted(peer, 98);
expectLastCall();
return message;
}
private Capture<GetDataMessage> captureGetDataMessage() throws IOException {
Capture<GetDataMessage> message = new Capture<GetDataMessage>();
conn.writeMessage(capture(message));
expectLastCall();
return message;
}
// Stage a disconnect, replay the mocks, run and verify
private void runPeerAndVerify() throws IOException, ProtocolException, PeerException {
expectPeerDisconnect();
control.replay();
peer.run();
peer.startBlockChainDownload();
runPeer(peer, conn);
control.verify();
List<Sha256Hash> expectedLocator = new ArrayList<Sha256Hash>();
expectedLocator.add(b2.getHash());
expectedLocator.add(b1.getHash());
expectedLocator.add(unitTestParams.genesisBlock.getHash());
GetBlocksMessage message = (GetBlocksMessage) conn.popOutbound();
assertEquals(message.getLocator(), expectedLocator);
assertEquals(message.getStopHash(), Sha256Hash.ZERO_HASH);
}
// Step the peer through a disconnection event
private void expectPeerDisconnect() throws IOException, ProtocolException {
expect(conn.readMessage()).andAnswer(new IAnswer<Message>() {
public Message answer() throws Throwable {
peer.disconnect();
throw new IOException("done");
}
});
conn.shutdown();
expectLastCall().times(2);
@Test
public void getBlock() throws Exception {
Block b1 = TestUtils.createFakeBlock(unitTestParams, blockStore).block;
blockChain.add(b1);
Block b2 = TestUtils.makeSolvedTestBlock(unitTestParams, b1);
Block b3 = TestUtils.makeSolvedTestBlock(unitTestParams, b2);
conn.setVersionMessageForHeight(unitTestParams, 100);
runPeerAsync(peer, conn);
// Request the block.
Future<Block> resultFuture = peer.getBlock(b3.getHash());
assertFalse(resultFuture.isDone());
// Peer asks for it.
GetDataMessage message = (GetDataMessage) conn.outbound();
assertEquals(message.getItems().get(0).hash, b3.getHash());
assertFalse(resultFuture.isDone());
// Peer receives it.
conn.inbound(b3);
Block b = resultFuture.get();
assertEquals(b, b3);
conn.disconnect();
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
import com.google.bitcoin.store.MemoryBlockStore;
import org.easymock.IMocksControl;
import java.io.IOException;
import static org.easymock.EasyMock.*;
/**
* Utility class that makes it easy to work with mock NetworkConnections.
*/
public class TestWithNetworkConnections {
protected IMocksControl control;
protected NetworkParameters unitTestParams;
protected MemoryBlockStore blockStore;
protected BlockChain blockChain;
public void setUp() throws Exception {
control = createStrictControl();
control.checkOrder(true);
unitTestParams = NetworkParameters.unitTests();
blockStore = new MemoryBlockStore(unitTestParams);
blockChain = new BlockChain(unitTestParams, new Wallet(unitTestParams), blockStore);
}
protected MockNetworkConnection createMockNetworkConnection() {
return new MockNetworkConnection();
}
protected void runPeer(Peer peer, MockNetworkConnection connection) throws IOException, PeerException {
connection.disconnect();
try {
peer.run();
} catch (PeerException e) {
if (!e.getCause().getMessage().equals("done"))
throw e;
}
}
protected void runPeerAsync(final Peer peer, MockNetworkConnection connection) throws IOException, PeerException {
new Thread("Test Peer Thread") {
@Override
public void run() {
try {
peer.run();
} catch (PeerException e) {
if (!e.getCause().getMessage().equals("done")) throw new RuntimeException(e);
}
}
}.start();
}
}