watchOverrides watches the overrides for changes and updates the queues accordingly
()
| 135 | // watchOverrides watches the overrides for changes |
| 136 | // and updates the queues accordingly |
| 137 | func (f *generatorForwarder) watchOverrides() { |
| 138 | ticker := time.NewTicker(f.overridesInterval) |
| 139 | |
| 140 | for { |
| 141 | select { |
| 142 | case <-ticker.C: |
| 143 | f.mutex.Lock() |
| 144 | |
| 145 | var ( |
| 146 | queuesToDelete []*queue.Queue[*request] |
| 147 | queuesToAdd []struct { |
| 148 | tenantID string |
| 149 | queueSize, workerCount int |
| 150 | } |
| 151 | ) |
| 152 | |
| 153 | for tenantID, q := range f.queues { |
| 154 | queueSize, workerCount := f.getQueueConfig(tenantID) |
| 155 | // if the queue size or worker count has changed, shutdown the queue manager and create a new one |
| 156 | if q.ShouldUpdate(queueSize, workerCount) { |
| 157 | _ = level.Info(f.logger).Log( |
| 158 | "msg", "Marking queue manager for update", |
| 159 | "tenant", tenantID, |
| 160 | "old_queue_size", q.Size(), |
| 161 | "new_queue_size", queueSize, |
| 162 | "old_worker_count", q.WorkerCount(), |
| 163 | "new_worker_count", workerCount, |
| 164 | ) |
| 165 | queuesToDelete = append(queuesToDelete, q) |
| 166 | queuesToAdd = append(queuesToAdd, struct { |
| 167 | tenantID string |
| 168 | queueSize, workerCount int |
| 169 | }{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount}) |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | // Spawn a goroutine to asynchronously shut down queue managers |
| 174 | go func() { |
| 175 | for _, q := range queuesToDelete { |
| 176 | // shutdown the queue manager |
| 177 | // this will block until all workers have finished and the queue is drained |
| 178 | _ = level.Info(f.logger).Log("msg", "Shutting down queue manager", "tenant", q.TenantID()) |
| 179 | if err := q.Shutdown(context.Background()); err != nil { |
| 180 | _ = level.Error(f.logger).Log("msg", "error shutting down queue manager", "tenant", q.TenantID(), "err", err) |
| 181 | } |
| 182 | } |
| 183 | }() |
| 184 | |
| 185 | // Synchronously update queue managers |
| 186 | for _, q := range queuesToAdd { |
| 187 | _ = level.Info(f.logger).Log("msg", "Updating queue manager", "tenant", q.tenantID) |
| 188 | f.queues[q.tenantID] = f.createQueueAndStartWorkers(q.tenantID, q.queueSize, q.workerCount) |
| 189 | } |
| 190 | |
| 191 | f.mutex.Unlock() |
| 192 | case <-f.shutdown: |
| 193 | ticker.Stop() |
| 194 | return |
no test coverage detected