lnd/pool/worker_test.go
2019-02-21 20:10:40 -08:00

354 lines
8.3 KiB
Go

package pool_test
import (
"bytes"
crand "crypto/rand"
"fmt"
"io"
"math/rand"
"testing"
"time"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/pool"
)
type workerPoolTest struct {
name string
newPool func() interface{}
numWorkers int
}
// TestConcreteWorkerPools asserts the behavior of any concrete implementations
// of worker pools provided by the pool package. Currently this tests the
// pool.Read and pool.Write instances.
func TestConcreteWorkerPools(t *testing.T) {
const (
gcInterval = time.Second
expiryInterval = 250 * time.Millisecond
numWorkers = 5
workerTimeout = 500 * time.Millisecond
)
tests := []workerPoolTest{
{
name: "write pool",
newPool: func() interface{} {
bp := pool.NewWriteBuffer(
gcInterval, expiryInterval,
)
return pool.NewWrite(
bp, numWorkers, workerTimeout,
)
},
numWorkers: numWorkers,
},
{
name: "read pool",
newPool: func() interface{} {
bp := pool.NewReadBuffer(
gcInterval, expiryInterval,
)
return pool.NewRead(
bp, numWorkers, workerTimeout,
)
},
numWorkers: numWorkers,
},
}
for _, test := range tests {
testWorkerPool(t, test)
}
}
func testWorkerPool(t *testing.T, test workerPoolTest) {
t.Run(test.name+" non blocking", func(t *testing.T) {
t.Parallel()
p := test.newPool()
startGeneric(t, p)
defer stopGeneric(t, p)
submitNonblockingGeneric(t, p, test.numWorkers)
})
t.Run(test.name+" blocking", func(t *testing.T) {
t.Parallel()
p := test.newPool()
startGeneric(t, p)
defer stopGeneric(t, p)
submitBlockingGeneric(t, p, test.numWorkers)
})
t.Run(test.name+" partial blocking", func(t *testing.T) {
t.Parallel()
p := test.newPool()
startGeneric(t, p)
defer stopGeneric(t, p)
submitPartialBlockingGeneric(t, p, test.numWorkers)
})
}
// submitNonblockingGeneric asserts that queueing tasks to the worker pool and
// allowing them all to unblock simultaneously results in all of the tasks being
// completed in a timely manner.
func submitNonblockingGeneric(t *testing.T, p interface{}, nWorkers int) {
// We'll submit 2*nWorkers tasks that will all be unblocked
// simultaneously.
nUnblocked := 2 * nWorkers
// First we'll queue all of the tasks for the pool.
errChan := make(chan error)
semChan := make(chan struct{})
for i := 0; i < nUnblocked; i++ {
go func() { errChan <- submitGeneric(p, semChan) }()
}
// Since we haven't signaled the semaphore, none of the them should
// complete.
pullNothing(t, errChan)
// Now, unblock them all simultaneously. All of the tasks should then be
// processed in parallel. Afterward, no more errors should come through.
close(semChan)
pullParllel(t, nUnblocked, errChan)
pullNothing(t, errChan)
}
// submitBlockingGeneric asserts that submitting blocking tasks to the pool and
// unblocking each sequentially results in a single task being processed at a
// time.
func submitBlockingGeneric(t *testing.T, p interface{}, nWorkers int) {
// We'll submit 2*nWorkers tasks that will be unblocked sequentially.
nBlocked := 2 * nWorkers
// First, queue all of the blocking tasks for the pool.
errChan := make(chan error)
semChan := make(chan struct{})
for i := 0; i < nBlocked; i++ {
go func() { errChan <- submitGeneric(p, semChan) }()
}
// Since we haven't signaled the semaphore, none of them should
// complete.
pullNothing(t, errChan)
// Now, pull each blocking task sequentially from the pool. Afterwards,
// no more errors should come through.
pullSequntial(t, nBlocked, errChan, semChan)
pullNothing(t, errChan)
}
// submitPartialBlockingGeneric tests that so long as one worker is not blocked,
// any other non-blocking submitted tasks can still be processed.
func submitPartialBlockingGeneric(t *testing.T, p interface{}, nWorkers int) {
// We'll submit nWorkers-1 tasks that will be initially blocked, the
// remainder will all be unblocked simultaneously. After the unblocked
// tasks have finished, we will sequentially unblock the nWorkers-1
// tasks that were first submitted.
nBlocked := nWorkers - 1
nUnblocked := 2*nWorkers - nBlocked
// First, submit all of the blocking tasks to the pool.
errChan := make(chan error)
semChan := make(chan struct{})
for i := 0; i < nBlocked; i++ {
go func() { errChan <- submitGeneric(p, semChan) }()
}
// Since these are all blocked, no errors should be returned yet.
pullNothing(t, errChan)
// Now, add all of the non-blocking task to the pool.
semChanNB := make(chan struct{})
for i := 0; i < nUnblocked; i++ {
go func() { errChan <- submitGeneric(p, semChanNB) }()
}
// Since we haven't unblocked the second batch, we again expect no tasks
// to finish.
pullNothing(t, errChan)
// Now, unblock the unblocked task and pull all of them. After they have
// been pulled, we should see no more tasks.
close(semChanNB)
pullParllel(t, nUnblocked, errChan)
pullNothing(t, errChan)
// Finally, unblock each the blocked tasks we added initially, and
// assert that no further errors come through.
pullSequntial(t, nBlocked, errChan, semChan)
pullNothing(t, errChan)
}
func pullNothing(t *testing.T, errChan chan error) {
t.Helper()
select {
case err := <-errChan:
t.Fatalf("received unexpected error before semaphore "+
"release: %v", err)
case <-time.After(time.Second):
}
}
func pullParllel(t *testing.T, n int, errChan chan error) {
t.Helper()
for i := 0; i < n; i++ {
select {
case err := <-errChan:
if err != nil {
t.Fatal(err)
}
case <-time.After(time.Second):
t.Fatalf("task %d was not processed in time", i)
}
}
}
func pullSequntial(t *testing.T, n int, errChan chan error, semChan chan struct{}) {
t.Helper()
for i := 0; i < n; i++ {
// Signal for another task to unblock.
select {
case semChan <- struct{}{}:
case <-time.After(time.Second):
t.Fatalf("task %d was not unblocked", i)
}
// Wait for the error to arrive, we expect it to be non-nil.
select {
case err := <-errChan:
if err != nil {
t.Fatal(err)
}
case <-time.After(time.Second):
t.Fatalf("task %d was not processed in time", i)
}
}
}
func startGeneric(t *testing.T, p interface{}) {
t.Helper()
var err error
switch pp := p.(type) {
case *pool.Write:
err = pp.Start()
case *pool.Read:
err = pp.Start()
default:
t.Fatalf("unknown worker pool type: %T", p)
}
if err != nil {
t.Fatalf("unable to start worker pool: %v", err)
}
}
func stopGeneric(t *testing.T, p interface{}) {
t.Helper()
var err error
switch pp := p.(type) {
case *pool.Write:
err = pp.Stop()
case *pool.Read:
err = pp.Stop()
default:
t.Fatalf("unknown worker pool type: %T", p)
}
if err != nil {
t.Fatalf("unable to stop worker pool: %v", err)
}
}
func submitGeneric(p interface{}, sem <-chan struct{}) error {
var err error
switch pp := p.(type) {
case *pool.Write:
err = pp.Submit(func(buf *bytes.Buffer) error {
// Verify that the provided buffer has been reset to be
// zero length.
if buf.Len() != 0 {
return fmt.Errorf("buf should be length zero, "+
"instead has length %d", buf.Len())
}
// Verify that the capacity of the buffer has the
// correct underlying size of a buffer.WriteSize.
if buf.Cap() != buffer.WriteSize {
return fmt.Errorf("buf should have capacity "+
"%d, instead has capacity %d",
buffer.WriteSize, buf.Cap())
}
// Sample some random bytes that we'll use to dirty the
// buffer.
b := make([]byte, rand.Intn(buf.Cap()))
_, err := io.ReadFull(crand.Reader, b)
if err != nil {
return err
}
// Write the random bytes the buffer.
_, err = buf.Write(b)
// Wait until this task is signaled to exit.
<-sem
return err
})
case *pool.Read:
err = pp.Submit(func(buf *buffer.Read) error {
// Assert that all of the bytes in the provided array
// are zero, indicating that the buffer was reset
// between uses.
for i := range buf[:] {
if buf[i] != 0x00 {
return fmt.Errorf("byte %d of "+
"buffer.Read should be "+
"0, instead is %d", i, buf[i])
}
}
// Sample some random bytes to read into the buffer.
_, err := io.ReadFull(crand.Reader, buf[:])
// Wait until this task is signaled to exit.
<-sem
return err
})
default:
return fmt.Errorf("unknown worker pool type: %T", p)
}
if err != nil {
return fmt.Errorf("unable to submit task: %v", err)
}
return nil
}