MCPcopy
hub / github.com/grafana/tempo / stopKafka

Method stopKafka

modules/generator/generator_kafka.go:53–74  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

51}
52
53func (g *Generator) stopKafka() {
54 g.kafkaStop()
55 g.kafkaWG.Wait()
56 close(g.kafkaCh)
57 // When enabled, with static membership (InstanceID) franz-go does not send
58 // LeaveGroup on Close(); explicitly leave by instance ID so the coordinator
59 // can rebalance immediately. When disabled, avoid two rebalances (leave then
60 // join) e.g. when all replicas go down then up together.
61 if g.cfg.LeaveConsumerGroupOnShutdown && g.cfg.InstanceID != "" {
62 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
63 defer cancel()
64 if err := g.leaveGroupFn(ctx); err != nil {
65 level.Warn(g.logger).Log(
66 "msg", "failed to leave Kafka consumer group by instance ID (partitions may reassign after session timeout)",
67 "err", err,
68 "instance_id", g.cfg.InstanceID,
69 "group", g.cfg.Ingest.Kafka.ConsumerGroup,
70 )
71 }
72 }
73 g.kafkaClient.Close()
74}
75
76func (g *Generator) listenKafka(ctx context.Context) {
77 defer g.kafkaWG.Done()

Callers 2

stoppingMethod · 0.95

Calls 3

WaitMethod · 0.65
LogMethod · 0.65
CloseMethod · 0.65

Tested by 1