Experimental tweak to the definition of the numBroadcastPeers confidence field: now it actually does what the name implies.

This commit is contained in:
Mike Hearn 2013-11-17 23:13:07 +01:00
parent e636ee2927
commit e49255c9e0
3 changed files with 40 additions and 29 deletions

View file

@ -20,6 +20,7 @@ import com.google.bitcoin.utils.Threading;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
@ -49,7 +50,8 @@ public class MemoryPool {
// For each transaction we may have seen:
// - only its hash in an inv packet
// - the full transaction itself, if we asked for it to be sent to us (or a peer sent it regardless)
// - the full transaction itself, if we asked for it to be sent to us (or a peer sent it regardless), or if we
// sent it.
//
// Before we see the full transaction, we need to track how many peers advertised it, so we can estimate its
// confidence pre-chain inclusion assuming an un-tampered with network connection. After we see the full transaction
@ -162,36 +164,27 @@ public class MemoryPool {
}
/**
* Called by peers when they receive a "tx" message containing a valid serialized transaction.
* @param tx The TX deserialized from the wire.
* @param byPeer The Peer that received it.
* @return An object that is semantically the same TX but may be a different object instance.
* Puts the tx into the table and returns either it, or a different Transaction object that has the same hash.
* Unlike seen and the other methods, this one does not imply that a tx has been announced by a peer and does
* not mark it as such.
*/
public Transaction seen(Transaction tx, PeerAddress byPeer) {
public Transaction intern(Transaction tx) {
lock.lock();
try {
cleanPool();
Entry entry = memoryPool.get(tx.getHash());
if (entry != null) {
// This TX or its hash have been previously announced.
// This TX or its hash have been previously interned.
if (entry.tx != null) {
// We already downloaded it.
// We already interned it (but may have thrown it away).
checkState(entry.addresses == null);
// We only want one canonical object instance for a transaction no matter how many times it is
// deserialized.
Transaction transaction = entry.tx.get();
if (transaction == null) {
// We previously downloaded this transaction, but the garbage collector threw it away because
// no other part of the system cared enough to keep it around (it's not relevant to us).
// Given the lack of interest last time we probably don't need to track it this time either.
log.info("{}: Provided with a transaction that we previously threw away: {}", byPeer, tx.getHash());
} else {
if (transaction != null) {
// We saw it before and kept it around. Hand back the canonical copy.
tx = transaction;
log.info("{}: Provided with a transaction downloaded before: [{}] {}",
byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHash());
}
markBroadcast(byPeer, tx);
return tx;
} else {
// We received a transaction that we have previously seen announced but not downloaded until now.
@ -200,8 +193,8 @@ public class MemoryPool {
Set<PeerAddress> addrs = entry.addresses;
entry.addresses = null;
TransactionConfidence confidence = tx.getConfidence();
log.debug("{}: Adding tx [{}] {} to the memory pool",
byPeer, confidence.numBroadcastPeers(), tx.getHashAsString());
log.debug("Adding tx [{}] {} to the memory pool",
confidence.numBroadcastPeers(), tx.getHashAsString());
for (PeerAddress a : addrs) {
markBroadcast(a, tx);
}
@ -210,12 +203,10 @@ public class MemoryPool {
} else {
// This often happens when we are downloading a Bloom filtered chain, or recursively downloading
// dependencies of a relevant transaction (see Peer.downloadDependencies).
log.debug("{}: Provided with a downloaded transaction we didn't see announced yet: {}",
byPeer, tx.getHashAsString());
log.debug("Provided with a downloaded transaction we didn't see announced yet: {}", tx.getHashAsString());
entry = new Entry();
entry.tx = new WeakTransactionReference(tx, referenceQueue);
memoryPool.put(tx.getHash(), entry);
markBroadcast(byPeer, tx);
return tx;
}
} finally {
@ -223,6 +214,23 @@ public class MemoryPool {
}
}
/**
* Called by peers when they receive a "tx" message containing a valid serialized transaction.
* @param tx The TX deserialized from the wire.
* @param byPeer The Peer that received it.
* @return An object that is semantically the same TX but may be a different object instance.
*/
public Transaction seen(Transaction tx, PeerAddress byPeer) {
lock.lock();
try {
final Transaction interned = intern(tx);
markBroadcast(byPeer, interned);
return interned;
} finally {
lock.unlock();
}
}
/**
* Called by peers when they see a transaction advertised in an "inv" message. It either will increase the
* confidence of the pre-existing transaction or will just keep a record of the address for future usage.
@ -242,8 +250,8 @@ public class MemoryPool {
log.debug("{}: Peer announced transaction we have seen before [{}] {}",
byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString());
} else {
// The inv is telling us about a transaction that we previously downloaded, and threw away because
// nothing found it interesting enough to keep around. So do nothing.
// The inv is telling us about a transaction that we previously downloaded, and threw away
// because nothing found it interesting enough to keep around. So do nothing.
}
} else {
checkNotNull(entry.addresses);
@ -277,6 +285,7 @@ public class MemoryPool {
* we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else
* holding a reference to it.
*/
@Nullable
public Transaction get(Sha256Hash hash) {
lock.lock();
try {

View file

@ -82,7 +82,8 @@ public class TransactionBroadcast {
// be seen, 4 peers is probably too little - it doesn't taken many broken peers for tx propagation to have
// a big effect.
List<Peer> peers = peerGroup.getConnectedPeers(); // snapshots
pinnedTx = peerGroup.getMemoryPool().seen(tx, peers.get(0).getAddress());
// We intern the tx here so we are using a canonical version of the object (as it's unfortunately mutable).
pinnedTx = peerGroup.getMemoryPool().intern(tx);
// Prepare to send the transaction by adding a listener that'll be called when confidence changes.
// Only bother with this if we might actually hear back:
if (minConnections > 1)
@ -105,7 +106,8 @@ public class TransactionBroadcast {
for (Peer peer : peers) {
try {
peer.sendMessage(pinnedTx);
peerGroup.getMemoryPool().seen(pinnedTx, peer.getAddress());
// We don't record the peer as having seen the tx in the memory pool because we want to track only
// how many peers announced to us.
} catch (Exception e) {
log.error("Caught exception sending to {}", peer, e);
}
@ -128,7 +130,7 @@ public class TransactionBroadcast {
boolean mined = tx.getAppearsInHashes() != null;
log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, pinnedTx.getHashAsString(),
numSeenPeers, mined ? " and mined" : "");
if (numSeenPeers >= numWaitingFor + numToBroadcastTo || mined) {
if (numSeenPeers >= numWaitingFor || mined) {
// We've seen the min required number of peers announce the transaction, or it was included
// in a block. Normally we'd expect to see it fully propagate before it gets mined, but
// it can be that a block is solved very soon after broadcast, and it's also possible that

View file

@ -98,7 +98,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
Threading.waitForUserCode();
assertFalse(sendResult.broadcastComplete.isDone());
assertEquals(transactions[0], sendResult.tx);
assertEquals(transactions[0].getConfidence().numBroadcastPeers(), 1);
assertEquals(transactions[0].getConfidence().numBroadcastPeers(), 0);
transactions[0] = null;
Transaction t1 = (Transaction) outbound(p1);
assertNotNull(t1);
@ -111,7 +111,7 @@ public class TransactionBroadcastTest extends TestWithPeerGroup {
Threading.waitForUserCode();
assertTrue(sendResult.broadcastComplete.isDone());
assertEquals(transactions[0], sendResult.tx);
assertEquals(2, transactions[0].getConfidence().numBroadcastPeers());
assertEquals(1, transactions[0].getConfidence().numBroadcastPeers());
// Confirm it.
Block b2 = TestUtils.createFakeBlock(blockStore, t1).block;
inbound(p1, b2);