MCPcopy
hub / github.com/grafana/tempo / NewFilterForwarder

Function NewFilterForwarder

modules/distributor/forwarder/forwarder.go:82–116  ·  view source on GitHub ↗
(cfg FilterConfig, next Forwarder, logLevel dslog.Level)

Source from the content-addressed store, hash-verified

80}
81
82func NewFilterForwarder(cfg FilterConfig, next Forwarder, logLevel dslog.Level) (*FilterForwarder, error) {
83 factory := filterprocessor.NewFactory()
84
85 set := processor.Settings{
86 ID: component.MustNewID(factory.Type().String()),
87 TelemetrySettings: component.TelemetrySettings{
88 Logger: newLogger(logLevel),
89 TracerProvider: tracenoop.NewTracerProvider(),
90 MeterProvider: metricnoop.NewMeterProvider(),
91 },
92 BuildInfo: component.BuildInfo{},
93 }
94 // Use CreateDefaultConfig to get a Config with OTTL functions properly initialized
95 fpCfg := factory.CreateDefaultConfig().(*filterprocessor.Config)
96 fpCfg.ErrorMode = ottl.IgnoreError
97 fpCfg.Traces = filterprocessor.TraceFilters{
98 SpanConditions: cfg.Traces.SpanConditions,
99 SpanEventConditions: cfg.Traces.SpanEventConditions,
100 }
101 fp, err := factory.CreateTraces(context.Background(), set, fpCfg, consumerToForwarderAdapter{forwarder: next})
102 if err != nil {
103 return nil, fmt.Errorf("failed to create filter processor: %w", err)
104 }
105
106 f := &FilterForwarder{
107 filterProcessor: fp,
108 next: next,
109 }
110
111 if err := f.filterProcessor.Start(context.TODO(), f); err != nil {
112 return nil, fmt.Errorf("failed to start filter processor: %w", err)
113 }
114
115 return f, nil
116}
117
118func (f *FilterForwarder) ForwardTraces(ctx context.Context, traces ptrace.Traces) error {
119 // Copying the traces to avoid mutating the original.

Calls 3

newLoggerFunction · 0.70
StartMethod · 0.65
StringMethod · 0.45