(ctx context.Context)
| 179 | } |
| 180 | |
| 181 | func (l *PartitionInstanceLifecycler) running(ctx context.Context) error { |
| 182 | reconcile := func() { |
| 183 | l.reconcileOwnedPartition(ctx, time.Now()) |
| 184 | l.reconcileOtherPartitions(ctx, time.Now()) |
| 185 | } |
| 186 | |
| 187 | // Run a reconciliation as soon as the lifecycler, in order to not having to wait for the 1st timer tick. |
| 188 | reconcile() |
| 189 | |
| 190 | reconcileTicker := time.NewTicker(l.cfg.PollingInterval) |
| 191 | defer reconcileTicker.Stop() |
| 192 | |
| 193 | for { |
| 194 | select { |
| 195 | case <-reconcileTicker.C: |
| 196 | reconcile() |
| 197 | |
| 198 | case f := <-l.actorChan: |
| 199 | f() |
| 200 | |
| 201 | case <-ctx.Done(): |
| 202 | return nil |
| 203 | } |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | func (l *PartitionInstanceLifecycler) stopping(_ error) error { |
| 208 | level.Info(l.logger).Log("msg", "partition ring lifecycler is shutting down", "ring", l.ringName) |
nothing calls this directly
no test coverage detected