MCPcopy
hub / github.com/grafana/tempo / starting

Method starting

modules/backendworker/backendworker.go:129–181  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

127}
128
129func (w *BackendWorker) starting(ctx context.Context) (err error) {
130 defer func() {
131 if err == nil || w.subservices == nil {
132 return
133 }
134
135 if stopErr := services.StopManagerAndAwaitStopped(context.Background(), w.subservices); stopErr != nil {
136 level.Error(log.Logger).Log("msg", "failed to gracefully stop backend-worker dependencies", "err", stopErr)
137 }
138 }()
139
140 if w.isSharded() {
141 w.subservices, err = services.NewManager(w.ringLifecycler, w.Ring)
142 if err != nil {
143 return fmt.Errorf("failed to create subservices: %w", err)
144 }
145 w.subservicesWatcher = services.NewFailureWatcher()
146 w.subservicesWatcher.WatchManager(w.subservices)
147
148 err := services.StartManagerAndAwaitHealthy(ctx, w.subservices)
149 if err != nil {
150 return fmt.Errorf("failed to start subservices: %w", err)
151 }
152
153 // Wait until the ring client detected this instance in the ACTIVE state.
154 level.Info(log.Logger).Log("msg", "waiting until backend-worker is ACTIVE in the ring")
155 ctxWithTimeout, cancel := context.WithTimeout(ctx, w.cfg.Ring.WaitActiveInstanceTimeout)
156 defer cancel()
157 if err := ring.WaitInstanceState(ctxWithTimeout, w.Ring, w.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
158 return err
159 }
160 level.Info(log.Logger).Log("msg", "backend-worker is ACTIVE in the ring")
161
162 // In the event of a cluster cold start we may end up in a situation where each new backend-worker
163 // instance starts at a slightly different time and thus each one starts with a different state
164 // of the ring. It's better to just wait the ring stability for a short time.
165 if w.cfg.Ring.WaitStabilityMinDuration > 0 {
166 minWaiting := w.cfg.Ring.WaitStabilityMinDuration
167 maxWaiting := w.cfg.Ring.WaitStabilityMaxDuration
168
169 level.Info(log.Logger).Log("msg", "waiting until backend-worker ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
170 if err := ring.WaitRingStability(ctx, w.Ring, ringOp, minWaiting, maxWaiting); err != nil {
171 level.Warn(log.Logger).Log("msg", "backend-worker ring topology is not stable after the max waiting time, proceeding anyway")
172 } else {
173 level.Info(log.Logger).Log("msg", "backend-worker ring topology is stable")
174 }
175 }
176 }
177
178 w.store.EnablePolling(ctx, w, false)
179
180 return nil
181}
182
183func (w *BackendWorker) running(ctx context.Context) error {
184 level.Info(log.Logger).Log("msg", "backend worker running")

Callers

nothing calls this directly

Calls 5

isShardedMethod · 0.95
LogMethod · 0.65
ErrorMethod · 0.65
EnablePollingMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected