wtclient: remove temporary bug demonstration logic and test

This commit removes the temporary members added to the DiskOverflowQueue
that made it possible to more easily demonstrate a previous bug that
existed.
This commit is contained in:
Elle Mouton 2024-01-15 16:58:42 +02:00
parent 3d201dde50
commit 65de80be7d
No known key found for this signature in database
GPG Key ID: D7D916376026F177
2 changed files with 8 additions and 172 deletions

View File

@ -92,21 +92,6 @@ type DiskOverflowQueue[T any] struct {
// task.
leftOverItem3 *T
// waitForDiskWriteSignal is true if blockDiskWrite should be used to
// wait for an explicit signal before writing an item to disk. This is
// for the purpose of testing only and will be removed after the fix of
// a test flake has been demonstrated.
waitForDiskWriteSignal bool
blockDiskWrite chan struct{}
// waitForFeedOutputSignal is true if startFeedOutputChan should be used
// to wait for an explicit signal before allowing the feedOutput
// goroutine to begin its duties. This is for the purpose of testing
// only and will be removed after the fix of a test flake has been
// demonstrated.
waitForFeedOutputSignal bool
startFeedOutputChan chan struct{}
quit chan struct{}
wg sync.WaitGroup
}
@ -121,16 +106,14 @@ func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
}
q := &DiskOverflowQueue[T]{
log: logger,
db: db,
inputList: list.New(),
newDiskItemSignal: make(chan struct{}, 1),
inputChan: make(chan *internalTask[T]),
memQueue: make(chan T, maxQueueSize-2),
outputChan: make(chan T),
blockDiskWrite: make(chan struct{}, 1),
startFeedOutputChan: make(chan struct{}, 1),
quit: make(chan struct{}),
log: logger,
db: db,
inputList: list.New(),
newDiskItemSignal: make(chan struct{}, 1),
inputChan: make(chan *internalTask[T]),
memQueue: make(chan T, maxQueueSize-2),
outputChan: make(chan T),
quit: make(chan struct{}),
}
q.inputListCond = sync.NewCond(&q.inputListMu)
@ -147,27 +130,6 @@ func (q *DiskOverflowQueue[T]) Start() error {
return err
}
// allowDiskWrite is used to explicitly signal that a disk write may take place.
// This is for the purposes of testing only and will be removed once a specific
// test flake fix has been demonstrated.
func (q *DiskOverflowQueue[T]) allowDiskWrite() {
select {
case q.blockDiskWrite <- struct{}{}:
default:
}
}
// startFeedOutput is used to explicitly signal that the feedOutput goroutine
// may start.
// This is for the purposes of testing only and will be removed once a specific
// test flake fix has been demonstrated.
func (q *DiskOverflowQueue[T]) startFeedOutput() {
select {
case q.startFeedOutputChan <- struct{}{}:
default:
}
}
// start kicks off all the goroutines that are required to manage the queue.
func (q *DiskOverflowQueue[T]) start() error {
numDisk, err := q.db.Len()
@ -383,21 +345,6 @@ func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
// If the queue is in disk mode then any new items should be put
// straight into the disk queue.
if q.toDisk.Load() {
// If waitForDiskWriteSignal is true, then we wait here for
// an explicit signal before writing the item to disk. This is
// for testing only and will be removed once a specific test
// flake fix has been demonstrated.
if q.waitForDiskWriteSignal {
select {
case <-q.blockDiskWrite:
case <-q.quit:
q.leftOverItem3 = &task
return false
}
}
err := q.db.Push(task)
if err != nil {
// Log and back off for a few seconds and then
@ -605,18 +552,6 @@ func (q *DiskOverflowQueue[T]) feedOutputChan() {
q.wg.Done()
}()
// If waitForFeedOutputSignal is true, then we wait here for an explicit
// signal before starting the main loop of the function. This is for
// testing only and will be removed once a specific test flake fix has
// been demonstrated.
if q.waitForFeedOutputSignal {
select {
case <-q.startFeedOutputChan:
case <-q.quit:
return
}
}
for {
select {
case nextTask, ok := <-q.memQueue:

View File

@ -38,10 +38,6 @@ func TestDiskOverflowQueue(t *testing.T) {
name: "start stop queue",
run: testStartStopQueue,
},
{
name: "demo flake",
run: demoFlake,
},
}
initDB := func() wtdb.Queue[*wtdb.BackupID] {
@ -75,101 +71,6 @@ func TestDiskOverflowQueue(t *testing.T) {
}
}
// demoFlake is a temporary test demonstrating the fix of a race condition that
// existed in the DiskOverflowQueue. It contrives a scenario that once resulted
// in the queue being in memory mode when it really should be in disk mode.
func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
// Generate some backup IDs that we want to add to the queue.
tasks := genBackupIDs(10)
// New mock logger.
log := newMockLogger(t.Logf)
// Init the queue with the mock DB.
q, err := NewDiskOverflowQueue[*wtdb.BackupID](
db, maxInMemItems, log,
)
require.NoError(t, err)
// Set the two test variables to true so that we have more control over
// the feedOutput goroutine and disk writes from the test.
q.waitForDiskWriteSignal = true
q.waitForFeedOutputSignal = true
// Start the queue.
require.NoError(t, q.Start())
// Initially there should be no items on disk.
assertNumDisk(t, db, 0)
// Start filling up the queue. The maxInMemItems is 5, meaning that the
// memQueue capacity is 3. Since the feedOutput goroutine has not yet
// started, these first 3 items (tasks 0-2) will fill the memQueue.
enqueue(t, q, tasks[0])
enqueue(t, q, tasks[1])
enqueue(t, q, tasks[2])
// Adding task 3 is expected to result in the mode being changed to disk
// mode.
enqueue(t, q, tasks[3])
// Show that the mode does actually change to disk mode.
err = wait.Predicate(func() bool {
return q.toDisk.Load()
}, waitTime)
require.NoError(t, err)
// Allow task 3 to be written to disk. This will send a signal on
// newDiskItemsSignal.
q.allowDiskWrite()
// Task 3 will almost immediately be popped from disk again due to
// the newDiskItemsSignal causing feedMemQueue to call feedFromDisk.
waitForNumDisk(t, db, 0)
// Enqueue task 4 but don't allow it to be written to disk yet.
enqueue(t, q, tasks[4])
// Wait a bit just to make sure that task 4 has passed the
// if q.toDisk.Load() check in pushToActiveQueue and is waiting on the
// allowDiskWrite signal.
time.Sleep(time.Second)
// Now, start the feedOutput goroutine. This will pop 1 item from the
// memQueue meaning that feedMemQueue will now manage to push an item
// onto the memQueue & will go onto the next iteration of feedFromDisk
// which will then see that there are no items on disk and so will
// change the mode to memory-mode.
q.startFeedOutput()
err = wait.Predicate(func() bool {
return !q.toDisk.Load()
}, waitTime)
require.NoError(t, err)
// Now, we allow task 4 to be written to disk. This will result in a
// newDiskItemsSignal being sent meaning that feedMemQueue will read
// from disk and block on pushing the new item to memQueue.
q.allowDiskWrite()
// The above will result in feeMemQueue switching the mode to disk-mode.
err = wait.Predicate(func() bool {
return q.toDisk.Load()
}, waitTime)
require.NoError(t, err)
// Now, if we enqueue task 5, it _will_ be written to disk since the
// queue is currently in disk mode.
enqueue(t, q, tasks[5])
q.allowDiskWrite()
// Show that there is an item on disk at this point. This demonstrates
// that the bug has been fixed.
waitForNumDisk(t, db, 1)
require.NoError(t, q.Stop())
}
// testOverflowToDisk is a basic test that ensures that the queue correctly
// overflows items to disk and then correctly reloads them.
func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {