TestConcurrentDequeue verifies that multiple goroutines calling Dequeue concurrently on a shared queue process every item exactly once.
(t *testing.T)
| 160 | // TestConcurrentDequeue verifies that multiple goroutines calling Dequeue |
| 161 | // concurrently on a shared queue process every item exactly once. |
| 162 | func TestConcurrentDequeue(t *testing.T) { |
| 163 | q := New[mockOp](nil) |
| 164 | |
| 165 | totalItems := 500 |
| 166 | numWorkers := 10 |
| 167 | |
| 168 | // Enqueue all items up front |
| 169 | keys := make([]string, totalItems) |
| 170 | for i := range totalItems { |
| 171 | keys[i] = uuid.New().String() |
| 172 | require.NoError(t, q.Enqueue(mockOp{key: keys[i]})) |
| 173 | } |
| 174 | |
| 175 | // Track which keys each worker dequeued |
| 176 | var mu sync.Mutex |
| 177 | seen := make(map[string]int) // key -> count |
| 178 | |
| 179 | // itemsDone tracks when all items have been processed |
| 180 | var itemsDone sync.WaitGroup |
| 181 | itemsDone.Add(totalItems) |
| 182 | |
| 183 | var workers sync.WaitGroup |
| 184 | for range numWorkers { |
| 185 | workers.Add(1) |
| 186 | go func() { |
| 187 | defer workers.Done() |
| 188 | for { |
| 189 | op := q.Dequeue() |
| 190 | if op.Key() == "" { |
| 191 | return // queue closed |
| 192 | } |
| 193 | mu.Lock() |
| 194 | seen[op.Key()]++ |
| 195 | mu.Unlock() |
| 196 | q.Clear(op) |
| 197 | itemsDone.Done() |
| 198 | } |
| 199 | }() |
| 200 | } |
| 201 | |
| 202 | // Wait for all items to be processed, then shut down workers |
| 203 | itemsDone.Wait() |
| 204 | q.Stop() |
| 205 | workers.Wait() |
| 206 | |
| 207 | // Every key must have been dequeued exactly once |
| 208 | for _, key := range keys { |
| 209 | assert.Equal(t, 1, seen[key], "key %s dequeued %d times", key, seen[key]) |
| 210 | } |
| 211 | assert.Equal(t, totalItems, len(seen), "expected %d unique keys, got %d", totalItems, len(seen)) |
| 212 | } |
| 213 | |
| 214 | // TestStopUnblocksAllWaiters verifies that calling Stop on an empty queue |
| 215 | // unblocks all goroutines waiting in Dequeue, returning zero values. |