zmq test: fix flakiness by using more robust sync method

After connecting the subscriber sockets to the node, there is no
guarantee that the node's zmq publisher interfaces are ready yet, which
means that potentially the first expected notification messages could
get lost and the test fails. Currently this is handled by just waiting
for a short period of time (200ms), which works most of the time but is
still problematic, as in some rare cases the setup time takes much
longer, even in the range of multiple seconds.

The solution in this commit approaches the problem by using a more
robust method of syncing up, originally proposed by instagibbs:
    1. Generate a block on the node
    2. Try to receive a notification on all subscribers
    3. If all subscribers get a message within the timeout (1 second),
       we are done, otherwise repeat starting from step 1
This commit is contained in:
Sebastian Falbesoner 2021-01-26 00:30:24 +01:00
parent 8666033630
commit 5c6546362d

View File

@ -87,23 +87,45 @@ class ZMQTest (BitcoinTestFramework):
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
if connect_nodes:
self.connect_nodes(0, 1)
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Relax so that the subscribers are ready before publishing zmq messages
sleep(0.2)
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive a notification on all subscribers
# 3. If all subscribers get a message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
self.nodes[0].generate(1)
recv_failed = False
for sub in subscribers:
try:
sub.receive()
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break
# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()
return subscribers
@ -113,9 +135,7 @@ class ZMQTest (BitcoinTestFramework):
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = 'tcp://127.0.0.1:28332'
subs = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
connect_nodes=True)
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
hashblock = subs[0]
hashtx = subs[1]
@ -192,6 +212,7 @@ class ZMQTest (BitcoinTestFramework):
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@ -240,6 +261,7 @@ class ZMQTest (BitcoinTestFramework):
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
@ -390,7 +412,7 @@ class ZMQTest (BitcoinTestFramework):
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@ -490,10 +512,13 @@ class ZMQTest (BitcoinTestFramework):
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
])
], sync_blocks=False)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)