Provide a sendCoinsAsync that returns immediately instead of waiting for the tx to be broadcast. Change PeerGroup.broadcastTransaction to return a Future<Transaction> and only consider the tx to be broadcast once it was written to at least one peer directly (not waiting for a response to an inv). Re-plumb the peer group thread to poll for tasks when there are active peers.

This commit is contained in:
Mike Hearn 2012-01-16 19:16:16 +01:00
parent 3fa5c89b3e
commit c7fd805e47
3 changed files with 113 additions and 69 deletions

View file

@ -17,32 +17,19 @@
package com.google.bitcoin.core;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
/**
* Maintain a number of connections to peers.
* <p/>
@ -69,7 +56,7 @@ public class PeerGroup {
// Addresses to try to connect to, excluding active peers
private BlockingQueue<PeerAddress> inactives;
// Connection initiation thread
private Thread connectThread;
private PeerGroupThread peerGroupThread;
// True if the connection initiation thread should be running
private boolean running;
// A pool of threads for peers, of size maxConnection
@ -115,6 +102,7 @@ public class PeerGroup {
this.wallets = new ArrayList<Wallet>(1);
inactives = new LinkedBlockingQueue<PeerAddress>();
// TODO: Remove usage of synchronized sets here in favor of simple coarse-grained locking.
peers = Collections.synchronizedSet(new HashSet<Peer>());
peerDiscoverers = Collections.synchronizedSet(new HashSet<PeerDiscovery>());
peerPool = new ThreadPoolExecutor(
@ -262,9 +250,9 @@ public class PeerGroup {
* Starts the background thread that makes connections.
*/
public synchronized void start() {
this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread");
this.peerGroupThread = new PeerGroupThread();
running = true;
this.connectThread.start();
this.peerGroupThread.start();
}
/**
@ -276,28 +264,34 @@ public class PeerGroup {
public synchronized void stop() {
if (running) {
running = false;
connectThread.interrupt();
peerGroupThread.interrupt();
}
}
/**
* Broadcast a transaction to all connected peers.
* Queues a transaction for asynchronous broadcast. The transaction will be considered broadcast and forgotten
* about (by the PeerGroup) once it's been written out to at least one node, but that does not guarantee inclusion
* in the chain - incorrect fees or a flaky remote node can cause this as well. Wallets attached with
* {@link PeerGroup#addWallet(Wallet)} will have their pending transactions announced to every newly connected
* node.
*
* @return whether we sent to at least one peer
* @return a Future that can be used to wait for the async broadcast to complete.
*/
public boolean broadcastTransaction(Transaction tx) {
boolean success = false;
synchronized (peers) {
for (Peer peer : peers) {
try {
peer.sendMessage(tx);
success = true;
} catch (IOException e) {
log.error("failed to broadcast to " + peer, e);
public Future<Transaction> broadcastTransaction(final Transaction tx) {
FutureTask<Transaction> future = new FutureTask<Transaction>(new Runnable() {
public void run() {
// This is run with the peer group already locked.
for (Peer peer : peers) {
try {
peer.sendMessage(tx);
} catch (IOException e) {
log.warn("Caught IOException whilst sending transaction: {}", e.getMessage());
}
}
}
}
return success;
}, tx);
peerGroupThread.addTask(future);
return future;
}
/**
@ -330,25 +324,48 @@ public class PeerGroup {
return peers.size();
}
private final class PeerExecutionRunnable implements Runnable {
/*
* Repeatedly get the next peer address from the inactive queue and try to connect.
*
* <p>We can be terminated with Thread.interrupt. When an interrupt is received,
* we will ask the executor to shutdown and ask each peer to disconnect. At that point
* no threads or network connections will be active.
*/
/**
* Performs various tasks for the peer group: connects to new nodes to keep the currently connected node count at
* the right level, runs peer discovery if we run out, and broadcasts transactions that were submitted via
* broadcastTransaction().
*/
private final class PeerGroupThread extends Thread {
private LinkedBlockingQueue<FutureTask> tasks;
public PeerGroupThread() {
super("Peer group thread");
tasks = new LinkedBlockingQueue<FutureTask>();
}
public void run() {
try {
while (running) {
if (inactives.size() == 0) {
discoverPeers();
} else {
tryNextPeer();
// Modify the peer group under its lock, always.
int numPeers;
synchronized (PeerGroup.this) {
numPeers = peers.size();
if (inactives.size() == 0) {
discoverPeers();
} else if (numPeers < getMaxConnections()) {
tryNextPeer();
}
}
// We started a new peer connection, delay before trying another one
Thread.sleep(connectionDelayMillis);
// Wait for a task or the connection polling timeout to elapse. Tasks are only eligible to run
// when there is at least one active peer.
// TODO: Remove the need for this polling, only wake up the peer group thread when there's actually
// something useful to do.
if (numPeers > 0) {
FutureTask task = tasks.poll(connectionDelayMillis, TimeUnit.MILLISECONDS);
if (task != null) {
synchronized (PeerGroup.this) {
task.run();
}
}
} else {
// TODO: This should actually be waiting for a peer to become active OR the timeout to elapse.
Thread.sleep(connectionDelayMillis);
}
}
} catch (InterruptedException ex) {
}
@ -407,6 +424,14 @@ public class PeerGroup {
Thread.sleep(connectionDelayMillis);
}
}
/**
* Add a task to be executed on the peer thread. Tasks are run with the peer group locked and when there is
* at least one peer.
*/
public synchronized <T> void addTask(FutureTask<T> task) {
tasks.add(task);
}
}
private enum ExecuteBlockMode {

View file

@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.google.bitcoin.core.Utils.bitcoinValueToFriendlyString;
@ -856,22 +858,43 @@ public class Wallet implements Serializable {
/**
* Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the
* wallet. If there is an exception thrown whilst sending to the peer group, the transaction will still be committed
* and the PeerGroup will try and rebroadcast it when it can.
* wallet. The transaction will be announced to any connected nodes asynchronously. If you would like to know when
* the transaction was successfully sent to at least one node, use
* {@link Wallet#sendCoinsOffline(Address, java.math.BigInteger)} and then {@link PeerGroup#broadcastTransaction(Transaction)}
* on the result to obtain a {@link Future<Transaction>}.
*
* @param peerGroup a PeerGroup to use for broadcast or null.
* @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return the Transaction
* @throws IOException if there was a problem broadcasting the transaction
*/
public synchronized Transaction sendCoinsAsync(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException {
Transaction tx = sendCoinsOffline(to, nanocoins);
// Just throw away the Future here. If the user wants it, they can call sendCoinsOffline/broadcastTransaction
// themselves.
peerGroup.broadcastTransaction(tx);
return tx;
}
/**
* Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the
* wallet. The method will block until the transaction has been announced to at least one node.
*
* @param peerGroup a PeerGroup to use for broadcast or null.
* @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins.
* @throws IOException if there was a problem broadcasting the transaction
*/
public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException {
public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) {
Transaction tx = sendCoinsOffline(to, nanocoins);
if (!peerGroup.broadcastTransaction(tx)) {
throw new IOException("Failed to broadcast tx to all connected peers");
try {
return peerGroup.broadcastTransaction(tx).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return tx;
}
/**

View file

@ -16,10 +16,10 @@
package com.google.bitcoin.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.math.BigInteger;
@ -30,11 +30,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.junit.Before;
import org.junit.Test;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import static org.junit.Assert.*;
public class PeerGroupTest extends TestWithNetworkConnections {
static final NetworkParameters params = NetworkParameters.unitTests();
@ -263,7 +259,7 @@ public class PeerGroupTest extends TestWithNetworkConnections {
// Now create a spend, and expect the announcement.
Address dest = new ECKey().toAddress(params);
wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0));
assertNotNull(wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0)));
Transaction t1 = (Transaction) n1.outbound();
assertNotNull(t1);
// 49 BTC in change.