diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 621b7cd1f..af6ee4536 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -88,6 +88,15 @@ public class Peer { private boolean useFilteredBlocks = false; // The last filtered block we received, we're waiting to fill it out with transactions. private FilteredBlock currentFilteredBlock = null; + // The current Bloom filter set on the connection, used to tell the remote peer what transactions to send us. + private BloomFilter bloomFilter; + // How many filtered blocks have been received during the lifetime of this connection. Used to decide when to + // refresh the server-side side filter by sending a new one (it degrades over time as false positives are added + // on the remote side, see BIP 37 for a discussion of this). + private int filteredBlocksReceived; + // How frequently to refresh the filter. This should become dynamic in future and calculated depending on the + // actual false positive rate. For now a good value was determined empirically around January 2013. + private static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000; // Keeps track of things we requested internally with getdata but didn't receive yet, so we can avoid re-requests. // It's not quite the same as getDataFutures, as this is used only for getdatas done as part of downloading // the chain and so is lighter weight (we just keep a bunch of hashes not futures). @@ -253,7 +262,7 @@ public class Peer { lock.lock(); try { if (currentFilteredBlock != null && !(m instanceof Transaction)) { - processFilteredBlock(currentFilteredBlock); + endFilteredBlock(currentFilteredBlock); currentFilteredBlock = null; } } finally { @@ -269,16 +278,7 @@ public class Peer { } else if (m instanceof Block) { processBlock((Block) m); } else if (m instanceof FilteredBlock) { - // Filtered blocks come before the data that they refer to, so stash it here and then fill it out as - // messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another - // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after - // a getblocks, to force the non-tx message path. - lock.lock(); - try { - currentFilteredBlock = (FilteredBlock) m; - } finally { - lock.unlock(); - } + startFilteredBlock((FilteredBlock) m); } else if (m instanceof Transaction) { processTransaction((Transaction) m); } else if (m instanceof GetDataMessage) { @@ -318,9 +318,28 @@ public class Peer { } } - private void processNotFoundMessage(NotFoundMessage m) { - // This does not need to be locked. + private void startFilteredBlock(FilteredBlock m) throws IOException { + // Filtered blocks come before the data that they refer to, so stash it here and then fill it out as + // messages stream in. We'll call endFilteredBlock when a non-tx message arrives (eg, another + // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after + // a getblocks, to force the non-tx message path. + lock.lock(); + try { + currentFilteredBlock = m; + // Potentially refresh the server side filter. Because the remote node adds hits back into the filter + // to save round-tripping back through us, the filter degrades over time as false positives get added, + // triggering yet more false positives. We refresh it every so often to get the FP rate back down. + filteredBlocksReceived++; + if (filteredBlocksReceived % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) { + sendMessage(bloomFilter); + } + } finally { + lock.unlock(); + } + } + private void processNotFoundMessage(NotFoundMessage m) { + checkNotLocked(lock); // This is received when we previously did a getdata but the peer couldn't find what we requested in it's // memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached // the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are @@ -340,7 +359,7 @@ public class Peer { } private void processAlert(AlertMessage m) { - // This does not need to be locked. + checkNotLocked(lock); try { if (m.isSignatureValid()) { log.info("Received alert from peer {}: {}", toString(), m.getStatusBar()); @@ -411,7 +430,7 @@ public class Peer { } private void processGetData(GetDataMessage getdata) throws IOException { - // This does not need to be locked. + checkNotLocked(lock); log.info("{}: Received getdata message: {}", address.get(), getdata.toString()); ArrayList items = new ArrayList(); for (PeerEventListener listener : eventListeners) { @@ -446,7 +465,7 @@ public class Peer { if (currentFilteredBlock != null) { if (!currentFilteredBlock.provideTransaction(tx)) { // Got a tx that didn't fit into the filtered block, so we must have received everything. - processFilteredBlock(currentFilteredBlock); + endFilteredBlock(currentFilteredBlock); currentFilteredBlock = null; } // Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is @@ -706,7 +725,7 @@ public class Peer { } // TODO: Fix this duplication. - private void processFilteredBlock(FilteredBlock m) throws IOException { + private void endFilteredBlock(FilteredBlock m) throws IOException { if (log.isDebugEnabled()) log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString()); lock.lock(); @@ -1320,9 +1339,9 @@ public class Peer { * downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any * pending transactions that may be relevant.

* - *

The Peer does not keep a reference to the BloomFilter. Also, it will not automatically request filters - * from any wallets added using {@link Peer#addWallet(Wallet)}. This is to allow callers to avoid redundantly - * recalculating the same filter repeatedly when using multiple peers together.

+ *

The Peer does not automatically request filters from any wallets added using {@link Peer#addWallet(Wallet)}. + * This is to allow callers to avoid redundantly recalculating the same filter repeatedly when using multiple peers + * and multiple wallets together.

* *

Therefore, you should not use this method if your app uses a {@link PeerGroup}. It is called for you.

* @@ -1335,6 +1354,7 @@ public class Peer { lock.lock(); try { shouldQueryMemPool = memoryPool != null || downloadData.get(); + bloomFilter = filter; } finally { lock.unlock(); } @@ -1347,4 +1367,17 @@ public class Peer { } }); } + + /** + * Returns the last {@link BloomFilter} set by {@link Peer#setBloomFilter(BloomFilter)}. Bloom filters tell + * the remote node what transactions to send us, in a compact manner. + */ + public BloomFilter getBloomFilter() { + lock.lock(); + try { + return bloomFilter; + } finally { + lock.unlock(); + } + } } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index a1468cce0..2a91e1d27 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -163,12 +163,6 @@ public class PeerGroup extends AbstractIdleService { // We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE); private int lastBloomFilterElementCount; - - /** - * Every RESEND_BLOOM_FILTER_BLOCK_COUNT FilteredBlocks received, the bloom filter is refreshed. - * This prevents the actual false positive rate from ballooning as the filter gets elements added to it automatically. - */ - public static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000; /** * Creates a PeerGroup with the given parameters. No chain is provided so this node will report its chain height @@ -829,28 +823,6 @@ public class PeerGroup extends AbstractIdleService { setupPingingForNewPeer(peer); for (PeerEventListener listener : peerEventListeners) listener.onPeerConnected(peer, peers.size()); - // TODO: Move this into the Peer object itself. - peer.addEventListener(new AbstractPeerEventListener() { - int filteredBlocksReceivedFromPeer = 0; - - @Override - public Message onPreMessageReceived(Peer peer, Message m) { - if (m instanceof FilteredBlock) { - filteredBlocksReceivedFromPeer++; - if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) { - lock.lock(); - try { - peer.sendMessage(bloomFilter); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - lock.unlock(); - } - } - } - return m; - } - }); } finally { lock.unlock(); }