(t *testing.T)
| 73 | } |
| 74 | |
| 75 | func TestForwarder_shutdown(t *testing.T) { |
| 76 | oCfg := overrides.Config{} |
| 77 | oCfg.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) |
| 78 | oCfg.Defaults.MetricsGenerator.Forwarder.QueueSize = 200 |
| 79 | |
| 80 | id, err := util.HexStringToTraceID("1234567890abcdef") |
| 81 | require.NoError(t, err) |
| 82 | |
| 83 | b := test.MakeBatch(10, id) |
| 84 | keys, rebatchedTraces, _, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000) |
| 85 | require.NoError(t, err) |
| 86 | |
| 87 | o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer) |
| 88 | require.NoError(t, err) |
| 89 | |
| 90 | signalCh := make(chan struct{}) |
| 91 | f := newGeneratorForwarder( |
| 92 | log.NewNopLogger(), |
| 93 | func(_ context.Context, userID string, k []uint32, traces []*rebatchedTrace, noGenerateMetrics bool) error { |
| 94 | <-signalCh |
| 95 | |
| 96 | assert.Equal(t, tenantID, userID) |
| 97 | assert.Equal(t, keys, k) |
| 98 | assert.Equal(t, rebatchedTraces, traces) |
| 99 | assert.False(t, noGenerateMetrics) |
| 100 | return nil |
| 101 | }, |
| 102 | o, |
| 103 | ) |
| 104 | |
| 105 | require.NoError(t, f.start(context.Background())) |
| 106 | defer func() { |
| 107 | go func() { |
| 108 | // Wait to unblock processing of requests so shutdown and draining the queue is done in parallel |
| 109 | time.Sleep(time.Second) |
| 110 | close(signalCh) |
| 111 | }() |
| 112 | require.NoError(t, f.stop(nil)) |
| 113 | }() |
| 114 | |
| 115 | for i := 0; i < 100; i++ { |
| 116 | f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) |
| 117 | } |
| 118 | } |
nothing calls this directly
no test coverage detected