Merge pull request #6054 from yyforyongyu/itest-fix-gossiper

discovery: resend premature messages when block height reached
Olaoluwa Osuntokun 2022-01-04 17:52:18 -08:00 committed by GitHub
commit ebc1547abc
10 changed files with 303 additions and 78 deletions
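At a high level, gossip messages that advertise a block height beyond the gossiper's best known height were previously rejected outright; with this change they are cached, keyed by the height at which they mature, and replayed into the message pipeline as new blocks arrive. A minimal, self-contained sketch of that pattern, using illustrative names (futureStore, add, replay) rather than lnd's actual API:

```go
package main

import "fmt"

// futureStore is a hypothetical stand-in for the gossiper's futureMsgs
// cache: premature messages grouped by the block height at which they
// become valid.
type futureStore struct {
	byHeight map[uint32][]string
}

// add caches a message whose height is above the current chain tip.
func (s *futureStore) add(height uint32, msg string) {
	s.byHeight[height] = append(s.byHeight[height], msg)
}

// replay returns (and forgets) all messages that mature at this height.
func (s *futureStore) replay(height uint32) []string {
	msgs := s.byHeight[height]
	delete(s.byHeight, height)
	return msgs
}

func main() {
	s := &futureStore{byHeight: make(map[uint32][]string)}
	bestHeight := uint32(100)

	// A message advertising height 102 is premature at tip 100, so it
	// is cached instead of being dropped.
	if msgHeight := uint32(102); msgHeight > bestHeight {
		s.add(msgHeight, "channel announcement for block 102")
	}

	// Each new block bumps the tip and re-processes whatever matured.
	for h := bestHeight + 1; h <= 102; h++ {
		for _, m := range s.replay(h) {
			fmt.Printf("height %d: re-processing %q\n", h, m)
		}
	}
}
```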

View file

@@ -614,8 +614,8 @@ func (n *TxNotifier) RegisterConf(txid *chainhash.Hash, pkScript []byte,
if err == nil {
if hint > startHeight {
Log.Debugf("Using height hint %d retrieved from cache "+
"for %v instead of %d", hint, ntfn.ConfRequest,
startHeight)
"for %v instead of %d for conf subscription",
hint, ntfn.ConfRequest, startHeight)
startHeight = hint
}
} else if err != ErrConfirmHintNotFound {
@@ -1009,8 +1009,8 @@ func (n *TxNotifier) RegisterSpend(outpoint *wire.OutPoint, pkScript []byte,
if err == nil {
if hint > startHeight {
Log.Debugf("Using height hint %d retrieved from cache "+
"for %v instead of %d", hint, ntfn.SpendRequest,
startHeight)
"for %v instead of %d for spend subscription",
hint, ntfn.SpendRequest, startHeight)
startHeight = hint
}
} else if err != ErrSpendHintNotFound {
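For context, the two hunks above only reword debug logs; the mechanism they describe is that a cached height hint, when higher than the caller's start height, lets the notifier skip rescanning blocks it has already examined. A sketch of just that comparison, with pickStartHeight as a hypothetical helper, not part of the TxNotifier API:

```go
package notifier

// pickStartHeight returns the height a rescan should start from: the
// cached hint wins only when it is strictly higher than the requested
// start height, mirroring the `hint > startHeight` check above.
func pickStartHeight(startHeight, hint uint32, haveHint bool) uint32 {
	if haveHint && hint > startHeight {
		return hint
	}
	return startHeight
}
```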

View file

@@ -309,6 +309,14 @@ func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
return k
}
// sourceToPub returns a serialized-compressed public key for use in the reject
// cache.
func sourceToPub(pk *btcec.PublicKey) [33]byte {
var pub [33]byte
copy(pub[:], pk.SerializeCompressed())
return pub
}
// cachedReject is the empty value stored for rejects; only the key matters.
type cachedReject struct {
}
@@ -360,6 +368,13 @@ type AuthenticatedGossiper struct {
// networkHandler.
networkMsgs chan *networkMsg
// futureMsgs is a list of premature network messages that have a block
// height specified in the future. We will save them and resend them to
// the networkMsgs channel once the block height has been reached. The
// cached map format is,
// {blockHeight: [msg1, msg2, ...], ...}
futureMsgs *lru.Cache
// chanPolicyUpdates is a channel that requests to update the
// forwarding policy of a set of channels is sent over.
chanPolicyUpdates chan *chanPolicyUpdateRequest
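The diff doesn't show the container stored in futureMsgs. Assuming it follows lnd's usual lru.Cache convention, where each cached value reports a Size so the cache can enforce its maxPrematureUpdates bound, a sketch of such a wrapper might look like this (the Size signature mirrors neutrino's cache.Value interface; treat the details as assumptions):

```go
package discovery

// networkMsg is elided here; in the gossiper it carries the wire
// message, its source, and an error channel.
type networkMsg struct{}

// cachedNetworkMsg groups every premature message for one block height
// into a single cache entry.
type cachedNetworkMsg struct {
	msgs []*networkMsg
}

// Size reports the number of queued messages rather than their byte
// size, so the LRU bound counts pending messages, not height buckets.
// (Assumed signature: neutrino's cache.Value interface.)
func (c *cachedNetworkMsg) Size() (uint64, error) {
	return uint64(len(c.msgs)), nil
}
```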
@@ -415,6 +430,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
selfKeyLoc: selfKeyDesc.KeyLocator,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
futureMsgs: lru.NewCache(maxPrematureUpdates),
quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
@@ -514,12 +530,82 @@ func (d *AuthenticatedGossiper) start() error {
d.syncMgr.Start()
d.wg.Add(1)
// Start receiving blocks in its dedicated goroutine.
d.wg.Add(2)
go d.syncBlockHeight()
go d.networkHandler()
return nil
}
// syncBlockHeight syncs the best block height for the gossiper by reading
// blockEpochs.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) syncBlockHeight() {
defer d.wg.Done()
for {
select {
// A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
return
}
// Once a new block arrives, we update our running
// track of the height of the chain tip.
d.Lock()
blockHeight := uint32(newBlock.Height)
d.bestHeight = blockHeight
d.Unlock()
log.Debugf("New block: height=%d, hash=%s", blockHeight,
newBlock.Hash)
// Resend future messages, if any.
d.resendFutureMessages(blockHeight)
case <-d.quit:
return
}
}
}
// resendFutureMessages takes a block height, resends all the future messages
found at that height and deletes those messages from the gossiper's
// futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
result, err := d.futureMsgs.Get(height)
// Return early if no messages found.
if err == cache.ErrElementNotFound {
return
}
// Otherwise, if the error is non-nil, we will log it and exit.
if err != nil {
log.Errorf("Reading future messages got error: %v", err)
return
}
msgs := result.(*cachedNetworkMsg).msgs
log.Debugf("Resending %d network messages at height %d",
len(msgs), height)
for _, msg := range msgs {
select {
case d.networkMsgs <- msg:
case <-d.quit:
msg.err <- ErrGossiperShuttingDown
}
}
}
// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
d.stopped.Do(func() {
@@ -757,6 +843,8 @@ func (d *deDupedAnnouncements) reset() {
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
// Depending on the message type (channel announcement, channel update,
// or node announcement), the message is added to the corresponding map
// in deDupedAnnouncements. Because each identifying key can have at
@@ -806,6 +894,10 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// If we already had this message with a strictly newer
// timestamp, then we'll just discard the message we got.
if oldTimestamp > msg.Timestamp {
log.Debugf("Ignored outdated network message: "+
"peer=%v, source=%x, msg=%s, ", message.peer,
message.source.SerializeCompressed(),
msg.MsgType())
return
}
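The rule these lines enforce, keeping only the newest update per identifying key, can be sketched on its own; note an incoming update is dropped only when a strictly newer one is already held (generic names, not lnd's types):

```go
package discovery

// keepNewest records ts for key unless a strictly newer timestamp is
// already stored; it reports whether the incoming update was kept.
func keepNewest(latest map[string]uint32, key string, ts uint32) bool {
	if old, ok := latest[key]; ok && old > ts {
		// We already hold a strictly newer update; drop this one.
		return false
	}
	latest[key] = ts
	return true
}
```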
@@ -1025,6 +1117,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
// sub-systems below us, then craft, sign, and broadcast a new
// ChannelUpdate for the set of affected clients.
case policyUpdate := <-d.chanPolicyUpdates:
log.Tracef("Received channel %d policy update requests",
len(policyUpdate.edgesToUpdate))
// First, we'll now create new fully signed updates for
// the affected channels and also update the underlying
// graph with the new state.
@@ -1044,6 +1139,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcements.AddMsgs(newChanUpdates...)
case announcement := <-d.networkMsgs:
log.Tracef("Received network message: "+
"peer=%v, source=%x, msg=%s, is_remote=%v",
announcement.peer,
announcement.source.SerializeCompressed(),
announcement.msg.MsgType(),
announcement.isRemote)
// We should only broadcast this message forward if it
// originated from us or it wasn't received as part of
// our initial historical sync.
@@ -1057,6 +1159,11 @@ func (d *AuthenticatedGossiper) networkHandler() {
emittedAnnouncements, _ := d.processNetworkAnnouncement(
announcement,
)
log.Debugf("Processed network message %s, "+
"returned len(announcements)=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements))
if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
@@ -1068,7 +1175,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
// If this message was recently rejected, then we won't
// attempt to re-process it.
if announcement.isRemote && d.isRecentlyRejectedMsg(
announcement.msg, announcement.peer.PubKey(),
announcement.msg,
sourceToPub(announcement.source),
) {
announcement.err <- fmt.Errorf("recently " +
"rejected")
@@ -1092,8 +1200,11 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcement.msg,
)
if err != nil {
if err != routing.ErrVBarrierShuttingDown &&
err != routing.ErrParentValidationFailed {
if !routing.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
) {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
@@ -1111,6 +1222,13 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcement,
)
log.Tracef("Processed network message %s, "+
"returned len(announcements)=%v, "+
"allowDependents=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements),
allowDependents)
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(
@@ -1135,25 +1253,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
}()
// A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
return
}
// Once a new block arrives, we update our running
// track of the height of the chain tip.
d.Lock()
blockHeight := uint32(newBlock.Height)
d.bestHeight = blockHeight
d.Unlock()
log.Debugf("New block: height=%d, hash=%s", blockHeight,
newBlock.Hash)
// The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements
// we've received since the last trickle tick.
@@ -1579,6 +1678,64 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
return d.cfg.Router.AddNode(node, op...)
}
// isPremature decides whether a given network message has a block height+delta
// value specified in the future. If so, the message will be added to the
// future message map and processed once the block height has been reached.
//
// NOTE: must be used inside a lock.
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
delta uint32, msg *networkMsg) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
msgHeight := chanID.BlockHeight + delta
// The message height is smaller than or equal to our best known height,
// thus the message is mature.
if msgHeight <= d.bestHeight {
return false
}
// Add the premature message to our future messages which will
// be resent once the block height has been reached.
//
// Init an empty cached message and overwrite it if there are cached
// messages found.
cachedMsgs := &cachedNetworkMsg{
msgs: make([]*networkMsg, 0),
}
result, err := d.futureMsgs.Get(msgHeight)
// No error returned means we have old messages cached.
if err == nil {
cachedMsgs = result.(*cachedNetworkMsg)
}
// Copy the networkMsgs since the old message's err chan will
// be consumed.
copied := &networkMsg{
peer: msg.peer,
source: msg.source,
msg: msg.msg,
optionalMsgFields: msg.optionalMsgFields,
isRemote: msg.isRemote,
err: make(chan error, 1),
}
// Add the network message.
cachedMsgs.msgs = append(cachedMsgs.msgs, copied)
_, err = d.futureMsgs.Put(msgHeight, cachedMsgs)
if err != nil {
log.Errorf("Adding future message got error: %v", err)
}
log.Debugf("Network message: %v added to future messages for "+
"msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
msgHeight, d.bestHeight)
return true
}
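The copy with a fresh buffered err channel matters because, as the comment above notes, the original message's err chan is consumed when the first caller is answered; the cached copy needs its own channel for the reply produced when it is re-processed after replay. A toy illustration with hypothetical names (msg, requeue):

```go
package main

import "fmt"

// msg mimics networkMsg: a payload plus a channel for the reply.
type msg struct {
	payload string
	errChan chan error
}

// requeue copies a message with a fresh one-slot reply channel, since
// the original channel is consumed when the first caller is answered.
func requeue(m *msg) *msg {
	return &msg{payload: m.payload, errChan: make(chan error, 1)}
}

func main() {
	orig := &msg{payload: "chan_update", errChan: make(chan error, 1)}
	cached := requeue(orig)

	orig.errChan <- nil   // the first sender is answered immediately
	cached.errChan <- nil // the replayed copy is answered much later

	fmt.Println(<-orig.errChan, <-cached.errChan)
}
```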
// processNetworkAnnouncement processes a new network-related authenticated
// channel or node announcement, or announcement proof. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
@@ -1589,11 +1746,9 @@ func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) ([]networkMsg, bool) {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
return chanID.BlockHeight+delta > d.bestHeight
}
log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+
"is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(),
nMsg.msg.MsgType(), nMsg.isRemote)
// If this is a remote update, we set the scheduler option to lazily
// add it to the graph.
@@ -1616,16 +1771,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// newer update for this node so we can skip validating
// signatures if not required.
if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) {
log.Debugf("Skipped processing stale node: %x",
msg.NodeID)
nMsg.err <- nil
return nil, true
}
if err := d.addNode(msg, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debugf("Adding node: %x got error: %v",
msg.NodeID, err)
log.Debug(err)
} else if err != routing.ErrVBarrierShuttingDown {
if !routing.IsError(
err,
routing.ErrOutdated,
routing.ErrIgnored,
routing.ErrVBarrierShuttingDown,
) {
log.Error(err)
}
@@ -1676,7 +1837,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -1687,8 +1848,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll ignore it for now.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Announcement for chan_id=(%v), is "+
if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) {
log.Warnf("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only "+
"height %v is known",
msg.ShortChannelID.ToUint64(),
@@ -1719,7 +1880,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -1782,8 +1943,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// making decisions based on this DB state, before it
// writes to the DB.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil {
err := d.cfg.Router.AddEdge(edge, schedulerOp...)
if err != nil {
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
// If the edge was rejected due to already being known,
// then it may be the case that this new message has a
// fresh channel proof, so we'll check.
@@ -1795,7 +1958,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -1817,12 +1980,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
log.Debugf("Router rejected channel "+
"edge: %v", err)
} else {
log.Tracef("Router rejected channel "+
log.Debugf("Router rejected channel "+
"edge: %v", err)
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
}
@@ -1831,6 +1994,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil, false
}
// If err is nil, release the lock immediately.
d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
@@ -1909,7 +2075,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -1924,8 +2090,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
d.Lock()
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+
if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) {
log.Warnf("Update announcement for "+
"short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known",
shortChanID, blockHeight,
@@ -1943,6 +2109,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
if d.cfg.Router.IsStaleEdgePolicy(
msg.ShortChannelID, timestamp, msg.ChannelFlags,
) {
log.Debugf("Ignored stale edge policy: peer=%v, "+
"source=%x, msg=%s, is_remote=%v", nMsg.peer,
nMsg.source.SerializeCompressed(),
nMsg.msg.MsgType(), nMsg.isRemote)
nMsg.err <- nil
return nil, true
}
@@ -2035,7 +2207,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -2137,14 +2309,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
if routing.IsError(
err, routing.ErrOutdated,
routing.ErrIgnored,
routing.ErrVBarrierShuttingDown,
) {
log.Debug(err)
} else if err != routing.ErrVBarrierShuttingDown {
} else {
key := newRejectCacheKey(
msg.ShortChannelID.ToUint64(),
nMsg.peer.PubKey(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -2166,6 +2341,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
chanInfo, msg.ChannelFlags,
)
log.Debugf("The message %v has no AuthProof, sending "+
"the update to remote peer %x",
msg.MsgType(), remotePubKey)
// Now, we'll attempt to send the channel update message
// reliably to the remote peer in the background, so
// that we don't block if the peer happens to be offline
@@ -2217,8 +2396,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// registered in the bitcoin blockchain. Therefore, we check if the
// proof is premature.
d.Lock()
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
log.Infof("Premature proof announcement, current "+
premature := d.isPremature(
msg.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
)
if premature {
log.Warnf("Premature proof announcement, current "+
"block height lower than needed: %v < %v",
d.bestHeight, needBlockHeight)
d.Unlock()

View file

@@ -131,10 +131,10 @@ spawnHandler:
// spawnPeerHandler spawns a peerHandler for the given peer if there isn't
// one already active. The boolean returned signals whether there was already
// one active or not.
func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) {
s.activePeersMtx.Lock()
defer s.activePeersMtx.Unlock()
func (s *reliableSender) spawnPeerHandler(
peerPubKey [33]byte) (peerManager, bool) {
s.activePeersMtx.Lock()
msgHandler, ok := s.activePeers[peerPubKey]
if !ok {
msgHandler = peerManager{
@@ -142,7 +142,12 @@ func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, boo
done: make(chan struct{}),
}
s.activePeers[peerPubKey] = msgHandler
}
s.activePeersMtx.Unlock()
// If this is a newly initiated peerManager, we will create a
// peerHandler.
if !ok {
s.wg.Add(1)
go s.peerHandler(msgHandler, peerPubKey)
}
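The restructuring above narrows the mutex to the map access and moves the goroutine spawn outside the critical section, so handler startup never blocks other callers. The same shape in isolation, with illustrative names rather than the reliableSender API:

```go
package sender

import "sync"

// handlerRegistry tracks one long-lived handler per peer.
type handlerRegistry struct {
	mu     sync.Mutex
	active map[string]chan struct{}
}

// ensure creates the per-peer state under the lock but starts the
// handler goroutine only after releasing it. It reports whether a
// handler already existed.
func (r *handlerRegistry) ensure(peer string, run func(done chan struct{})) bool {
	r.mu.Lock()
	done, ok := r.active[peer]
	if !ok {
		done = make(chan struct{})
		r.active[peer] = done
	}
	r.mu.Unlock()

	// Spawn outside the critical section, exactly once per peer.
	if !ok {
		go run(done)
	}
	return ok
}
```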

View file

@@ -398,6 +398,13 @@ func (m *SyncManager) syncerHandler() {
continue
}
// We may not even have enough inactive syncers to be
// transitioned. In that case, we will transition all the
// inactive syncers.
if len(m.inactiveSyncers) < numActiveLeft {
numActiveLeft = len(m.inactiveSyncers)
}
log.Debugf("Attempting to transition %v passive "+
"GossipSyncers to active", numActiveLeft)
@@ -492,6 +499,10 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
// handle any sync transitions.
s.setSyncState(chansSynced)
s.setSyncType(PassiveSync)
log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
s.syncState(), s.SyncType(), peer)
return s
}
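The added guard is a simple clamp, shown here on its own as a hypothetical helper:

```go
package discovery

// clampTransitions bounds the number of syncers we try to activate by
// how many passive syncers actually exist.
func clampTransitions(numActiveLeft, numInactive int) int {
	if numInactive < numActiveLeft {
		return numInactive
	}
	return numActiveLeft
}
```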

View file

@@ -18,6 +18,11 @@
because the cancel signal is processed before the creation. It is now properly
handled by moving creation before deletion.
* When the block height+delta specified by a network message is greater than
the gossiper's best height, it is considered premature. Previously such
messages were ignored; [they are now saved into a cache and processed once the
height has been reached.](https://github.com/lightningnetwork/lnd/pull/6054)
## Misc
* [An example systemd service file](https://github.com/lightningnetwork/lnd/pull/6033)

View file

@@ -553,6 +553,19 @@ const (
addedToRouterGraph
)
func (c channelOpeningState) String() string {
switch c {
case markedOpen:
return "markedOpen"
case fundingLockedSent:
return "fundingLocked"
case addedToRouterGraph:
return "addedToRouterGraph"
default:
return "unknown"
}
}
// NewFundingManager creates and initializes a new instance of the
// fundingManager.
func NewFundingManager(cfg Config) (*Manager, error) {
@@ -2773,7 +2786,7 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
}
log.Debugf("Channel with ChannelPoint(%v), short_chan_id=%v "+
"announced", &fundingPoint, shortChanID)
"sent to gossiper", &fundingPoint, shortChanID)
}
return nil

View file

@@ -28,6 +28,15 @@ const (
// ErrInvalidFundingOutput is returned if the channel funding output
// fails validation.
ErrInvalidFundingOutput
// ErrVBarrierShuttingDown signals that the barrier has been requested
// to shutdown, and that the caller should not treat the wait condition
// as fulfilled.
ErrVBarrierShuttingDown
// ErrParentValidationFailed signals that the validation of a
// dependent's parent failed, so the dependent must not be processed.
ErrParentValidationFailed
)
// routerError is a structure that represents an error inside the routing package,
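These codes are matched via routing.IsError and constructed via newErrf, as the router and validation-barrier hunks below show. A simplified, hedged sketch of that coded-error pattern; lnd's actual routerError type differs in its details:

```go
package main

import "fmt"

type errorCode uint8

const (
	ErrVBarrierShuttingDown errorCode = iota
	ErrParentValidationFailed
)

// codedError pairs a human-readable message with a matchable code.
type codedError struct {
	code errorCode
	msg  string
}

func (e *codedError) Error() string { return e.msg }

// newErrf wraps a formatted message with an error code.
func newErrf(code errorCode, format string, a ...interface{}) *codedError {
	return &codedError{code: code, msg: fmt.Sprintf(format, a...)}
}

// IsError reports whether err carries any of the given codes, so callers
// match on the code rather than on a sentinel error value.
func IsError(err error, codes ...errorCode) bool {
	ce, ok := err.(*codedError)
	if !ok {
		return false
	}
	for _, c := range codes {
		if ce.code == c {
			return true
		}
	}
	return false
}

func main() {
	err := newErrf(ErrVBarrierShuttingDown, "validation barrier shutting down")
	fmt.Println(IsError(err, ErrVBarrierShuttingDown, ErrParentValidationFailed)) // true
}
```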

View file

@@ -1062,13 +1062,19 @@ func (r *ChannelRouter) networkHandler() {
update.msg,
)
if err != nil {
switch err {
case ErrVBarrierShuttingDown:
switch {
case IsError(
err, ErrVBarrierShuttingDown,
):
update.err <- err
case ErrParentValidationFailed:
case IsError(
err, ErrParentValidationFailed,
):
update.err <- newErrf(
ErrIgnored, err.Error(),
)
default:
log.Warnf("unexpected error "+
"during validation "+

View file

@@ -1,7 +1,6 @@
package routing
import (
"errors"
"sync"
"github.com/lightningnetwork/lnd/channeldb"
@@ -9,17 +8,6 @@ import (
"github.com/lightningnetwork/lnd/routing/route"
)
var (
// ErrVBarrierShuttingDown signals that the barrier has been requested
// to shutdown, and that the caller should not treat the wait condition
// as fulfilled.
ErrVBarrierShuttingDown = errors.New("validation barrier shutting down")
// ErrParentValidationFailed signals that the validation of a
// dependent's parent failed, so the dependent must not be processed.
ErrParentValidationFailed = errors.New("parent validation failed")
)
// validationSignals contains two signals which allow the ValidationBarrier to
// communicate back to the caller whether a dependent should be processed or not
// based on whether its parent was successfully validated. Only one of these
@@ -228,9 +216,11 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
if ok {
select {
case <-v.quit:
return ErrVBarrierShuttingDown
return newErrf(ErrVBarrierShuttingDown,
"validation barrier shutting down")
case <-signals.deny:
return ErrParentValidationFailed
return newErrf(ErrParentValidationFailed,
"parent validation failed")
case <-signals.allow:
return nil
}

View file

@@ -141,14 +141,18 @@ func TestValidationBarrierQuit(t *testing.T) {
switch {
// First half should return without failure.
case i < numTasks/4 && err != routing.ErrParentValidationFailed:
case i < numTasks/4 && !routing.IsError(
err, routing.ErrParentValidationFailed,
):
t.Fatalf("unexpected failure while waiting: %v", err)
case i >= numTasks/4 && i < numTasks/2 && err != nil:
t.Fatalf("unexpected failure while waiting: %v", err)
// Last half should return the shutdown error.
case i >= numTasks/2 && err != routing.ErrVBarrierShuttingDown:
case i >= numTasks/2 && !routing.IsError(
err, routing.ErrVBarrierShuttingDown,
):
t.Fatalf("expected failure after quitting: want %v, "+
"got %v", routing.ErrVBarrierShuttingDown, err)
}