| 105 | } |
| 106 | |
| 107 | func New(cfg Config, tenant string, reg registry.Registry, logger log.Logger, filteredSpansCounter, invalidUTF8Counter prometheus.Counter) (gen.Processor, error) { |
| 108 | if cfg.EnableVirtualNodeLabel { |
| 109 | cfg.Dimensions = append(cfg.Dimensions, virtualNodeLabel) |
| 110 | } |
| 111 | |
| 112 | sanitizeCache := reclaimable.New(validation.SanitizeLabelName, 10000) |
| 113 | filter, err := spanfilter.NewSpanFilter(cfg.FilterPolicies) |
| 114 | if err != nil { |
| 115 | return nil, err |
| 116 | } |
| 117 | |
| 118 | p := &Processor{ |
| 119 | Cfg: cfg, |
| 120 | registry: reg, |
| 121 | closeCh: make(chan struct{}, 1), |
| 122 | |
| 123 | serviceGraphRequestTotal: reg.NewCounter(metricRequestTotal), |
| 124 | serviceGraphRequestFailedTotal: reg.NewCounter(metricRequestFailedTotal), |
| 125 | serviceGraphRequestServerSecondsHistogram: reg.NewHistogram(metricRequestServerSeconds, cfg.HistogramBuckets, cfg.HistogramOverride), |
| 126 | serviceGraphRequestClientSecondsHistogram: reg.NewHistogram(metricRequestClientSeconds, cfg.HistogramBuckets, cfg.HistogramOverride), |
| 127 | serviceGraphRequestMessagingSystemSecondsHistogram: reg.NewHistogram(metricRequestMessagingSystemSeconds, cfg.HistogramBuckets, cfg.HistogramOverride), |
| 128 | sanitizeCache: sanitizeCache, |
| 129 | filter: filter, |
| 130 | |
| 131 | filteredSpansCounter: filteredSpansCounter, |
| 132 | metricDroppedSpans: metricDroppedSpans.WithLabelValues(tenant), |
| 133 | metricDroppedEdges: metricDroppedEdges.WithLabelValues(tenant), |
| 134 | metricDroppedSpanSideCacheOverflows: metricDroppedSpanSideCacheOverflows.WithLabelValues(tenant), |
| 135 | metricTotalEdges: metricTotalEdges.WithLabelValues(tenant), |
| 136 | metricExpiredEdges: metricExpiredEdges.WithLabelValues(tenant), |
| 137 | invalidUTF8Counter: invalidUTF8Counter, |
| 138 | logger: log.With(logger, "component", "service-graphs"), |
| 139 | } |
| 140 | |
| 141 | p.store = store.NewStore(cfg.Wait, cfg.MaxItems, p.onComplete, p.onExpire, p.metricDroppedSpanSideCacheOverflows) |
| 142 | |
| 143 | expirationTicker := time.NewTicker(2 * time.Second) |
| 144 | for i := 0; i < cfg.Workers; i++ { |
| 145 | go func() { |
| 146 | for { |
| 147 | select { |
| 148 | // Periodically clean expired edges from the store |
| 149 | case <-expirationTicker.C: |
| 150 | p.store.Expire() |
| 151 | |
| 152 | case <-p.closeCh: |
| 153 | return |
| 154 | } |
| 155 | } |
| 156 | }() |
| 157 | } |
| 158 | |
| 159 | return p, nil |
| 160 | } |
| 161 | |
| 162 | func (p *Processor) Name() string { |
| 163 | return gen.ServiceGraphsName |