discovery: fix boltdb deadlock bug in ann retrans, retrans at start

This commit does two things. First we fix a deadlock bug within boltdb
that could arise when the channel router attempted to open a
transaction, while the retransmission loop was attempting to send a
message to the channel router (circular wait). Second, we’ve refactored
out the retransmission into its own function. This allows us to kick
off retransmission as soon as the AuthenticatedGossiper is created.
This commit is contained in:
Olaoluwa Osuntokun 2017-10-04 20:07:54 -07:00
parent 3b7855e449
commit 4be2e95a78
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

View File

@ -346,6 +346,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop()
// To start, we'll first check to see if there're any stale channels
// that we need to re-transmit.
if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("unable to rebroadcast stale channels: %v",
err)
}
for {
select {
// A new fee update has arrived. We'll commit it to the
@ -456,58 +463,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
// channel advertisements that have been dropped, or not properly
// propagated through the network.
case <-retransmitTimer.C:
var selfChans []lnwire.Message
// Iterate over all of our channels and check if any of
// them fall within the prune interval or re-broadcast
// interval.
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
const broadcastInterval = time.Hour * 24
timeElapsed := time.Since(edge.LastUpdate)
// If it's been a full day since we've
// re-broadcasted the channel, then we'll
// re-sign it with an updated time stamp.
if timeElapsed >= broadcastInterval {
// Re-sign and update the channel on
// disk and retrieve our ChannelUpdate
// to broadcast.
chanUpdate, err := d.updateChannel(info, edge)
if err != nil {
log.Errorf("unable to update channel: %v", err)
return err
}
selfChans = append(selfChans, chanUpdate)
}
return nil
})
if err != nil {
log.Errorf("error while retrieving outgoing "+
"channels: %v", err)
continue
}
// If we don't have any channels to re-broadcast, then
// continue.
if len(selfChans) == 0 {
continue
}
log.Debugf("Retransmitting %v outgoing channels",
len(selfChans))
// TODO(roasbeef): also send the channel ann?
// With all the wire announcements properly crafted,
// we'll broadcast our known outgoing channels to all
// our immediate peers.
if err := d.cfg.Broadcast(nil, selfChans...); err != nil {
log.Errorf("unable to re-broadcast "+
if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("unable to rebroadcast stale "+
"channels: %v", err)
}
@ -531,6 +488,74 @@ func (d *AuthenticatedGossiper) networkHandler() {
}
}
// retransmitStaleChannels eaxmines all outgoing channels that the source node
// is known to maintain to check to see if any of them are "stale". A channel
// is stale iff, the last timestamp of it's rebroadcast is older then
// broadcastInterval.
func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
// Iterate over all of our channels and check if any of them fall
// within the prune interval or re-broadcast interval.
type updateTuple struct {
info *channeldb.ChannelEdgeInfo
edge *channeldb.ChannelEdgePolicy
}
var edgesToUpdate []updateTuple
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
const broadcastInterval = time.Hour * 24
timeElapsed := time.Since(edge.LastUpdate)
// If it's been a full day since we've re-broadcasted the
// channel, add the channel to the set of edges we need to
// update.
if timeElapsed >= broadcastInterval {
edgesToUpdate = append(edgesToUpdate, updateTuple{
info: info,
edge: edge,
})
}
return nil
})
if err != nil {
return fmt.Errorf("error while retrieving outgoing "+
"channels: %v", err)
}
var signedUpdates []lnwire.Message
for _, chanToUpdate := range edgesToUpdate {
// Re-sign and update the channel on disk and retrieve our
// ChannelUpdate to broadcast.
chanUpdate, err := d.updateChannel(chanToUpdate.info,
chanToUpdate.edge)
if err != nil {
return fmt.Errorf("unable to update channel: %v", err)
}
signedUpdates = append(signedUpdates, chanUpdate)
}
// If we don't have any channels to re-broadcast, then we'll exit
// early.
if len(signedUpdates) == 0 {
return nil
}
log.Infof("Retransmitting %v outgoing channels", len(signedUpdates))
// TODO(roasbeef): also send the channel ann?
// With all the wire announcements properly crafted, we'll broadcast
// our known outgoing channels to all our immediate peers.
if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
return fmt.Errorf("unable to re-broadcast channels: %v", err)
}
return nil
}
// processFeeChanUpdate generates a new set of channel updates with the new fee
// schema applied for each specified channel identified by its channel point.
// In the case that no channel points are specified, then the fee update will