1
0
Fork 0
mirror of https://github.com/ACINQ/eclair.git synced 2025-02-24 06:47:46 +01:00

Add TCP keep-alive on ZMQ socket (#1807)

One of ZMQ's drawbacks is that subscribers on an unreliable network may
silently disconnect from publishers in case of network failures.

In our case, we want to reconnect immediately when that happens, so we set
a tcp keep-alive to ensure this.

Fixes #1789
This commit is contained in:
Bastien Teinturier 2021-05-17 15:32:25 +02:00 committed by GitHub
parent 91419980bd
commit 1fbede7618
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 7 deletions

View file

@ -196,7 +196,7 @@
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.0</version>
<version>0.5.2</version>
</dependency>
<!-- SERIALIZATION -->
<dependency>

View file

@ -41,6 +41,7 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]
subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED)
subscriber.connect(address)
subscriber.subscribe(topic.getBytes(ZMQ.CHARSET))
subscriber.setTCPKeepAlive(1)
val monitor = ctx.createSocket(SocketType.PAIR)
monitor.connect("inproc://events")
@ -49,18 +50,18 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]
// we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again
@tailrec
final def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
final def checkEvent(): Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
case Some(event) =>
self ! event
checkEvent
checkEvent()
case None => ()
}
@tailrec
final def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
final def checkMsg(): Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
case Some(msg) =>
self ! msg
checkMsg
checkMsg()
case None => ()
}
@ -69,11 +70,11 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]
override def receive: Receive = {
case Symbol("checkEvent") =>
checkEvent
checkEvent()
context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkEvent"))
case Symbol("checkMsg") =>
checkMsg
checkMsg()
context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkMsg"))
case event: Event => event.getEvent match {

View file

@ -84,6 +84,22 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
}
}
test("reconnect ZMQ automatically") {
withWatcher(f => {
import f._
// When the watcher starts, it broadcasts the current height.
val block1 = listener.expectMsgType[CurrentBlockCount]
listener.expectNoMessage(100 millis)
restartBitcoind(probe)
generateBlocks(1)
val block2 = listener.expectMsgType[CurrentBlockCount]
assert(block2.blockCount === block1.blockCount + 1)
listener.expectNoMessage(100 millis)
})
}
test("add/remove watches from/to utxo map") {
val m0 = Map.empty[OutPoint, Set[Watch[_ <: WatchTriggered]]]
val txid = randomBytes32