MCPcopy
hub / github.com/grafana/tempo / TestForwarder_shutdown

Function TestForwarder_shutdown

modules/distributor/forwarder_test.go:75–118  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

73}
74
75func TestForwarder_shutdown(t *testing.T) {
76 oCfg := overrides.Config{}
77 oCfg.RegisterFlagsAndApplyDefaults(&flag.FlagSet{})
78 oCfg.Defaults.MetricsGenerator.Forwarder.QueueSize = 200
79
80 id, err := util.HexStringToTraceID("1234567890abcdef")
81 require.NoError(t, err)
82
83 b := test.MakeBatch(10, id)
84 keys, rebatchedTraces, _, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
85 require.NoError(t, err)
86
87 o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
88 require.NoError(t, err)
89
90 signalCh := make(chan struct{})
91 f := newGeneratorForwarder(
92 log.NewNopLogger(),
93 func(_ context.Context, userID string, k []uint32, traces []*rebatchedTrace, noGenerateMetrics bool) error {
94 <-signalCh
95
96 assert.Equal(t, tenantID, userID)
97 assert.Equal(t, keys, k)
98 assert.Equal(t, rebatchedTraces, traces)
99 assert.False(t, noGenerateMetrics)
100 return nil
101 },
102 o,
103 )
104
105 require.NoError(t, f.start(context.Background()))
106 defer func() {
107 go func() {
108 // Wait to unblock processing of requests so shutdown and draining the queue is done in parallel
109 time.Sleep(time.Second)
110 close(signalCh)
111 }()
112 require.NoError(t, f.stop(nil))
113 }()
114
115 for i := 0; i < 100; i++ {
116 f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces)
117 }
118}

Callers

nothing calls this directly

Calls 11

HexStringToTraceIDFunction · 0.92
MakeBatchFunction · 0.92
NewOverridesFunction · 0.92
requestsByTraceIDFunction · 0.85
newGeneratorForwarderFunction · 0.85
SendTracesMethod · 0.80
startMethod · 0.65
SleepMethod · 0.65
stopMethod · 0.65
EqualMethod · 0.45

Tested by

no test coverage detected