MCPcopy
hub / github.com/grafana/tempo / GetNextRequestForQuerier

Method GetNextRequestForQuerier

modules/frontend/queue/queue.go:137–175  ·  view source on GitHub ↗

GetNextRequestForQuerier find next user queue and attempts to dequeue N requests as defined by the length of batchBuffer. This slice is a reusable buffer to fill up with requests

(ctx context.Context, last UserIndex, batchBuffer []Request)

Source from the content-addressed store, hash-verified

135// GetNextRequestForQuerier find next user queue and attempts to dequeue N requests as defined by the length of
136// batchBuffer. This slice is a reusable buffer to fill up with requests
137func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, batchBuffer []Request) ([]Request, UserIndex, error) {
138 requestedCount := len(batchBuffer)
139 if requestedCount == 0 {
140 return nil, last, errors.New("batch buffer must have len > 0")
141 }
142
143 q.mtx.Lock()
144 defer q.mtx.Unlock()
145
146 querierWait := false
147
148FindQueue:
149 // We need to wait if there are no users, or no pending requests for given querier.
150 for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped {
151 querierWait = false
152 q.cond.Wait(ctx)
153 }
154
155 if q.stopped {
156 return nil, last, ErrStopped
157 }
158
159 if err := ctx.Err(); err != nil {
160 return nil, last, err
161 }
162
163 queue, userID, idx := q.queues.getNextQueueForQuerier(last.last)
164 last.last = idx
165 if queue != nil {
166 // this is all threadsafe b/c all users queues are blocked by q.mtx
167 batchBuffer := q.getBatchBuffer(batchBuffer, userID, queue)
168 return batchBuffer, last, nil
169 }
170
171 // There are no unexpired requests, so we can get back
172 // and wait for more requests.
173 querierWait = true
174 goto FindQueue
175}
176
177func (q *RequestQueue) getBatchBuffer(batchBuffer []Request, userID string, queue chan Request) []Request {
178 requestedCount := len(batchBuffer)

Callers 2

queueWithListenersFunction · 0.95
ProcessMethod · 0.80

Calls 4

getBatchBufferMethod · 0.95
WaitMethod · 0.65
lenMethod · 0.45

Tested by 1

queueWithListenersFunction · 0.76