MCPcopy
hub / github.com/grafana/tempo / Test_instance_concurrency

Function Test_instance_concurrency

modules/generator/instance_test.go:32–88  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

30)
31
32func Test_instance_concurrency(t *testing.T) {
33 // Both instances use the same overrides, this map will be accessed by both
34 overrides := &mockOverrides{}
35 overrides.processors = map[string]struct{}{
36 processor.SpanMetricsName: {},
37 processor.ServiceGraphsName: {},
38 }
39 cfg := &Config{}
40 cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
41
42 instance1, err := newInstance(cfg, "test", overrides, &noopStorage{}, log.NewNopLogger())
43 assert.NoError(t, err)
44
45 instance2, err := newInstance(cfg, "test", overrides, &noopStorage{}, log.NewNopLogger())
46 assert.NoError(t, err)
47
48 end := make(chan struct{})
49
50 accessor := func(f func()) {
51 for {
52 select {
53 case <-end:
54 return
55 default:
56 f()
57 }
58 }
59 }
60
61 go accessor(func() {
62 req := test.MakeBatch(1, nil)
63 instance1.pushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: []*v1.ResourceSpans{req}})
64 })
65
66 go accessor(func() {
67 req := test.MakeBatch(1, nil)
68 instance2.pushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: []*v1.ResourceSpans{req}})
69 })
70
71 go accessor(func() {
72 err := instance1.updateProcessors()
73 assert.NoError(t, err)
74 })
75
76 go accessor(func() {
77 err := instance2.updateProcessors()
78 assert.NoError(t, err)
79 })
80
81 time.Sleep(100 * time.Millisecond)
82
83 instance1.shutdown()
84 instance2.shutdown()
85
86 time.Sleep(10 * time.Millisecond)
87 close(end)
88}
89

Callers

nothing calls this directly

Calls 8

MakeBatchFunction · 0.92
fFunction · 0.85
pushSpansMethod · 0.80
updateProcessorsMethod · 0.80
newInstanceFunction · 0.70
SleepMethod · 0.65
shutdownMethod · 0.45

Tested by

no test coverage detected