lnd/fn/goroutine_manager_test.go
2024-12-12 09:32:57 +02:00

146 lines
3.8 KiB
Go

package fn
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestGoroutineManager tests the behaviour of the GoroutineManager.
func TestGoroutineManager(t *testing.T) {
t.Parallel()
// Here we test that the GoroutineManager starts goroutines until it has
// been stopped.
t.Run("GM is stopped", func(t *testing.T) {
t.Parallel()
var (
ctx = context.Background()
m = NewGoroutineManager()
taskChan = make(chan struct{})
)
// The gm has not stopped yet and the passed in context has not
// expired, so we expect the goroutine to start. The taskChan is
// blocking, so this goroutine will be live for a while.
require.True(t, m.Go(ctx, func(ctx context.Context) {
<-taskChan
}))
t1 := time.Now()
// Close taskChan in 1s, causing the goroutine to stop.
time.AfterFunc(time.Second, func() {
close(taskChan)
})
m.Stop()
stopDelay := time.Since(t1)
// Make sure Stop was waiting for the goroutine to stop.
require.Greater(t, stopDelay, time.Second)
// Make sure new goroutines do not start after Stop.
require.False(t, m.Go(ctx, func(ctx context.Context) {}))
// When Stop() is called, gm quit channel has been closed and so
// Done() should return.
select {
case <-m.Done():
default:
t.Errorf("Done() channel must be closed at this point")
}
})
// Test that the GoroutineManager fails to start a goroutine or exits a
// goroutine if the caller context has expired.
t.Run("Caller context expires", func(t *testing.T) {
t.Parallel()
var (
ctx = context.Background()
m = NewGoroutineManager()
taskChan = make(chan struct{})
)
// Derive a child context with a cancel function.
ctxc, cancel := context.WithCancel(ctx)
// The gm has not stopped yet and the passed in context has not
// expired, so we expect the goroutine to start.
require.True(t, m.Go(ctxc, func(ctx context.Context) {
select {
case <-ctx.Done():
case <-taskChan:
t.Fatalf("The task was performed when it " +
"should not have")
}
}))
// Give the GM a little bit of time to start the goroutine so
// that we can be sure that it is already listening on the
// ctx and taskChan before calling cancel.
time.Sleep(time.Millisecond * 500)
// Cancel the context so that the goroutine exits.
cancel()
// Attempt to send a signal on the task channel, nothing should
// happen since the goroutine has already exited.
select {
case taskChan <- struct{}{}:
case <-time.After(time.Millisecond * 200):
}
// Again attempt to add a goroutine with the same cancelled
// context. This should fail since the context has already
// expired.
require.False(t, m.Go(ctxc, func(ctx context.Context) {
t.Fatalf("The goroutine should not have started")
}))
// Stop the goroutine manager.
m.Stop()
})
// Start many goroutines while calling Stop. We do this to make sure
// that the GoroutineManager does not crash when these calls are done in
// parallel because of the potential race between wg.Add() and
// wg.Done() when the wg counter is 0.
t.Run("Stress test", func(t *testing.T) {
t.Parallel()
var (
ctx = context.Background()
m = NewGoroutineManager()
stopChan = make(chan struct{})
)
time.AfterFunc(1*time.Millisecond, func() {
m.Stop()
close(stopChan)
})
// Start 100 goroutines sequentially. Sequential order is
// needed to keep wg.counter low (0 or 1) to increase
// probability of the race condition to triggered if it exists.
// If mutex is removed in the implementation, this test crashes
// under `-race`.
for i := 0; i < 100; i++ {
taskChan := make(chan struct{})
ok := m.Go(ctx, func(ctx context.Context) {
close(taskChan)
})
// If goroutine was started, wait for its completion.
if ok {
<-taskChan
}
}
// Wait for Stop to complete.
<-stopChan
})
}