GetNextRequestForQuerier find next user queue and attempts to dequeue N requests as defined by the length of batchBuffer. This slice is a reusable buffer to fill up with requests
(ctx context.Context, last UserIndex, batchBuffer []Request)
| 135 | // GetNextRequestForQuerier find next user queue and attempts to dequeue N requests as defined by the length of |
| 136 | // batchBuffer. This slice is a reusable buffer to fill up with requests |
| 137 | func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, batchBuffer []Request) ([]Request, UserIndex, error) { |
| 138 | requestedCount := len(batchBuffer) |
| 139 | if requestedCount == 0 { |
| 140 | return nil, last, errors.New("batch buffer must have len > 0") |
| 141 | } |
| 142 | |
| 143 | q.mtx.Lock() |
| 144 | defer q.mtx.Unlock() |
| 145 | |
| 146 | querierWait := false |
| 147 | |
| 148 | FindQueue: |
| 149 | // We need to wait if there are no users, or no pending requests for given querier. |
| 150 | for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped { |
| 151 | querierWait = false |
| 152 | q.cond.Wait(ctx) |
| 153 | } |
| 154 | |
| 155 | if q.stopped { |
| 156 | return nil, last, ErrStopped |
| 157 | } |
| 158 | |
| 159 | if err := ctx.Err(); err != nil { |
| 160 | return nil, last, err |
| 161 | } |
| 162 | |
| 163 | queue, userID, idx := q.queues.getNextQueueForQuerier(last.last) |
| 164 | last.last = idx |
| 165 | if queue != nil { |
| 166 | // this is all threadsafe b/c all users queues are blocked by q.mtx |
| 167 | batchBuffer := q.getBatchBuffer(batchBuffer, userID, queue) |
| 168 | return batchBuffer, last, nil |
| 169 | } |
| 170 | |
| 171 | // There are no unexpired requests, so we can get back |
| 172 | // and wait for more requests. |
| 173 | querierWait = true |
| 174 | goto FindQueue |
| 175 | } |
| 176 | |
| 177 | func (q *RequestQueue) getBatchBuffer(batchBuffer []Request, userID string, queue chan Request) []Request { |
| 178 | requestedCount := len(batchBuffer) |