Do does the doing of getting a blocklist
(parentCtx context.Context, previous *List)
| 138 | |
| 139 | // Do does the doing of getting a blocklist |
| 140 | func (p *Poller) Do(parentCtx context.Context, previous *List) (PerTenant, PerTenantCompacted, error) { |
| 141 | start := time.Now() |
| 142 | |
| 143 | parentCtx, cancel := context.WithCancel(parentCtx) |
| 144 | defer cancel() |
| 145 | |
| 146 | parentCtx, parentSpan := tracer.Start(parentCtx, "Poller.Do") |
| 147 | defer parentSpan.End() |
| 148 | |
| 149 | tenants, err := p.reader.Tenants(parentCtx) |
| 150 | if err != nil { |
| 151 | metricBlocklistErrors.WithLabelValues("").Inc() |
| 152 | return nil, nil, err |
| 153 | } |
| 154 | |
| 155 | var ( |
| 156 | wg = boundedwaitgroup.New(p.cfg.TenantPollConcurrency) |
| 157 | mtx = sync.Mutex{} |
| 158 | |
| 159 | blocklist = PerTenant{} |
| 160 | compactedBlocklist = PerTenantCompacted{} |
| 161 | |
| 162 | tenantFailuresRemaining = atomic.NewInt32(int32(p.cfg.TolerateTenantFailures)) |
| 163 | |
| 164 | link = trace.LinkFromContext(parentCtx) |
| 165 | bgCtx = context.Background() |
| 166 | ) |
| 167 | |
| 168 | for _, tenantID := range tenants { |
| 169 | // Do not continue if we have been canceled. |
| 170 | if parentCtx.Err() != nil { |
| 171 | // Wait for our work to complete. |
| 172 | wg.Wait() |
| 173 | return nil, nil, parentCtx.Err() |
| 174 | } |
| 175 | |
| 176 | // Exit early if we have exceeded our tolerance for number of failing tenants. |
| 177 | if tenantFailuresRemaining.Load() < 0 { |
| 178 | level.Error(p.logger).Log("msg", "exiting polling loop early because too many errors") |
| 179 | break |
| 180 | } |
| 181 | |
| 182 | wg.Add(1) |
| 183 | go func(tenantID string) { |
| 184 | defer wg.Done() |
| 185 | |
| 186 | bgCtx, bgSpan := tracer.Start(bgCtx, "Poller.Do.func") |
| 187 | defer bgSpan.End() |
| 188 | |
| 189 | bgSpan.SetAttributes(attribute.String("tenant", tenantID)) |
| 190 | bgSpan.AddLink(link) |
| 191 | |
| 192 | var ( |
| 193 | consecutiveErrorsRemaining = p.cfg.TolerateConsecutiveErrors |
| 194 | newBlockList = make([]*backend.BlockMeta, 0) |
| 195 | newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0) |
| 196 | err error |
| 197 | ) |