2018-10-12 17:08:14 +02:00
|
|
|
package queue_test
|
2017-09-29 21:09:37 +02:00
|
|
|
|
|
|
|
import (
|
|
|
|
"testing"
|
|
|
|
|
2018-10-12 17:08:14 +02:00
|
|
|
"github.com/lightningnetwork/lnd/queue"
|
2017-09-29 21:09:37 +02:00
|
|
|
)
|
|
|
|
|
2018-12-01 01:37:01 +01:00
|
|
|
func testQueueAddDrain(t *testing.T, size, numStart, numStop, numAdd, numDrain int) {
|
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
queue := queue.NewConcurrentQueue(size)
|
|
|
|
for i := 0; i < numStart; i++ {
|
|
|
|
queue.Start()
|
|
|
|
}
|
|
|
|
for i := 0; i < numStop; i++ {
|
2022-08-27 09:07:23 +02:00
|
|
|
t.Cleanup(queue.Stop)
|
2018-12-01 01:37:01 +01:00
|
|
|
}
|
2017-09-29 21:09:37 +02:00
|
|
|
|
|
|
|
// Pushes should never block for long.
|
2018-12-01 01:37:01 +01:00
|
|
|
for i := 0; i < numAdd; i++ {
|
2017-09-29 21:09:37 +02:00
|
|
|
queue.ChanIn() <- i
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pops also should not block for long. Expect elements in FIFO order.
|
2018-12-01 01:37:01 +01:00
|
|
|
for i := 0; i < numDrain; i++ {
|
2017-09-29 21:09:37 +02:00
|
|
|
item := <-queue.ChanOut()
|
|
|
|
if i != item.(int) {
|
2018-12-01 01:37:01 +01:00
|
|
|
t.Fatalf("Dequeued wrong value: expected %d, got %d",
|
|
|
|
i, item.(int))
|
2017-09-29 21:09:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-12-01 01:37:01 +01:00
|
|
|
|
|
|
|
// TestConcurrentQueue tests that the queue properly adds 1000 items, drain all
|
|
|
|
// of them, and exit cleanly.
|
|
|
|
func TestConcurrentQueue(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
testQueueAddDrain(t, 100, 1, 1, 1000, 1000)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestConcurrentQueueEarlyStop tests that the queue properly adds 1000 items,
|
|
|
|
// drain half of them, and still exit cleanly.
|
|
|
|
func TestConcurrentQueueEarlyStop(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
testQueueAddDrain(t, 100, 1, 1, 1000, 500)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestConcurrentQueueIdempotentStart asserts that calling Start multiple times
|
|
|
|
// doesn't fail, and that the queue can still exit cleanly.
|
|
|
|
func TestConcurrentQueueIdempotentStart(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
testQueueAddDrain(t, 100, 10, 1, 1000, 1000)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TestConcurrentQueueIdempotentStop asserts that calling Stop multiple times
|
|
|
|
// doesn't fail, and that exiting doesn't block on subsequent Stops.
|
|
|
|
func TestConcurrentQueueIdempotentStop(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
testQueueAddDrain(t, 100, 1, 10, 1000, 1000)
|
|
|
|
}
|
2020-04-06 14:47:49 +02:00
|
|
|
|
|
|
|
// TestQueueCloseIncoming tests that the queue properly handles an incoming
|
|
|
|
// channel that is closed.
|
|
|
|
func TestQueueCloseIncoming(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
queue := queue.NewConcurrentQueue(10)
|
|
|
|
queue.Start()
|
|
|
|
|
|
|
|
queue.ChanIn() <- 1
|
|
|
|
close(queue.ChanIn())
|
|
|
|
|
|
|
|
item := <-queue.ChanOut()
|
|
|
|
if item.(int) != 1 {
|
|
|
|
t.Fatalf("unexpected item")
|
|
|
|
}
|
|
|
|
|
|
|
|
_, ok := <-queue.ChanOut()
|
|
|
|
if ok {
|
|
|
|
t.Fatalf("expected outgoing channel being closed")
|
|
|
|
}
|
|
|
|
}
|