mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-15 11:59:26 +01:00
discovery: fix state transition in GossipSyncer
Previously, we would set the state of the syncer after sending the msg, which has the following flow, 1. In state `queryNewChannels`, we send the msg `QueryShortChanIDs`. 2. Once the msg is sent, we change to state `waitingQueryChanReply`. But there's no guarantee the remote won't reply back inbetween the two step. When that happens, our syncer would still be in state `queryNewChannels`, causing the following error, ``` [ERR] DISC gossiper.go:873: Process query msg from peer [Alice] got unexpected msg *lnwire.ReplyShortChanIDsEnd received in state queryNewChannels ``` To fix it, we now make sure the state is updated before sending the msg.
This commit is contained in:
parent
4d05730c79
commit
37799b95b7
2 changed files with 13 additions and 14 deletions
|
@ -571,15 +571,11 @@ func (g *GossipSyncer) channelGraphSyncer() {
|
||||||
// First, we'll attempt to continue our channel
|
// First, we'll attempt to continue our channel
|
||||||
// synchronization by continuing to send off another
|
// synchronization by continuing to send off another
|
||||||
// query chunk.
|
// query chunk.
|
||||||
done, err := g.synchronizeChanIDs()
|
done := g.synchronizeChanIDs()
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Unable to sync chan IDs: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this wasn't our last query, then we'll need to
|
// If this wasn't our last query, then we'll need to
|
||||||
// transition to our waiting state.
|
// transition to our waiting state.
|
||||||
if !done {
|
if !done {
|
||||||
g.setSyncState(waitingQueryChanReply)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -736,14 +732,15 @@ func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
|
||||||
// been queried for with a response received. We'll chunk our requests as
|
// been queried for with a response received. We'll chunk our requests as
|
||||||
// required to ensure they fit into a single message. We may re-renter this
|
// required to ensure they fit into a single message. We may re-renter this
|
||||||
// state in the case that chunking is required.
|
// state in the case that chunking is required.
|
||||||
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
|
func (g *GossipSyncer) synchronizeChanIDs() bool {
|
||||||
// If we're in this state yet there are no more new channels to query
|
// If we're in this state yet there are no more new channels to query
|
||||||
// for, then we'll transition to our final synced state and return true
|
// for, then we'll transition to our final synced state and return true
|
||||||
// to signal that we're fully synchronized.
|
// to signal that we're fully synchronized.
|
||||||
if len(g.newChansToQuery) == 0 {
|
if len(g.newChansToQuery) == 0 {
|
||||||
log.Infof("GossipSyncer(%x): no more chans to query",
|
log.Infof("GossipSyncer(%x): no more chans to query",
|
||||||
g.cfg.peerPub[:])
|
g.cfg.peerPub[:])
|
||||||
return true, nil
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, we'll issue our next chunked query to receive replies
|
// Otherwise, we'll issue our next chunked query to receive replies
|
||||||
|
@ -767,6 +764,9 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
|
||||||
log.Infof("GossipSyncer(%x): querying for %v new channels",
|
log.Infof("GossipSyncer(%x): querying for %v new channels",
|
||||||
g.cfg.peerPub[:], len(queryChunk))
|
g.cfg.peerPub[:], len(queryChunk))
|
||||||
|
|
||||||
|
// Change the state before sending the query msg.
|
||||||
|
g.setSyncState(waitingQueryChanReply)
|
||||||
|
|
||||||
// With our chunk obtained, we'll send over our next query, then return
|
// With our chunk obtained, we'll send over our next query, then return
|
||||||
// false indicating that we're net yet fully synced.
|
// false indicating that we're net yet fully synced.
|
||||||
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
|
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
|
||||||
|
@ -774,8 +774,11 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
|
||||||
EncodingType: lnwire.EncodingSortedPlain,
|
EncodingType: lnwire.EncodingSortedPlain,
|
||||||
ShortChanIDs: queryChunk,
|
ShortChanIDs: queryChunk,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to sync chan IDs: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return false, err
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
|
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
|
||||||
|
|
|
@ -1478,10 +1478,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
||||||
|
|
||||||
for i := 0; i < chunkSize*2; i += 2 {
|
for i := 0; i < chunkSize*2; i += 2 {
|
||||||
// With our set up complete, we'll request a sync of chan ID's.
|
// With our set up complete, we'll request a sync of chan ID's.
|
||||||
done, err := syncer.synchronizeChanIDs()
|
done := syncer.synchronizeChanIDs()
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unable to sync chan IDs: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, we shouldn't yet be done as only 2 items
|
// At this point, we shouldn't yet be done as only 2 items
|
||||||
// should have been queried for.
|
// should have been queried for.
|
||||||
|
@ -1528,8 +1525,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we issue another query, the syncer should tell us that it's done.
|
// If we issue another query, the syncer should tell us that it's done.
|
||||||
done, err := syncer.synchronizeChanIDs()
|
done := syncer.synchronizeChanIDs()
|
||||||
require.NoError(t, err, "unable to sync chan IDs")
|
|
||||||
if done {
|
if done {
|
||||||
t.Fatalf("syncer should be finished!")
|
t.Fatalf("syncer should be finished!")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue