Add JCIP thread annotations jar and use @GuardedBy in Peer.

Unlock Peer almost completely and make it fully thread safe with minimal use of locking.
Introduce a new convention that volatile variables have a v prefix.
This commit is contained in:
Mike Hearn 2013-03-11 14:51:15 +01:00
parent 45ce6fe9df
commit ac8a5008fe
2 changed files with 127 additions and 171 deletions

View file

@ -158,6 +158,12 @@
<artifactId>guava</artifactId>
<version>13.0</version>
</dependency>
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>

View file

@ -1,5 +1,5 @@
/**
* Copyright 2011 Google Inc.
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
import net.jcip.annotations.GuardedBy;
import org.jboss.netty.channel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,12 +34,10 @@ import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.bitcoin.utils.Locks.checkNotLocked;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
@ -55,17 +54,18 @@ public class Peer {
}
private static final Logger log = LoggerFactory.getLogger(Peer.class);
protected final ReentrantLock lock = Locks.lock("peer");
private final NetworkParameters params;
private final AbstractBlockChain blockChain;
private final AtomicReference<PeerAddress> address = new AtomicReference<PeerAddress>();
private volatile PeerAddress vAddress;
private final CopyOnWriteArrayList<PeerEventListener> eventListeners;
private final CopyOnWriteArrayList<PeerLifecycleListener> lifecycleListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
// in parallel.
private final AtomicBoolean downloadData = new AtomicBoolean();
private volatile boolean vDownloadData;
// The version data to announce to the other side of the connections we make: useful for setting our "user agent"
// equivalent and other things.
private final VersionMessage versionMessage;
@ -79,17 +79,17 @@ public class Peer {
// Each wallet added to the peer will be notified of downloaded transaction data.
private final CopyOnWriteArrayList<Wallet> wallets;
// A time before which we only download block headers, after that point we download block bodies.
private long fastCatchupTimeSecs;
@GuardedBy("lock") private long fastCatchupTimeSecs;
// Whether we are currently downloading headers only or block bodies. Starts at true. If the fast catchup time is
// set AND our best block is before that date, switch to false until block headers beyond that point have been
// received at which point it gets set to true again. This isn't relevant unless downloadData is true.
private boolean downloadBlockBodies = true;
// received at which point it gets set to true again. This isn't relevant unless vDownloadData is true.
@GuardedBy("lock") private boolean downloadBlockBodies = true;
// Whether to request filtered blocks instead of full blocks if the protocol version allows for them.
private boolean useFilteredBlocks = false;
@GuardedBy("lock") private boolean useFilteredBlocks = false;
// The current Bloom filter set on the connection, used to tell the remote peer what transactions to send us.
private volatile BloomFilter vBloomFilter;
// The last filtered block we received, we're waiting to fill it out with transactions.
private FilteredBlock currentFilteredBlock = null;
// The current Bloom filter set on the connection, used to tell the remote peer what transactions to send us.
private BloomFilter bloomFilter;
// How many filtered blocks have been received during the lifetime of this connection. Used to decide when to
// refresh the server-side side filter by sending a new one (it degrades over time as false positives are added
// on the remote side, see BIP 37 for a discussion of this).
@ -105,9 +105,9 @@ public class Peer {
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private final HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
// The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect.
private int minProtocolVersion = Pong.MIN_PROTOCOL_VERSION;
private volatile int vMinProtocolVersion = Pong.MIN_PROTOCOL_VERSION;
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
// whilst waiting for the response. Synchronized on itself. Is not used for downloads Peer generates itself.
// whilst waiting for the response. Is not used for downloads Peer generates itself.
private static class GetDataRequest {
Sha256Hash hash;
SettableFuture future;
@ -119,13 +119,13 @@ public class Peer {
private final CopyOnWriteArrayList<GetDataRequest> getDataFutures;
// Outstanding pings against this peer and how long the last one took to complete.
private final CopyOnWriteArrayList<PendingPing> pendingPings;
private final ReentrantLock lastPingTimesLock = new ReentrantLock();
private long[] lastPingTimes = null;
@GuardedBy("lastPingTimesLock") private long[] lastPingTimes = null;
private final CopyOnWriteArrayList<PendingPing> pendingPings;
private static final int PING_MOVING_AVERAGE_WINDOW = 20;
private Channel channel;
private final AtomicReference<VersionMessage> peerVersionMessage = new AtomicReference<VersionMessage>();
private volatile Channel vChannel;
private volatile VersionMessage vPeerVersionMessage;
private boolean isAcked;
private PeerHandler handler;
@ -145,7 +145,7 @@ public class Peer {
this.params = Preconditions.checkNotNull(params);
this.versionMessage = Preconditions.checkNotNull(ver);
this.blockChain = chain; // Allowed to be null.
this.downloadData.set(chain != null);
this.vDownloadData = chain != null;
this.getDataFutures = new CopyOnWriteArrayList<GetDataRequest>();
this.eventListeners = new CopyOnWriteArrayList<PeerEventListener>();
this.lifecycleListeners = new CopyOnWriteArrayList<PeerLifecycleListener>();
@ -185,25 +185,18 @@ public class Peer {
@Override
public String toString() {
lock.lock();
try {
PeerAddress addr = address.get();
if (addr == null) {
// User-provided NetworkConnection object.
return "Peer()";
} else {
return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")";
}
} finally {
lock.unlock();
PeerAddress addr = vAddress;
if (addr == null) {
// User-provided NetworkConnection object.
return "Peer()";
} else {
return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")";
}
}
private void notifyDisconnect() {
for (PeerLifecycleListener listener : lifecycleListeners) {
synchronized (listener) {
listener.onPeerDisconnected(Peer.this);
}
listener.onPeerDisconnected(Peer.this);
}
}
@ -216,8 +209,8 @@ public class Peer {
@Override
public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
address.set(new PeerAddress((InetSocketAddress)e.getValue()));
channel = e.getChannel();
vAddress = new PeerAddress((InetSocketAddress)e.getValue());
vChannel = e.getChannel();
super.connectRequested(ctx, e);
}
@ -225,7 +218,7 @@ public class Peer {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
String s;
PeerAddress addr = address.get();
PeerAddress addr = vAddress;
s = addr == null ? "?" : addr.toString();
if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) {
// Short message for network errors
@ -250,7 +243,6 @@ public class Peer {
}
private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException {
checkNotLocked(lock);
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
@ -259,14 +251,11 @@ public class Peer {
}
if (m == null) return;
lock.lock();
try {
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
} finally {
lock.unlock();
// If we are in the middle of receiving transactions as part of a filtered block push from the remote node,
// and we receive something that's not a transaction, then we're done.
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
if (m instanceof NotFoundMessage) {
@ -292,16 +281,17 @@ public class Peer {
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
peerVersionMessage.set((VersionMessage) m);
vPeerVersionMessage = (VersionMessage) m;
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
if (getPeerVersionMessage().clientVersion < minProtocolVersion) {
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
getPeerVersionMessage().clientVersion, minProtocolVersion);
vPeerVersionMessage.clientVersion, version);
e.getChannel().close();
}
} else if (m instanceof VersionAck) {
if (getPeerVersionMessage() == null) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
@ -323,23 +313,17 @@ public class Peer {
// messages stream in. We'll call endFilteredBlock when a non-tx message arrives (eg, another
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
// a getblocks, to force the non-tx message path.
lock.lock();
try {
currentFilteredBlock = m;
// Potentially refresh the server side filter. Because the remote node adds hits back into the filter
// to save round-tripping back through us, the filter degrades over time as false positives get added,
// triggering yet more false positives. We refresh it every so often to get the FP rate back down.
filteredBlocksReceived++;
if (filteredBlocksReceived % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) {
sendMessage(bloomFilter);
}
} finally {
lock.unlock();
currentFilteredBlock = m;
// Potentially refresh the server side filter. Because the remote node adds hits back into the filter
// to save round-tripping back through us, the filter degrades over time as false positives get added,
// triggering yet more false positives. We refresh it every so often to get the FP rate back down.
filteredBlocksReceived++;
if (filteredBlocksReceived % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) {
sendMessage(vBloomFilter);
}
}
private void processNotFoundMessage(NotFoundMessage m) {
checkNotLocked(lock);
// This is received when we previously did a getdata but the peer couldn't find what we requested in it's
// memory pool. Typically, because we are downloading dependencies of a relevant transaction and reached
// the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are
@ -359,7 +343,6 @@ public class Peer {
}
private void processAlert(AlertMessage m) {
checkNotLocked(lock);
try {
if (m.isSignatureValid()) {
log.info("Received alert from peer {}: {}", toString(), m.getStatusBar());
@ -386,13 +369,20 @@ public class Peer {
// likely when we've requested them as part of chain download using fast catchup. We need to add each block to
// the chain if it pre-dates the fast catchup time. If we go past it, we can stop processing the headers and
// request the full blocks from that point on instead.
boolean downloadBlockBodies;
long fastCatchupTimeSecs;
lock.lock();
fastCatchupTimeSecs = this.fastCatchupTimeSecs;
downloadBlockBodies = this.downloadBlockBodies;
lock.unlock();
try {
checkState(!downloadBlockBodies, toString());
for (int i = 0; i < m.getBlockHeaders().size(); i++) {
Block header = m.getBlockHeaders().get(i);
if (header.getTimeSeconds() < fastCatchupTimeSecs) {
if (!downloadData.get()) {
if (!vDownloadData) {
// Not download peer anymore, some other peer probably became better.
log.info("Lost download peer status, throwing away downloaded headers.");
return;
@ -409,8 +399,9 @@ public class Peer {
} else {
log.info("Passed the fast catchup time, discarding {} headers and requesting full blocks",
m.getBlockHeaders().size() - i);
downloadBlockBodies = true;
lastGetBlocksBegin = Sha256Hash.ZERO_HASH; // Prevent this request being seen as a duplicate.
this.downloadBlockBodies = true;
// Prevent this request being seen as a duplicate.
this.lastGetBlocksBegin = Sha256Hash.ZERO_HASH;
blockChainDownload(Sha256Hash.ZERO_HASH);
return;
}
@ -424,14 +415,11 @@ public class Peer {
} catch (PrunedException e) {
// Unreachable when in SPV mode.
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void processGetData(GetDataMessage getdata) throws IOException {
checkNotLocked(lock);
log.info("{}: Received getdata message: {}", address.get(), getdata.toString());
log.info("{}: Received getdata message: {}", vAddress, getdata.toString());
ArrayList<Message> items = new ArrayList<Message>();
for (PeerEventListener listener : eventListeners) {
List<Message> listenerItems = listener.getData(this, getdata);
@ -441,7 +429,7 @@ public class Peer {
if (items.size() == 0) {
return;
}
log.info("{}: Sending {} items gathered from listeners to peer", address.get(), items.size());
log.info("{}: Sending {} items gathered from listeners to peer", vAddress, items.size());
for (Message item : items) {
sendMessage(item);
}
@ -450,7 +438,7 @@ public class Peer {
private void processTransaction(Transaction tx) throws VerificationException, IOException {
lock.lock();
try {
log.debug("{}: Received tx {}", address.get(), tx.getHashAsString());
log.debug("{}: Received tx {}", vAddress, tx.getHashAsString());
if (memoryPool != null) {
// We may get back a different transaction object.
tx = memoryPool.seen(tx, getAddress());
@ -489,11 +477,11 @@ public class Peer {
Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() {
public void onSuccess(List<Transaction> dependencies) {
try {
log.info("{}: Dependency download complete!", address.get());
log.info("{}: Dependency download complete!", vAddress);
wallet.receivePending(fTx, dependencies);
} catch (VerificationException e) {
log.error("{}: Wallet failed to process pending transaction {}",
address.get(), fTx.getHashAsString());
vAddress, fTx.getHashAsString());
log.error("Error was: ", e);
// Not much more we can do at this point.
}
@ -540,7 +528,7 @@ public class Peer {
public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
log.info("{}: Downloading dependencies of {}", address.get(), tx.getHashAsString());
log.info("{}: Downloading dependencies of {}", vAddress, tx.getHashAsString());
final LinkedList<Transaction> results = new LinkedList<Transaction>();
// future will be invoked when the entire dependency tree has been walked and the results compiled.
final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results);
@ -612,7 +600,7 @@ public class Peer {
List<ListenableFuture<Object>> childFutures = Lists.newLinkedList();
for (Transaction tx : transactions) {
if (tx == null) continue;
log.info("{}: Downloaded dependency of {}: {}", new Object[]{address.get(),
log.info("{}: Downloaded dependency of {}: {}", new Object[]{vAddress,
rootTxHash, tx.getHashAsString()});
results.add(tx);
// Now recurse into the dependencies of this transaction too.
@ -671,21 +659,19 @@ public class Peer {
}
private void processBlock(Block m) throws IOException {
if (log.isDebugEnabled())
log.debug("{}: Received broadcast block {}", address.get(), m.getHashAsString());
lock.lock();
if (log.isDebugEnabled()) {
log.debug("{}: Received broadcast block {}", vAddress, m.getHashAsString());
}
// Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return;
// Did we lose download peer status after requesting block data?
if (!vDownloadData) {
log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHashAsString());
return;
}
pendingBlockDownloads.remove(m.getHash());
try {
// Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return;
if (!downloadData.get()) {
// This can happen if we lose download peer status after requesting block data.
log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHashAsString());
return;
}
pendingBlockDownloads.remove(m.getHash());
// Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain.
// This call will synchronize on blockChain.
if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress.
invokeOnBlocksDownloaded(m);
@ -715,32 +701,27 @@ public class Peer {
}
} catch (VerificationException e) {
// We don't want verification failures to kill the thread.
log.warn("{}: Block verification failed", address.get(), e);
log.warn("{}: Block verification failed", vAddress, e);
} catch (PrunedException e) {
// Unreachable when in SPV mode.
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
// TODO: Fix this duplication.
private void endFilteredBlock(FilteredBlock m) throws IOException {
if (log.isDebugEnabled())
log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString());
lock.lock();
if (log.isDebugEnabled()) {
log.debug("{}: Received broadcast filtered block {}", vAddress, m.getHash().toString());
}
if (!vDownloadData) {
log.debug("{}: Received block we did not ask for: {}", vAddress, m.getHash().toString());
return;
}
// Note that we currently do nothing about peers which do not include transactions which
// actually match our filter or which do not send us all the transactions (TODO: Do something about that).
pendingBlockDownloads.remove(m.getBlockHeader().getHash());
try {
if (!downloadData.get()) {
log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHash().toString());
return;
}
// Note that we currently do nothing about peers which do not include transactions which
// actually match our filter or which do not send us all the transactions (TODO: Do something about that).
pendingBlockDownloads.remove(m.getBlockHeader().getHash());
// Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain.
// This call will synchronize on blockChain.
if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress.
invokeOnBlocksDownloaded(m.getBlockHeader());
@ -764,19 +745,16 @@ public class Peer {
}
} catch (VerificationException e) {
// We don't want verification failures to kill the thread.
log.warn("{}: FilteredBlock verification failed", address.get(), e);
log.warn("{}: FilteredBlock verification failed", vAddress, e);
} catch (PrunedException e) {
// We pruned away some of the data we need to properly handle this block. We need to request the needed
// data from the remote peer and fix things. Or just give up.
// TODO: Request e.getHash() and submit it to the block store before any other blocks
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private boolean maybeHandleRequestedData(Message m) {
checkState(lock.isLocked());
boolean found = false;
Sha256Hash hash = m.getHash();
for (ListIterator<GetDataRequest> it = getDataFutures.listIterator(); it.hasNext();) {
@ -792,22 +770,15 @@ public class Peer {
}
private void invokeOnBlocksDownloaded(final Block m) {
checkState(lock.isLocked());
// It is possible for the peer block height difference to be negative when blocks have been solved and broadcast
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
// with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it.
final int blocksLeft = Math.max(0, (int)getPeerVersionMessage().bestHeight - blockChain.getBestChainHeight());
lock.unlock();
try {
for (PeerEventListener listener : eventListeners)
listener.onBlocksDownloaded(Peer.this, m, blocksLeft);
} finally {
lock.lock();
}
final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - blockChain.getBestChainHeight());
for (PeerEventListener listener : eventListeners)
listener.onBlocksDownloaded(Peer.this, m, blocksLeft);
}
private void processInv(InventoryMessage inv) throws IOException {
checkNotLocked(lock);
List<InventoryItem> items = inv.getItems();
// Separate out the blocks and transactions, we'll handle them differently
@ -827,7 +798,7 @@ public class Peer {
}
}
final boolean downloadData = this.downloadData.get();
final boolean downloadData = this.vDownloadData;
if (transactions.size() == 0 && blocks.size() == 1) {
// Single block announcement. If we're downloading the chain this is just a tickle to make us continue
@ -864,11 +835,10 @@ public class Peer {
// Some other peer already announced this so don't download.
it.remove();
} else {
log.debug("{}: getdata on tx {}", address.get(), item.hash);
log.debug("{}: getdata on tx {}", vAddress, item.hash);
getdata.addItem(item);
}
// This can trigger transaction confidence listeners.
checkNotLocked(lock);
memoryPool.seen(item.hash, this.getAddress());
}
}
@ -902,7 +872,7 @@ public class Peer {
// the duplicate check in blockChainDownload(). But the satoshi client may change in future so
// it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) {
if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) {
if (vPeerVersionMessage.isBloomFilteringSupported() && useFilteredBlocks) {
getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash));
pingAfterGetData = true;
} else {
@ -1006,13 +976,11 @@ public class Peer {
* independently, otherwise the wallet will receive duplicate notifications.
*/
public void addWallet(Wallet wallet) {
// This does not need to be locked.
wallets.add(wallet);
}
/** Unlinks the given wallet from peer. See {@link Peer#addWallet(Wallet)}. */
public void removeWallet(Wallet wallet) {
// This does not need to be locked.
wallets.remove(wallet);
}
@ -1021,7 +989,7 @@ public class Peer {
*/
public ChannelFuture sendMessage(Message m) throws IOException {
// This does not need to be locked.
return Channels.write(channel, m);
return Channels.write(vChannel, m);
}
// Keep track of the last request we made to the peer in blockChainDownload so we can avoid redundant and harmful
@ -1190,9 +1158,9 @@ public class Peer {
}
protected ListenableFuture<Long> ping(long nonce) throws IOException, ProtocolException {
// This does not need to be locked.
if (!getPeerVersionMessage().isPingPongSupported())
throw new ProtocolException("Peer version is too low for measurable pings: " + getPeerVersionMessage());
final VersionMessage ver = vPeerVersionMessage;
if (!ver.isPingPongSupported())
throw new ProtocolException("Peer version is too low for measurable pings: " + ver);
PendingPing pendingPing = new PendingPing(nonce);
pendingPings.add(pendingPing);
sendMessage(new Ping(pendingPing.nonce));
@ -1254,22 +1222,17 @@ public class Peer {
* behind the peer, or negative if the peer is ahead of us.
*/
public int getPeerBlockHeightDifference() {
lock.lock();
try {
// Chain will overflow signed int blocks in ~41,000 years.
int chainHeight = (int) getBestHeight();
// chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another
// client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or
// there is a bug in the peer management code.
checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight);
return chainHeight - blockChain.getBestChainHeight();
} finally {
lock.unlock();
}
// Chain will overflow signed int blocks in ~41,000 years.
int chainHeight = (int) getBestHeight();
// chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another
// client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or
// there is a bug in the peer management code.
checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight);
return chainHeight - blockChain.getBestChainHeight();
}
private boolean isNotFoundMessageSupported() {
return getPeerVersionMessage().clientVersion >= 70001;
return vPeerVersionMessage.clientVersion >= 70001;
}
/**
@ -1277,7 +1240,7 @@ public class Peer {
* one peer to be downloading data. Defaults to true.
*/
public boolean getDownloadData() {
return downloadData.get();
return vDownloadData;
}
/**
@ -1286,19 +1249,19 @@ public class Peer {
* a request to the remote peer for the contents of its memory pool, if Bloom filtering is active.
*/
public void setDownloadData(boolean downloadData) {
this.downloadData.set(downloadData);
this.vDownloadData = downloadData;
}
/**
* @return the IP address and port of peer.
*/
public PeerAddress getAddress() {
return address.get();
return vAddress;
}
/** Returns version data announced by the remote peer. */
public VersionMessage getPeerVersionMessage() {
return peerVersionMessage.get();
return vPeerVersionMessage;
}
/** Returns version data we announce to our remote peers. */
@ -1310,7 +1273,7 @@ public class Peer {
* @return the height of the best chain as claimed by peer: sum of its ver announcement and blocks announced since.
*/
public long getBestHeight() {
return getPeerVersionMessage().bestHeight + blocksAnnounced.get();
return vPeerVersionMessage.bestHeight + blocksAnnounced.get();
}
/**
@ -1319,15 +1282,10 @@ public class Peer {
* @return if not-null then this is the future for the Peer disconnection event.
*/
public ChannelFuture setMinProtocolVersion(int minProtocolVersion) {
lock.lock();
try {
this.minProtocolVersion = minProtocolVersion;
} finally {
lock.unlock();
}
this.vMinProtocolVersion = minProtocolVersion;
if (getVersionMessage().clientVersion < minProtocolVersion) {
log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion);
return Channels.close(channel);
return Channels.close(vChannel);
} else {
return null;
}
@ -1336,7 +1294,7 @@ public class Peer {
/**
* <p>Sets a Bloom filter on this connection. This will cause the given {@link BloomFilter} object to be sent to the
* remote peer and if either a memory pool has been set using the constructor or the
* downloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any
* vDownloadData property is true, a {@link MemoryPoolMessage} is sent as well to trigger downloading of any
* pending transactions that may be relevant.</p>
*
* <p>The Peer does not automatically request filters from any wallets added using {@link Peer#addWallet(Wallet)}.
@ -1345,19 +1303,16 @@ public class Peer {
*
* <p>Therefore, you should not use this method if your app uses a {@link PeerGroup}. It is called for you.</p>
*
* <p>If the remote peer doesn't support Bloom filtering, then this call is ignored.</p>
* <p>If the remote peer doesn't support Bloom filtering, then this call is ignored. Once set you presently cannot
* unset a filter, though the underlying p2p protocol does support it.</p>
*/
public void setBloomFilter(BloomFilter filter) throws IOException {
if (!getPeerVersionMessage().isBloomFilteringSupported())
checkNotNull(filter, "Clearing filters is not currently supported");
final VersionMessage ver = vPeerVersionMessage;
if (ver == null || !ver.isBloomFilteringSupported())
return;
boolean shouldQueryMemPool;
lock.lock();
try {
shouldQueryMemPool = memoryPool != null || downloadData.get();
bloomFilter = filter;
} finally {
lock.unlock();
}
vBloomFilter = filter;
boolean shouldQueryMemPool = memoryPool != null || vDownloadData;
log.info("{}: Sending Bloom filter{}", this, shouldQueryMemPool ? " and querying mempool" : "");
ChannelFuture future = sendMessage(filter);
if (shouldQueryMemPool)
@ -1373,11 +1328,6 @@ public class Peer {
* the remote node what transactions to send us, in a compact manner.
*/
public BloomFilter getBloomFilter() {
lock.lock();
try {
return bloomFilter;
} finally {
lock.unlock();
}
return vBloomFilter;
}
}