(idx int)
| 80 | } |
| 81 | |
| 82 | func (s *LiveStore) globalCompleteLoop(idx int) { |
| 83 | level.Info(s.logger).Log("msg", "starting completing loop", "index", idx) |
| 84 | defer func() { |
| 85 | level.Info(s.logger).Log("msg", "shutdown completing loop", "index", idx) |
| 86 | }() |
| 87 | for { |
| 88 | op := s.completeQueues.Dequeue() |
| 89 | if op == nil { |
| 90 | return // queue is closed |
| 91 | } |
| 92 | op.attempts++ |
| 93 | |
| 94 | if op.attempts > maxFlushAttempts { |
| 95 | level.Error(s.logger).Log("msg", "failed to complete operation", "tenant", op.tenantID, "block", op.blockID, "attempts", op.attempts) |
| 96 | observeFailedOp(op) |
| 97 | continue |
| 98 | } |
| 99 | |
| 100 | if err := s.processCompleteOp(op); err != nil { |
| 101 | return |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | // processCompleteOp completes a single block. Returns an error if the global loop should exit. |
| 107 | func (s *LiveStore) processCompleteOp(op *completeOp) error { |
no test coverage detected