(logger log.Logger, tenantID string, forwarderNames []string, forwarderNameToForwarder map[string]Forwarder)
| 216 | } |
| 217 | |
| 218 | func newQueueList(logger log.Logger, tenantID string, forwarderNames []string, forwarderNameToForwarder map[string]Forwarder) (*queueList, error) { |
| 219 | forwarderNameToQueue := make(map[string]*queue.Queue[ptrace.Traces], len(forwarderNames)) |
| 220 | list := make(List, 0, len(forwarderNames)) |
| 221 | for _, forwarderName := range forwarderNames { |
| 222 | forwarder, found := forwarderNameToForwarder[forwarderName] |
| 223 | if !found { |
| 224 | return nil, fmt.Errorf("failed to find forwarder by name: forwarderName=%s, tenantID=%s", forwarderName, tenantID) |
| 225 | } |
| 226 | |
| 227 | queueCfg := queue.Config{ |
| 228 | Name: forwarderName, |
| 229 | TenantID: tenantID, |
| 230 | Size: defaultQueueSize, |
| 231 | WorkerCount: defaultWorkerCount, |
| 232 | } |
| 233 | |
| 234 | processFunc := func(ctx context.Context, traces ptrace.Traces) { |
| 235 | if err := forwarder.ForwardTraces(ctx, traces); err != nil { |
| 236 | _ = level.Warn(logger).Log("msg", "failed to forward batches", "forwarderName", forwarderName, "tenantID", tenantID, "err", err) |
| 237 | } |
| 238 | } |
| 239 | newQueue := queue.New(queueCfg, logger, processFunc) |
| 240 | newQueue.StartWorkers() |
| 241 | forwarderNameToQueue[forwarderName] = newQueue |
| 242 | list = append(list, queueAdapter{queue: newQueue}) |
| 243 | } |
| 244 | |
| 245 | return &queueList{ |
| 246 | logger: logger, |
| 247 | tenantID: tenantID, |
| 248 | forwarderNameToQueue: forwarderNameToQueue, |
| 249 | list: list, |
| 250 | }, nil |
| 251 | } |
| 252 | |
| 253 | func (l *queueList) shouldUpdate(forwarderNames []string) bool { |
| 254 | if len(forwarderNames) != len(l.forwarderNameToQueue) { |
no test coverage detected