| 142 | } |
| 143 | |
| 144 | func (m *Manager) updateQueueLists() { |
| 145 | m.tenantToQueueListMu.Lock() |
| 146 | defer m.tenantToQueueListMu.Unlock() |
| 147 | |
| 148 | queueListsToAdd := make(map[string]*queueList) |
| 149 | for tenantID, ql := range m.tenantToQueueList { |
| 150 | forwarderNames := m.overrides.Forwarders(tenantID) |
| 151 | if len(forwarderNames) < 1 { |
| 152 | go m.shutdownQueueList(tenantID, ql) |
| 153 | delete(m.tenantToQueueList, tenantID) |
| 154 | |
| 155 | continue |
| 156 | } |
| 157 | |
| 158 | if ql.shouldUpdate(forwarderNames) { |
| 159 | go m.shutdownQueueList(tenantID, ql) |
| 160 | delete(m.tenantToQueueList, tenantID) |
| 161 | |
| 162 | newQl, err := newQueueList(m.logger, tenantID, forwarderNames, m.forwarderNameToForwarder) |
| 163 | if err != nil { |
| 164 | _ = level.Warn(m.logger).Log("msg", "failed to create queue list", "err", err) |
| 165 | |
| 166 | continue |
| 167 | } |
| 168 | |
| 169 | queueListsToAdd[tenantID] = newQl |
| 170 | } |
| 171 | } |
| 172 | |
| 173 | for tenantID, ql := range queueListsToAdd { |
| 174 | m.tenantToQueueList[tenantID] = ql |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | func (m *Manager) shutdown() error { |
| 179 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |