fn: GoroutineManager.Go returns bool, not error

This was requested in https://github.com/lightningnetwork/lnd/pull/9140#discussion_r1821181787
This commit is contained in:
Boris Nagaev 2024-11-14 01:10:56 -03:00
parent 669ebf49d4
commit 891e9621d7
No known key found for this signature in database
2 changed files with 15 additions and 17 deletions

View File

@ -2,13 +2,9 @@ package fn
import ( import (
"context" "context"
"errors"
"sync" "sync"
) )
// ErrStopping is returned when trying to add a new goroutine while stopping.
var ErrStopping = errors.New("can not add goroutine, stopping")
// GoroutineManager is used to launch goroutines until context expires or the // GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop. // manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct { type GoroutineManager struct {
@ -29,8 +25,10 @@ func NewGoroutineManager(ctx context.Context) *GoroutineManager {
} }
} }
// Go starts a new goroutine if the manager is not stopping. // Go tries to start a new goroutine and returns a boolean indicating its
func (g *GoroutineManager) Go(f func(ctx context.Context)) error { // success. It fails iff the goroutine manager is stopping or its context passed
// to NewGoroutineManager has expired.
func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race // Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
// condition, since it is not clear should Wait() block or not. This // condition, since it is not clear should Wait() block or not. This
// kind of race condition is detected by Go runtime and results in a // kind of race condition is detected by Go runtime and results in a
@ -43,7 +41,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
defer g.mu.Unlock() defer g.mu.Unlock()
if g.ctx.Err() != nil { if g.ctx.Err() != nil {
return ErrStopping return false
} }
g.wg.Add(1) g.wg.Add(1)
@ -52,7 +50,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
f(g.ctx) f(g.ctx)
}() }()
return nil return true
} }
// Stop prevents new goroutines from being added and waits for all running // Stop prevents new goroutines from being added and waits for all running
@ -66,7 +64,7 @@ func (g *GoroutineManager) Stop() {
// safe, since it can't run in parallel with wg.Add(1) call in Go, since // safe, since it can't run in parallel with wg.Add(1) call in Go, since
// we just cancelled the context and even if Go call starts running here // we just cancelled the context and even if Go call starts running here
// after acquiring the mutex, it would see that the context has expired // after acquiring the mutex, it would see that the context has expired
// and return ErrStopping instead of calling wg.Add(1). // and return false instead of calling wg.Add(1).
g.wg.Wait() g.wg.Wait()
} }

View File

@ -20,7 +20,7 @@ func TestGoroutineManager(t *testing.T) {
taskChan := make(chan struct{}) taskChan := make(chan struct{})
require.NoError(t, m.Go(func(ctx context.Context) { require.True(t, m.Go(func(ctx context.Context) {
<-taskChan <-taskChan
})) }))
@ -38,7 +38,7 @@ func TestGoroutineManager(t *testing.T) {
require.Greater(t, stopDelay, time.Second) require.Greater(t, stopDelay, time.Second)
// Make sure new goroutines do not start after Stop. // Make sure new goroutines do not start after Stop.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping) require.False(t, m.Go(func(ctx context.Context) {}))
// When Stop() is called, the internal context expires and m.Done() is // When Stop() is called, the internal context expires and m.Done() is
// closed. Test this. // closed. Test this.
@ -57,7 +57,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
m := NewGoroutineManager(ctx) m := NewGoroutineManager(ctx)
require.NoError(t, m.Go(func(ctx context.Context) { require.True(t, m.Go(func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
})) }))
@ -80,7 +80,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
} }
// Make sure new goroutines do not start after context expiry. // Make sure new goroutines do not start after context expiry.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping) require.False(t, m.Go(func(ctx context.Context) {}))
// Stop will wait for all goroutines to stop. // Stop will wait for all goroutines to stop.
m.Stop() m.Stop()
@ -108,11 +108,11 @@ func TestGoroutineManagerStress(t *testing.T) {
// implementation, this test crashes under `-race`. // implementation, this test crashes under `-race`.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
taskChan := make(chan struct{}) taskChan := make(chan struct{})
err := m.Go(func(ctx context.Context) { ok := m.Go(func(ctx context.Context) {
close(taskChan) close(taskChan)
}) })
// If goroutine was started, wait for its completion. // If goroutine was started, wait for its completion.
if err == nil { if ok {
<-taskChan <-taskChan
} }
} }
@ -134,10 +134,10 @@ func TestGoroutineManagerStopsStress(t *testing.T) {
jobChan := make(chan struct{}) jobChan := make(chan struct{})
// Start a task and wait inside it until we start calling Stop() method. // Start a task and wait inside it until we start calling Stop() method.
err := m.Go(func(ctx context.Context) { ok := m.Go(func(ctx context.Context) {
<-jobChan <-jobChan
}) })
require.NoError(t, err) require.True(t, ok)
// Now launch many gorotines calling Stop() method in parallel. // Now launch many gorotines calling Stop() method in parallel.
var wg sync.WaitGroup var wg sync.WaitGroup