Make block chain listeners run in given executors and default to the user thread.

This commit is contained in:
Mike Hearn 2013-07-01 11:42:42 +02:00
parent 2537ff47b5
commit 35a7f38d86
2 changed files with 102 additions and 63 deletions

View File

@ -18,6 +18,7 @@ package com.google.bitcoin.core;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.*;
@ -100,7 +102,7 @@ public abstract class AbstractBlockChain {
private final Object chainHeadLock = new Object();
protected final NetworkParameters params;
private final CopyOnWriteArrayList<BlockChainListener> listeners;
private final CopyOnWriteArrayList<ListenerRegistration<BlockChainListener>> listeners;
// Holds a block header and, optionally, a list of tx hashes or block's transactions
protected static class OrphanBlock {
@ -128,7 +130,8 @@ public abstract class AbstractBlockChain {
chainHead = blockStore.getChainHead();
log.info("chain head is at height {}:\n{}", chainHead.getHeight(), chainHead.getHeader());
this.params = params;
this.listeners = new CopyOnWriteArrayList<BlockChainListener>(listeners);
this.listeners = new CopyOnWriteArrayList<ListenerRegistration<BlockChainListener>>();
for (BlockChainListener l : listeners) addListener(l, Threading.SAME_THREAD);
}
/**
@ -138,14 +141,21 @@ public abstract class AbstractBlockChain {
* wallets is not well tested!
*/
public void addWallet(Wallet wallet) {
listeners.add(wallet);
addListener(wallet, Threading.SAME_THREAD);
}
/**
* Adds a generic {@link BlockChainListener} listener to the chain.
*/
public void addListener(BlockChainListener listener) {
listeners.add(listener);
addListener(listener, Threading.SAME_THREAD);
}
/**
* Adds a generic {@link BlockChainListener} listener to the chain.
*/
public void addListener(BlockChainListener listener, Executor executor) {
listeners.add(new ListenerRegistration<BlockChainListener>(listener, executor));
}
/**
@ -381,8 +391,8 @@ public abstract class AbstractBlockChain {
// expensiveChecks enables checks that require looking at blocks further back in the chain
// than the previous one when connecting (eg median timestamp check)
// It could be exposed, but for now we just set it to shouldVerifyTransactions()
private void connectBlock(Block block, StoredBlock storedPrev, boolean expensiveChecks,
Set<Sha256Hash> filteredTxHashList, List<Transaction> filteredTxn)
private void connectBlock(final Block block, StoredBlock storedPrev, boolean expensiveChecks,
Set<Sha256Hash> filteredTxHashList, final List<Transaction> filteredTxn)
throws BlockStoreException, VerificationException, PrunedException {
checkState(lock.isLocked());
// Check that we aren't connecting a block that fails a checkpoint check
@ -406,28 +416,7 @@ public abstract class AbstractBlockChain {
block.transactions == null ? block : block.cloneAsHeader(), txOutChanges);
setChainHead(newStoredBlock);
log.debug("Chain is now {} blocks high", newStoredBlock.getHeight());
// Notify the listeners of the new block, so the depth and workDone of stored transactions can be updated
// (in the case of the listener being a wallet). Wallets need to know how deep each transaction is so
// coinbases aren't used before maturity.
boolean first = true;
for (BlockChainListener listener : listeners) {
if (block.transactions != null || filteredTxn != null) {
// If this is not the first wallet, ask for the transactions to be duplicated before being given
// to the wallet when relevant. This ensures that if we have two connected wallets and a tx that
// is relevant to both of them, they don't end up accidentally sharing the same object (which can
// result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in
// the case of multiple wallets to avoid an unnecessary efficiency hit in the common case.
sendTransactionsToListener(newStoredBlock, NewBlockType.BEST_CHAIN, listener,
block.transactions != null ? block.transactions : filteredTxn, !first);
}
if (filteredTxHashList != null) {
for (Sha256Hash hash : filteredTxHashList) {
listener.notifyTransactionIsInBlock(hash, newStoredBlock, NewBlockType.BEST_CHAIN);
}
}
listener.notifyNewBestBlock(newStoredBlock);
first = false;
}
informListenersForNewBlock(block, NewBlockType.BEST_CHAIN, filteredTxHashList, filteredTxn, newStoredBlock);
} else {
// This block connects to somewhere other than the top of the best known chain. We treat these differently.
//
@ -466,33 +455,72 @@ public abstract class AbstractBlockChain {
// If we do, send them to the wallet but state that they are on a side chain so it knows not to try and
// spend them until they become activated.
if (block.transactions != null || filteredTxn != null) {
boolean first = true;
for (BlockChainListener listener : listeners) {
List<Transaction> txnToNotify;
if (block.transactions != null)
txnToNotify = block.transactions;
else
txnToNotify = filteredTxn;
// If this is not the first wallet, ask for the transactions to be duplicated before being given
// to the wallet when relevant. This ensures that if we have two connected wallets and a tx that
// is relevant to both of them, they don't end up accidentally sharing the same object (which can
// result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in
// the case of multiple wallets to avoid an unnecessary efficiency hit in the common case.
sendTransactionsToListener(newBlock, NewBlockType.SIDE_CHAIN, listener, txnToNotify, !first);
if (filteredTxHashList != null) {
for (Sha256Hash hash : filteredTxHashList) {
listener.notifyTransactionIsInBlock(hash, newBlock, NewBlockType.SIDE_CHAIN);
}
}
first = false;
}
informListenersForNewBlock(block, NewBlockType.SIDE_CHAIN, filteredTxHashList, filteredTxn, newBlock);
}
if (haveNewBestChain)
handleNewBestChain(storedPrev, newBlock, block, expensiveChecks);
}
}
private void informListenersForNewBlock(final Block block, final NewBlockType newBlockType,
final Set<Sha256Hash> filteredTxHashList,
final List<Transaction> filteredTxn, final StoredBlock newStoredBlock) throws VerificationException {
// Notify the listeners of the new block, so the depth and workDone of stored transactions can be updated
// (in the case of the listener being a wallet). Wallets need to know how deep each transaction is so
// coinbases aren't used before maturity.
boolean first = true;
for (final ListenerRegistration<BlockChainListener> registration : listeners) {
if (registration.executor == Threading.SAME_THREAD) {
informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn,
newStoredBlock, first, registration.listener);
if (newBlockType == NewBlockType.BEST_CHAIN)
registration.listener.notifyNewBestBlock(newStoredBlock);
} else {
// Listener wants to be run on some other thread, so marshal it across here.
final boolean notFirst = !first;
registration.executor.execute(new Runnable() {
@Override
public void run() {
try {
informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn,
newStoredBlock, notFirst, registration.listener);
if (newBlockType == NewBlockType.BEST_CHAIN)
registration.listener.notifyNewBestBlock(newStoredBlock);
} catch (VerificationException e) {
log.error("Block chain listener threw exception: ", e);
// Don't attempt to relay this back to the original peer thread if this was an async
// listener invocation.
// TODO: Make exception reporting a global feature and use it here.
}
}
});
}
first = false;
}
}
private static void informListenerForNewTransactions(Block block, NewBlockType newBlockType,
Set<Sha256Hash> filteredTxHashList,
List<Transaction> filteredTxn,
StoredBlock newStoredBlock, boolean first,
BlockChainListener listener) throws VerificationException {
if (block.transactions != null || filteredTxn != null) {
// If this is not the first wallet, ask for the transactions to be duplicated before being given
// to the wallet when relevant. This ensures that if we have two connected wallets and a tx that
// is relevant to both of them, they don't end up accidentally sharing the same object (which can
// result in temporary in-memory corruption during re-orgs). See bug 257. We only duplicate in
// the case of multiple wallets to avoid an unnecessary efficiency hit in the common case.
sendTransactionsToListener(newStoredBlock, newBlockType, listener,
block.transactions != null ? block.transactions : filteredTxn, !first);
}
if (filteredTxHashList != null) {
for (Sha256Hash hash : filteredTxHashList) {
listener.notifyTransactionIsInBlock(hash, newStoredBlock, newBlockType);
}
}
}
/**
* Gets the median timestamp of the last 11 blocks
*/
@ -530,14 +558,14 @@ public abstract class AbstractBlockChain {
// Firstly, calculate the block at which the chain diverged. We only need to examine the
// chain from beyond this block to find differences.
StoredBlock head = getChainHead();
StoredBlock splitPoint = findSplit(newChainHead, head, blockStore);
final StoredBlock splitPoint = findSplit(newChainHead, head, blockStore);
log.info("Re-organize after split at height {}", splitPoint.getHeight());
log.info("Old chain head: {}", head.getHeader().getHashAsString());
log.info("New chain head: {}", newChainHead.getHeader().getHashAsString());
log.info("Split at block: {}", splitPoint.getHeader().getHashAsString());
// Then build a list of all blocks in the old part of the chain and the new part.
LinkedList<StoredBlock> oldBlocks = getPartialChain(head, splitPoint, blockStore);
LinkedList<StoredBlock> newBlocks = getPartialChain(newChainHead, splitPoint, blockStore);
final LinkedList<StoredBlock> oldBlocks = getPartialChain(head, splitPoint, blockStore);
final LinkedList<StoredBlock> newBlocks = getPartialChain(newChainHead, splitPoint, blockStore);
// Disconnect each transaction in the previous main chain that is no longer in the new main chain
StoredBlock storedNewHead = splitPoint;
if (shouldVerifyTransactions()) {
@ -572,13 +600,22 @@ public abstract class AbstractBlockChain {
// Now inform the listeners. This is necessary so the set of currently active transactions (that we can spend)
// can be updated to take into account the re-organize. We might also have received new coins we didn't have
// before and our previous spends might have been undone.
for (int i = 0; i < listeners.size(); i++) {
BlockChainListener listener = listeners.get(i);
listener.reorganize(splitPoint, oldBlocks, newBlocks);
if (i == listeners.size()) {
break; // Listener removed itself and it was the last one.
} else if (listeners.get(i) != listener) {
i--; // Listener removed itself and it was not the last one.
for (final ListenerRegistration<BlockChainListener> registration : listeners) {
if (registration.executor == Threading.SAME_THREAD) {
// Short circuit the executor so we can propagate any exceptions.
// TODO: Do we really need to do this or should it be irrelevant?
registration.listener.reorganize(splitPoint, oldBlocks, newBlocks);
} else {
registration.executor.execute(new Runnable() {
@Override
public void run() {
try {
registration.listener.reorganize(splitPoint, oldBlocks, newBlocks);
} catch (VerificationException e) {
log.error("Block chain listener threw exception during reorg", e);
}
}
});
}
}
// Update the pointer to the best known block.
@ -812,8 +849,9 @@ public abstract class AbstractBlockChain {
// Does not need to be locked.
for (Transaction tx : block.transactions) {
try {
for (BlockChainListener listener : listeners) {
if (listener.isTransactionRelevant(tx)) return true;
for (final ListenerRegistration<BlockChainListener> registration : listeners) {
if (registration.executor != Threading.SAME_THREAD) continue;
if (registration.listener.isTransactionRelevant(tx)) return true;
}
} catch (ScriptException e) {
// We don't want scripts we don't understand to break the block chain so just note that this tx was
@ -897,7 +935,7 @@ public abstract class AbstractBlockChain {
result.set(block);
}
}
});
}, Threading.SAME_THREAD);
return result;
}
}

View File

@ -5,6 +5,7 @@ import com.google.bitcoin.params.MainNetParams;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.bitcoin.utils.Threading;
import java.io.DataOutputStream;
import java.io.FileInputStream;
@ -51,7 +52,7 @@ public class BuildCheckpoints {
checkpoints.put(height, block);
}
}
});
}, Threading.SAME_THREAD);
peerGroup.startAndWait();
peerGroup.downloadBlockChain();