MCPcopy
hub / github.com/grafana/tempo / TestConcurrentDequeue

Function TestConcurrentDequeue

pkg/flushqueues/exclusivequeues_test.go:162–212  ·  view source on GitHub ↗

TestConcurrentDequeue verifies that multiple goroutines calling Dequeue concurrently on a shared queue process every item exactly once.

(t *testing.T)

Source from the content-addressed store, hash-verified

160// TestConcurrentDequeue verifies that multiple goroutines calling Dequeue
161// concurrently on a shared queue process every item exactly once.
162func TestConcurrentDequeue(t *testing.T) {
163 q := New[mockOp](nil)
164
165 totalItems := 500
166 numWorkers := 10
167
168 // Enqueue all items up front
169 keys := make([]string, totalItems)
170 for i := range totalItems {
171 keys[i] = uuid.New().String()
172 require.NoError(t, q.Enqueue(mockOp{key: keys[i]}))
173 }
174
175 // Track which keys each worker dequeued
176 var mu sync.Mutex
177 seen := make(map[string]int) // key -> count
178
179 // itemsDone tracks when all items have been processed
180 var itemsDone sync.WaitGroup
181 itemsDone.Add(totalItems)
182
183 var workers sync.WaitGroup
184 for range numWorkers {
185 workers.Add(1)
186 go func() {
187 defer workers.Done()
188 for {
189 op := q.Dequeue()
190 if op.Key() == "" {
191 return // queue closed
192 }
193 mu.Lock()
194 seen[op.Key()]++
195 mu.Unlock()
196 q.Clear(op)
197 itemsDone.Done()
198 }
199 }()
200 }
201
202 // Wait for all items to be processed, then shut down workers
203 itemsDone.Wait()
204 q.Stop()
205 workers.Wait()
206
207 // Every key must have been dequeued exactly once
208 for _, key := range keys {
209 assert.Equal(t, 1, seen[key], "key %s dequeued %d times", key, seen[key])
210 }
211 assert.Equal(t, totalItems, len(seen), "expected %d unique keys, got %d", totalItems, len(seen))
212}
213
214// TestStopUnblocksAllWaiters verifies that calling Stop on an empty queue
215// unblocks all goroutines waiting in Dequeue, returning zero values.

Callers

nothing calls this directly

Calls 10

AddMethod · 0.65
DoneMethod · 0.65
KeyMethod · 0.65
ClearMethod · 0.65
WaitMethod · 0.65
StopMethod · 0.65
StringMethod · 0.45
EnqueueMethod · 0.45
DequeueMethod · 0.45
EqualMethod · 0.45

Tested by

no test coverage detected