Recalculate but don't rebroadcast bloom filters when a p2pubkey output is received.

Resolves issue 513.
This commit is contained in:
Mike Hearn 2014-01-30 17:43:33 +01:00
parent 9204c13233
commit fff5af29ff
4 changed files with 99 additions and 29 deletions

View File

@ -127,28 +127,39 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
double rate = checkNotNull(chain).getFalsePositiveRate();
if (rate > bloomFilterFPRate * MAX_FP_RATE_INCREASE) {
log.info("Force update Bloom filter due to high false positive rate");
recalculateFastCatchupAndFilter(true);
recalculateFastCatchupAndFilter(FilterRecalculateMode.FORCE_SEND);
}
}
};
private int minBroadcastConnections = 0;
private Runnable recalculateRunnable = new Runnable() {
private Runnable bloomSendIfChanged = new Runnable() {
@Override public void run() {
recalculateFastCatchupAndFilter(false);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
}
};
private Runnable bloomDontSend = new Runnable() {
@Override public void run() {
recalculateFastCatchupAndFilter(FilterRecalculateMode.DONT_SEND);
}
};
private AbstractWalletEventListener walletEventListener = new AbstractWalletEventListener() {
private void queueRecalc() {
Uninterruptibles.putUninterruptibly(jobQueue, recalculateRunnable);
private void queueRecalc(boolean andTransmit) {
if (andTransmit) {
log.info("Queuing recalc of the Bloom filter due to new keys or scripts becoming available");
Uninterruptibles.putUninterruptibly(jobQueue, bloomSendIfChanged);
} else {
log.info("Queuing recalc of the Bloom filter due to observing a pay to pubkey output on a relevant tx");
Uninterruptibles.putUninterruptibly(jobQueue, bloomDontSend);
}
}
@Override public void onScriptsAdded(Wallet wallet, List<Script> scripts) {
queueRecalc();
queueRecalc(true);
}
@Override public void onKeysAdded(Wallet wallet, List<ECKey> keys) {
queueRecalc();
queueRecalc(true);
}
@Override
@ -161,21 +172,29 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// existing filter, so that it includes the tx hash in which the pay-to-pubkey output was observed. Thus
// the spending transaction will always match (due to the outpoint structure).
//
// Unfortunately, whilst this is required for correct sync of the chain in blocks, there is an edge case.
// If a wallet receives a relevant pay-to-pubkey output in a block that was not broadcast across the network
// for example, in a coinbase transaction, then the node that's serving us the chain will update its filter
// Unfortunately, whilst this is required for correct sync of the chain in blocks, there are two edge cases.
//
// (1) If a wallet receives a relevant, confirmed p2pubkey output that was not broadcast across the network,
// for example in a coinbase transaction, then the node that's serving us the chain will update its filter
// but the rest will not. If another transaction then spends it, the other nodes won't match/relay it.
//
// (2) If we receive a p2pubkey output broadcast across the network, all currently connected nodes will see
// it and update their filter themselves, but any newly connected nodes will receive the last filter we
// calculated, which would not include this transaction.
//
// For this reason we check if the transaction contained any relevant pay to pubkeys and force a recalc
// and thus retransmit if so.
boolean shouldRecalc = false;
// and possibly retransmit if so. The recalculation process will end up including the tx hash into the
// filter. In case (1), we need to retransmit the filter to the connected peers. In case (2), we don't
// and shouldn't, we should just recalculate and cache the new filter for next time.
for (TransactionOutput output : tx.getOutputs()) {
if (output.getScriptPubKey().isSentToRawPubKey() && output.isMine(wallet)) {
shouldRecalc = true;
break;
if (tx.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING)
queueRecalc(true);
else
queueRecalc(false);
return;
}
}
if (shouldRecalc) queueRecalc();
}
};
@ -722,7 +741,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would
// automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys,
// all transparently and in the background. But we are a long way from that yet.
recalculateFastCatchupAndFilter(false);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
updateVersionMessageRelayTxesBeforeFilter(getVersionMessage());
} finally {
lock.unlock();
@ -739,19 +758,25 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
wallet.setTransactionBroadcaster(null);
}
public static enum FilterRecalculateMode {
SEND_IF_CHANGED,
FORCE_SEND,
DONT_SEND,
}
/**
* Recalculates the bloom filter given to peers as well as the timestamp after which full blocks are downloaded
* (instead of only headers).
*
* @param forceFilterUpdate send the bloom filter even if it didn't change. Use
* this if the false positive rate is high due to server auto-update.
* @param mode In what situations to send the filter to connected peers.
*/
public void recalculateFastCatchupAndFilter(boolean forceFilterUpdate) {
public void recalculateFastCatchupAndFilter(FilterRecalculateMode mode) {
lock.lock();
try {
// Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions).
if (chain != null && chain.shouldVerifyTransactions())
return;
log.info("Recalculating filter in mode {}", mode);
long earliestKeyTimeSecs = Long.MAX_VALUE;
int elements = 0;
boolean requiresUpdateAll = false;
@ -772,8 +797,18 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : peerFilterProviders)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));
if (forceFilterUpdate || !filter.equals(bloomFilter)) {
bloomFilter = filter;
bloomFilter = filter;
boolean changed = !filter.equals(bloomFilter);
boolean send = false;
switch (mode) {
case SEND_IF_CHANGED: send = changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND: send = true; break;
}
if (send) {
for (Peer peer : peers)
peer.setBloomFilter(filter);
// Reset the false positive estimate so that we don't send a flood of filter updates
@ -807,7 +842,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
lock.lock();
try {
this.bloomFilterFPRate = bloomFilterFPRate;
recalculateFastCatchupAndFilter(false);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
} finally {
lock.unlock();
}

View File

@ -1,22 +1,22 @@
package com.google.bitcoin.core;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.google.common.util.concurrent.SettableFuture;
/**
* An extension of {@link PeerSocketHandler} that keeps inbound messages in a queue for later processing
*/
public abstract class InboundMessageQueuer extends PeerSocketHandler {
final BlockingQueue<Message> inboundMessages = new ArrayBlockingQueue<Message>(1000);
final Map<Long, SettableFuture<Void>> mapPingFutures = new HashMap<Long, SettableFuture<Void>>();
public Peer peer;
public BloomFilter lastReceivedFilter;
protected InboundMessageQueuer(NetworkParameters params) {
super(params, new InetSocketAddress("127.0.0.1", 2000));
@ -39,6 +39,9 @@ public abstract class InboundMessageQueuer extends PeerSocketHandler {
return;
}
}
if (m instanceof BloomFilter) {
lastReceivedFilter = (BloomFilter) m;
}
inboundMessages.offer(m);
}
}

View File

@ -22,6 +22,7 @@ import com.google.bitcoin.params.UnitTestParams;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.TestUtils;
import com.google.bitcoin.utils.Threading;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.After;
@ -507,4 +508,36 @@ public class PeerGroupTest extends TestWithPeerGroup {
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
}
@Test
public void testBloomOnP2Pubkey() throws Exception {
// Cover bug 513. When a relevant transaction with a p2pubkey output is found, the Bloom filter should be
// recalculated to include that transaction hash but not re-broadcast as the remote nodes should have followed
// the same procedure. However a new node that's connected should get the fresh filter.
peerGroup.startAndWait();
final ECKey key = wallet.getKeys().get(0);
// Create a couple of peers.
InboundMessageQueuer p1 = connectPeer(1);
InboundMessageQueuer p2 = connectPeer(2);
// Create a pay to pubkey tx.
Transaction tx = TestUtils.createFakeTx(params, Utils.COIN, key);
Transaction tx2 = new Transaction(params);
tx2.addInput(tx.getOutput(0));
TransactionOutPoint outpoint = tx2.getInput(0).getOutpoint();
assertTrue(p1.lastReceivedFilter.contains(key.getPubKey()));
assertFalse(p1.lastReceivedFilter.contains(tx.getHash().getBytes()));
inbound(p1, tx);
// p1 requests dep resolution, p2 is quiet.
assertTrue(outbound(p1) instanceof GetDataMessage);
final Sha256Hash dephash = tx.getInput(0).getOutpoint().getHash();
final InventoryItem inv = new InventoryItem(InventoryItem.Type.Transaction, dephash);
inbound(p1, new NotFoundMessage(params, ImmutableList.of(inv)));
assertNull(outbound(p1));
assertNull(outbound(p2));
peerGroup.waitForJobQueue();
// Now we connect p3 and there is a new bloom filter sent, that DOES match the relevant outpoint.
InboundMessageQueuer p3 = connectPeer(3);
assertTrue(p3.lastReceivedFilter.contains(key.getPubKey()));
assertTrue(p3.lastReceivedFilter.contains(outpoint.bitcoinSerialize()));
}
}

View File

@ -16,11 +16,10 @@
package com.google.bitcoin.core;
import com.google.bitcoin.params.UnitTestParams;
import com.google.bitcoin.net.BlockingClientManager;
import com.google.bitcoin.net.NioClientManager;
import com.google.bitcoin.params.UnitTestParams;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.utils.ExponentialBackoff;
import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
@ -50,7 +49,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
remoteVersionMessage = new VersionMessage(unitTestParams, 1);
remoteVersionMessage.localServices = VersionMessage.NODE_NETWORK;
remoteVersionMessage.clientVersion = FilteredBlock.MIN_PROTOCOL_VERSION;
remoteVersionMessage.clientVersion = NotFoundMessage.MIN_PROTOCOL_VERSION;
initPeerGroup();
}