mirror of
https://github.com/ACINQ/eclair.git
synced 2024-11-20 02:27:32 +01:00
router: consistency fixes
- use the same signature for main and mainWithLogs - remove useless fields from Router.State - start in 'uninitialized' mode and wait for State message (either empty or retrieved from db)
This commit is contained in:
parent
b4af63b728
commit
14da69a612
@ -46,7 +46,19 @@ class Router(watcher: ActorRef, db: SimpleDb) extends Actor with ActorLogging {
|
||||
|
||||
context.system.scheduler.schedule(10 seconds, 60 seconds, self, 'tick_broadcast)
|
||||
|
||||
def receive: Receive = main(State.empty)
|
||||
def saveState(nodes: Map[BinaryData, NodeAnnouncement], channels: Map[Long, ChannelAnnouncement], updates: Map[ChannelDesc, ChannelUpdate], rebroadcast: Seq[RoutingMessage]): Unit = {
|
||||
routerDb.put("router.state", State(nodes, channels, updates, rebroadcast).fixme)
|
||||
}
|
||||
|
||||
def receive: Receive = {
|
||||
case state: State =>
|
||||
import state._
|
||||
state.nodes.values.map(n => self ! n)
|
||||
state.channels.values.map(c => self ! c)
|
||||
context become main(nodes, channels, updates, rebroadcast, Set(), Seq())
|
||||
|
||||
case other => log.warning(s"unhandled message $other, router has not been initialized yet")
|
||||
}
|
||||
|
||||
def mainWithLog(nodes: Map[BinaryData, NodeAnnouncement],
|
||||
channels: Map[Long, ChannelAnnouncement],
|
||||
@ -55,19 +67,17 @@ class Router(watcher: ActorRef, db: SimpleDb) extends Actor with ActorLogging {
|
||||
awaiting: Set[ChannelAnnouncement],
|
||||
stash: Seq[RoutingMessage]) = {
|
||||
log.info(s"current status channels=${channels.size} nodes=${nodes.size} updates=${updates.size}")
|
||||
val state = State(nodes, channels, updates, rebroadcast, awaiting, stash)
|
||||
routerDb.put("router.state", state.fixme)
|
||||
main(state)
|
||||
main(nodes, channels, updates, rebroadcast, awaiting, stash)
|
||||
}
|
||||
|
||||
def main(state: State): Receive = {
|
||||
case newState: State =>
|
||||
newState.nodes.values.map(n => self ! n)
|
||||
newState.channels.values.map(c => self ! c)
|
||||
context become main(newState)
|
||||
def main(nodes: Map[BinaryData, NodeAnnouncement],
|
||||
channels: Map[Long, ChannelAnnouncement],
|
||||
updates: Map[ChannelDesc, ChannelUpdate],
|
||||
rebroadcast: Seq[RoutingMessage],
|
||||
awaiting: Set[ChannelAnnouncement],
|
||||
stash: Seq[RoutingMessage]): Receive = {
|
||||
|
||||
case SendRoutingState(remote) =>
|
||||
import state._
|
||||
log.info(s"info sending all announcements to $remote: channels=${channels.size} nodes=${nodes.size} updates=${updates.size}")
|
||||
channels.values.foreach(remote ! _)
|
||||
updates.values.foreach(remote ! _)
|
||||
@ -78,24 +88,20 @@ class Router(watcher: ActorRef, db: SimpleDb) extends Actor with ActorLogging {
|
||||
log.error(s"bad signature for announcement $c")
|
||||
sender ! Error(0, "bad announcement sig!!!".getBytes())
|
||||
|
||||
case c: ChannelAnnouncement if state.channels.containsKey(c.channelId) =>
|
||||
case c: ChannelAnnouncement if channels.containsKey(c.channelId) =>
|
||||
log.debug(s"ignoring $c (duplicate)")
|
||||
|
||||
case c: ChannelAnnouncement =>
|
||||
import state._
|
||||
val (blockHeight, txIndex, outputIndex) = fromShortId(c.channelId)
|
||||
log.info(s"retrieving raw tx with blockHeight=$blockHeight and txIndex=$txIndex")
|
||||
watcher ! GetTx(blockHeight, txIndex, outputIndex, c)
|
||||
val state1 = state.copy(awaiting = awaiting + c)
|
||||
routerDb.put("router.state", state1.fixme)
|
||||
context become main(state1)
|
||||
context become main(nodes, channels, updates, rebroadcast, awaiting + c, stash)
|
||||
|
||||
|
||||
case GetTxResponse(tx, isSpendable, c: ChannelAnnouncement) if !isSpendable =>
|
||||
log.debug(s"ignoring $c (funding tx spent)")
|
||||
|
||||
case GetTxResponse(tx, _, c: ChannelAnnouncement) =>
|
||||
import state._
|
||||
// TODO: check sigs
|
||||
// TODO: blacklist if already received same channel id and different node ids
|
||||
val (_, _, outputIndex) = fromShortId(c.channelId)
|
||||
@ -119,8 +125,7 @@ class Router(watcher: ActorRef, db: SimpleDb) extends Actor with ActorLogging {
|
||||
log.error(s"bad signature for announcement $n")
|
||||
sender ! Error(0, "bad announcement sig!!!".getBytes())
|
||||
|
||||
case WatchEventSpent(BITCOIN_FUNDING_OTHER_CHANNEL_SPENT(channelId), tx) if state.channels.containsKey(channelId) =>
|
||||
import state._
|
||||
case WatchEventSpent(BITCOIN_FUNDING_OTHER_CHANNEL_SPENT(channelId), tx) if channels.containsKey(channelId) =>
|
||||
val lostChannel = channels(channelId)
|
||||
log.info(s"funding tx of channelId=$channelId has been spent by txid=${tx.txid}")
|
||||
log.info(s"removed channel channelId=$channelId")
|
||||
@ -138,70 +143,66 @@ class Router(watcher: ActorRef, db: SimpleDb) extends Actor with ActorLogging {
|
||||
val lostNodes = isNodeLost(lostChannel.nodeId1).toSeq ++ isNodeLost(lostChannel.nodeId2).toSeq
|
||||
context become mainWithLog(nodes -- lostNodes, channels - channelId, updates.filterKeys(_.id != channelId), rebroadcast, awaiting, stash)
|
||||
|
||||
case n: NodeAnnouncement if state.awaiting.size > 0 =>
|
||||
val state1 = state.copy(stash = state.stash :+ n)
|
||||
routerDb.put("router.state", state1.fixme)
|
||||
context become main(state1)
|
||||
case n: NodeAnnouncement if awaiting.size > 0 =>
|
||||
context become main(nodes, channels, updates, rebroadcast, awaiting, stash :+ n)
|
||||
|
||||
case n: NodeAnnouncement if !state.channels.values.exists(c => c.nodeId1 == n.nodeId || c.nodeId2 == n.nodeId) =>
|
||||
case n: NodeAnnouncement if !channels.values.exists(c => c.nodeId1 == n.nodeId || c.nodeId2 == n.nodeId) =>
|
||||
log.debug(s"ignoring $n (no related channel found)")
|
||||
|
||||
case n: NodeAnnouncement if state.nodes.containsKey(n.nodeId) && state.nodes(n.nodeId).timestamp >= n.timestamp =>
|
||||
case n: NodeAnnouncement if nodes.containsKey(n.nodeId) && nodes(n.nodeId).timestamp >= n.timestamp =>
|
||||
log.debug(s"ignoring announcement $n (old timestamp or duplicate)")
|
||||
|
||||
case n: NodeAnnouncement if state.nodes.containsKey(n.nodeId) =>
|
||||
import state._
|
||||
case n: NodeAnnouncement if nodes.containsKey(n.nodeId) =>
|
||||
log.info(s"updated node nodeId=${n.nodeId}")
|
||||
context.system.eventStream.publish(NodeUpdated(n))
|
||||
saveState(nodes + (n.nodeId -> n), channels, updates, rebroadcast :+ n)
|
||||
context become mainWithLog(nodes + (n.nodeId -> n), channels, updates, rebroadcast :+ n, awaiting, stash)
|
||||
|
||||
case n: NodeAnnouncement =>
|
||||
import state._
|
||||
log.info(s"added node nodeId=${n.nodeId}")
|
||||
context.system.eventStream.publish(NodeDiscovered(n))
|
||||
saveState(nodes + (n.nodeId -> n), channels, updates, rebroadcast :+ n)
|
||||
context become mainWithLog(nodes + (n.nodeId -> n), channels, updates, rebroadcast :+ n, awaiting, stash)
|
||||
|
||||
case u: ChannelUpdate if state.awaiting.size > 0 =>
|
||||
context become main(state.copy(stash = state.stash :+ u))
|
||||
case u: ChannelUpdate if awaiting.size > 0 =>
|
||||
context become main(nodes, channels, updates, rebroadcast, awaiting, stash :+ u)
|
||||
|
||||
case u: ChannelUpdate if !state.channels.contains(u.channelId) =>
|
||||
case u: ChannelUpdate if !channels.contains(u.channelId) =>
|
||||
log.debug(s"ignoring $u (no related channel found)")
|
||||
|
||||
case u: ChannelUpdate if !Announcements.checkSig(u, getDesc(u, state.channels(u.channelId)).a) =>
|
||||
case u: ChannelUpdate if !Announcements.checkSig(u, getDesc(u, channels(u.channelId)).a) =>
|
||||
// TODO: (dirty) this will make the origin channel close the connection
|
||||
log.error(s"bad signature for announcement $u")
|
||||
sender ! Error(0, "bad announcement sig!!!".getBytes())
|
||||
|
||||
case u: ChannelUpdate =>
|
||||
import state._
|
||||
val channel = channels(u.channelId)
|
||||
val desc = getDesc(u, channel)
|
||||
if (updates.contains(desc) && updates(desc).timestamp >= u.timestamp) {
|
||||
log.debug(s"ignoring $u (old timestamp or duplicate)")
|
||||
} else {
|
||||
saveState(nodes, channels, updates + (desc -> u), rebroadcast :+ u)
|
||||
context become mainWithLog(nodes, channels, updates + (desc -> u), rebroadcast :+ u, awaiting, stash)
|
||||
}
|
||||
|
||||
case 'tick_broadcast if state.rebroadcast.size == 0 =>
|
||||
case 'tick_broadcast if rebroadcast.size == 0 =>
|
||||
// no-op
|
||||
|
||||
case 'tick_broadcast =>
|
||||
import state._
|
||||
log.info(s"broadcasting ${rebroadcast.size} routing messages")
|
||||
rebroadcast.foreach(context.actorSelection(Register.actorPathToPeers) ! _)
|
||||
val state1 = State(nodes, channels, updates, Nil, awaiting, stash)
|
||||
routerDb.put("router.state", state1.fixme)
|
||||
context become main(state1)
|
||||
saveState(nodes, channels, updates, Nil)
|
||||
context become main(nodes, channels, updates, Nil, awaiting, stash)
|
||||
|
||||
case 'nodes => sender ! state.nodes.values
|
||||
case 'nodes => sender ! nodes.values
|
||||
|
||||
case 'channels => sender ! state.channels.values
|
||||
case 'channels => sender ! channels.values
|
||||
|
||||
case 'updates => sender ! state.updates.values
|
||||
case 'updates => sender ! updates.values
|
||||
|
||||
case 'dot => graph2dot(state.nodes, state.channels) pipeTo sender
|
||||
case 'dot => graph2dot(nodes, channels) pipeTo sender
|
||||
|
||||
case RouteRequest(start, end) => findRoute(start, end, state.updates).map(RouteResponse(_)) pipeTo sender
|
||||
case RouteRequest(start, end) => findRoute(start, end, updates).map(RouteResponse(_)) pipeTo sender
|
||||
|
||||
case other => log.warning(s"unhandled message $other")
|
||||
}
|
||||
@ -290,16 +291,14 @@ object Router {
|
||||
case class State(nodes: Map[BinaryData, NodeAnnouncement],
|
||||
channels: Map[Long, ChannelAnnouncement],
|
||||
updates: Map[ChannelDesc, ChannelUpdate],
|
||||
rebroadcast: Seq[RoutingMessage],
|
||||
awaiting: Set[ChannelAnnouncement],
|
||||
stash: Seq[RoutingMessage]) {
|
||||
rebroadcast: Seq[RoutingMessage]) {
|
||||
// see http://stackoverflow.com/questions/32900862/map-can-not-be-serializable-in-scala :(
|
||||
// we alse remove transient fields (awaiting and stash)
|
||||
def fixme = this.copy(nodes = nodes.map(identity), channels = channels.map(identity), updates = updates.map(identity), awaiting = Set(), stash = Seq())
|
||||
def fixme = this.copy(nodes = nodes.map(identity), channels = channels.map(identity), updates = updates.map(identity))
|
||||
}
|
||||
|
||||
object State {
|
||||
val empty = State(nodes = Map(), channels = Map(), updates = Map(), rebroadcast = Nil, awaiting = Set(), stash = Nil)
|
||||
val empty = State(nodes = Map(), channels = Map(), updates = Map(), rebroadcast = Nil)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ abstract class BaseRouterSpec extends TestkitBaseClass {
|
||||
// first we set up the router
|
||||
val watcher = TestProbe()
|
||||
val router = system.actorOf(Router.props(watcher.ref, new DummyDb))
|
||||
router ! Router.State.empty
|
||||
// we announce channels
|
||||
router ! chan_ab
|
||||
router ! chan_bc
|
||||
|
Loading…
Reference in New Issue
Block a user