MCPcopy
hub / github.com/grafana/tempo / watchOverrides

Method watchOverrides

modules/distributor/forwarder.go:137–197  ·  view source on GitHub ↗

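watchOverrides runs until shutdown, re-checking every tenant's queue overrides on each tick of the overrides interval. When a tenant's queue size or worker count has changed, the existing queue is shut down asynchronously and a new queue with the updated settings is created in its place.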

()

Source from the content-addressed store, hash-verified

135// watchOverrides watches the overrides for changes
136// and updates the queues accordingly
137func (f *generatorForwarder) watchOverrides() {
138 ticker := time.NewTicker(f.overridesInterval)
139
140 for {
141 select {
142 case <-ticker.C:
143 f.mutex.Lock()
144
145 var (
146 queuesToDelete []*queue.Queue[*request]
147 queuesToAdd []struct {
148 tenantID string
149 queueSize, workerCount int
150 }
151 )
152
153 for tenantID, q := range f.queues {
154 queueSize, workerCount := f.getQueueConfig(tenantID)
155 // if the queue size or worker count has changed, shutdown the queue manager and create a new one
156 if q.ShouldUpdate(queueSize, workerCount) {
157 _ = level.Info(f.logger).Log(
158 "msg", "Marking queue manager for update",
159 "tenant", tenantID,
160 "old_queue_size", q.Size(),
161 "new_queue_size", queueSize,
162 "old_worker_count", q.WorkerCount(),
163 "new_worker_count", workerCount,
164 )
165 queuesToDelete = append(queuesToDelete, q)
166 queuesToAdd = append(queuesToAdd, struct {
167 tenantID string
168 queueSize, workerCount int
169 }{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount})
170 }
171 }
172
173 // Spawn a goroutine to asynchronously shut down queue managers
174 go func() {
175 for _, q := range queuesToDelete {
176 // shutdown the queue manager
177 // this will block until all workers have finished and the queue is drained
178 _ = level.Info(f.logger).Log("msg", "Shutting down queue manager", "tenant", q.TenantID())
179 if err := q.Shutdown(context.Background()); err != nil {
180 _ = level.Error(f.logger).Log("msg", "error shutting down queue manager", "tenant", q.TenantID(), "err", err)
181 }
182 }
183 }()
184
185 // Synchronously update queue managers
186 for _, q := range queuesToAdd {
187 _ = level.Info(f.logger).Log("msg", "Updating queue manager", "tenant", q.tenantID)
188 f.queues[q.tenantID] = f.createQueueAndStartWorkers(q.tenantID, q.queueSize, q.workerCount)
189 }
190
191 f.mutex.Unlock()
192 case <-f.shutdown:
193 ticker.Stop()
194 return

Callers 1

startMethod · 0.95

Calls 10

getQueueConfigMethod · 0.95
ShouldUpdateMethod · 0.80
WorkerCountMethod · 0.80
TenantIDMethod · 0.80
LogMethod · 0.65
SizeMethod · 0.65
ShutdownMethod · 0.65
ErrorMethod · 0.65
StopMethod · 0.65

Tested by

no test coverage detected