Fix data race in BitcoindDaemonTest

Prevent intermittent test failures, caused by a race between checking
whether the mock socket is closed upon accepting a new connection and
setting 'socketClosed' to true during shutdown. Waiting to accept and
then checking the flag needs to be done in a synchronized block.
This commit is contained in:
Steven Barclay 2021-01-02 18:57:35 +00:00
parent 93b46e3a40
commit 796097abbc
No known key found for this signature in database
GPG Key ID: 9FED6BF1176D500B

View File

@ -24,6 +24,7 @@ import java.net.SocketException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -37,8 +38,6 @@ import org.junit.Test;
import static org.mockito.Mockito.*;
// FIXME: There appears to be a data race in some of these tests, causing intermittent Mockito.verify failures.
// We should remove the two Thread.sleep lines which were added below to try to prevent them.
public class BitcoindDaemonTest {
private BitcoindDaemon daemon;
private int acceptAnotherCount;
@ -52,13 +51,12 @@ public class BitcoindDaemonTest {
public void setUp() throws Exception {
var serverSocket = mock(ServerSocket.class);
when(serverSocket.accept()).then(invocation -> {
waitToAccept();
when(serverSocket.accept()).then(invocation -> waitToAccept(() -> {
if (socketClosed) {
throw new SocketException();
}
return socket;
});
}));
doAnswer((VoidAnswer) invocation -> {
socketClosed = true;
acceptAnother(1);
@ -79,42 +77,35 @@ public class BitcoindDaemonTest {
public void testNoBlocksMissedDuringFloodOfIncomingBlocks() throws Exception {
var latch = new CountDownLatch(1); // to block all the daemon worker threads until shutdown, as if stuck
synchronized (this) {
doAnswer((VoidAnswer) invocation -> latch.await()).when(blockListener).blockDetected(any());
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
doAnswer((VoidAnswer) invocation -> latch.await()).when(blockListener).blockDetected(any());
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
acceptAnother(50);
waitUntilAllAccepted();
}
acceptAnother(50);
waitUntilAllAccepted();
// Unblock all the daemon worker threads and shut down.
latch.countDown();
daemon.shutdown();
Thread.sleep(100);
verify(blockListener, times(50)).blockDetected("foo");
}
@Test
public void testBlockHashIsTrimmed() throws Exception {
synchronized (this) {
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("\r\nbar \n".getBytes()));
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("\r\nbar \n".getBytes()));
acceptAnother(1);
waitUntilAllAccepted();
}
acceptAnother(1);
waitUntilAllAccepted();
daemon.shutdown();
Thread.sleep(100);
verify(blockListener).blockDetected("bar");
}
@Test
public void testBrokenSocketRead() throws Exception {
synchronized (this) {
when(socket.getInputStream()).thenThrow(IOException.class);
acceptAnother(1);
}
when(socket.getInputStream()).thenThrow(IOException.class);
acceptAnother(1);
errorHandlerLatch.await(5, TimeUnit.SECONDS);
verify(errorHandler).accept(argThat(t -> t instanceof NotificationHandlerException &&
@ -123,13 +114,12 @@ public class BitcoindDaemonTest {
@Test
public void testRuntimeExceptionInBlockListener() throws Exception {
synchronized (this) {
daemon.setBlockListener(blockHash -> {
throw new IndexOutOfBoundsException();
});
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
acceptAnother(1);
}
daemon.setBlockListener(blockHash -> {
throw new IndexOutOfBoundsException();
});
when(socket.getInputStream()).then(invocation -> new ByteArrayInputStream("foo".getBytes()));
acceptAnother(1);
errorHandlerLatch.await(5, TimeUnit.SECONDS);
verify(errorHandler).accept(argThat(t -> t instanceof NotificationHandlerException &&
@ -161,12 +151,14 @@ public class BitcoindDaemonTest {
notifyAll();
}
private synchronized void waitToAccept() throws InterruptedException {
private synchronized <V> V waitToAccept(Callable<V> onAccept) throws Exception {
while (acceptAnotherCount == 0) {
wait();
}
var result = onAccept.call();
acceptAnotherCount--;
notifyAll();
return result;
}
private synchronized void waitUntilAllAccepted() throws InterruptedException {