MCPcopy
hub / github.com/grafana/tempo / newQueueList

Function newQueueList

modules/distributor/forwarder/manager.go:218–251  ·  view source on GitHub ↗
(logger log.Logger, tenantID string, forwarderNames []string, forwarderNameToForwarder map[string]Forwarder)

Source from the content-addressed store, hash-verified

216}
217
218func newQueueList(logger log.Logger, tenantID string, forwarderNames []string, forwarderNameToForwarder map[string]Forwarder) (*queueList, error) {
219 forwarderNameToQueue := make(map[string]*queue.Queue[ptrace.Traces], len(forwarderNames))
220 list := make(List, 0, len(forwarderNames))
221 for _, forwarderName := range forwarderNames {
222 forwarder, found := forwarderNameToForwarder[forwarderName]
223 if !found {
224 return nil, fmt.Errorf("failed to find forwarder by name: forwarderName=%s, tenantID=%s", forwarderName, tenantID)
225 }
226
227 queueCfg := queue.Config{
228 Name: forwarderName,
229 TenantID: tenantID,
230 Size: defaultQueueSize,
231 WorkerCount: defaultWorkerCount,
232 }
233
234 processFunc := func(ctx context.Context, traces ptrace.Traces) {
235 if err := forwarder.ForwardTraces(ctx, traces); err != nil {
236 _ = level.Warn(logger).Log("msg", "failed to forward batches", "forwarderName", forwarderName, "tenantID", tenantID, "err", err)
237 }
238 }
239 newQueue := queue.New(queueCfg, logger, processFunc)
240 newQueue.StartWorkers()
241 forwarderNameToQueue[forwarderName] = newQueue
242 list = append(list, queueAdapter{queue: newQueue})
243 }
244
245 return &queueList{
246 logger: logger,
247 tenantID: tenantID,
248 forwarderNameToQueue: forwarderNameToQueue,
249 list: list,
250 }, nil
251}
252
253func (l *queueList) shouldUpdate(forwarderNames []string) bool {
254 if len(forwarderNames) != len(l.forwarderNameToQueue) {

Callers 2

getOrCreateQueueListMethod · 0.85
updateQueueListsMethod · 0.85

Calls 4

NewFunction · 0.92
StartWorkersMethod · 0.80
ForwardTracesMethod · 0.65
LogMethod · 0.65

Tested by

no test coverage detected