(t *testing.T)
| 88 | } |
| 89 | |
| 90 | func TestGetNextBatches(t *testing.T) { |
| 91 | t.Parallel() |
| 92 | messages := 10 |
| 93 | |
| 94 | ctx, cancel := context.WithCancel(context.Background()) |
| 95 | defer cancel() |
| 96 | stop := make(chan struct{}) |
| 97 | requestsPulled := atomic.NewInt32(0) |
| 98 | |
| 99 | q, start := queueWithListeners(ctx, 100, 3, func(r []Request) { |
| 100 | if requestsPulled.Add(int32(len(r))) == int32(messages) { |
| 101 | close(stop) |
| 102 | } |
| 103 | }) |
| 104 | close(start) |
| 105 | |
| 106 | for j := 0; j < messages; j++ { |
| 107 | err := q.EnqueueRequest("user", &mockRequest{}) |
| 108 | require.NoError(t, err) |
| 109 | } |
| 110 | |
| 111 | <-stop |
| 112 | |
| 113 | require.Equal(t, int32(messages), requestsPulled.Load()) |
| 114 | |
| 115 | err := q.stopping(nil) |
| 116 | require.NoError(t, err) |
| 117 | } |
| 118 | |
| 119 | func BenchmarkGetNextForQuerier100(b *testing.B) { |
| 120 | benchmarkGetNextForQuerier(b, 100, messages) |
nothing calls this directly
no test coverage detected