| 51 | } |
| 52 | |
| 53 | func (g *Generator) stopKafka() { |
| 54 | g.kafkaStop() |
| 55 | g.kafkaWG.Wait() |
| 56 | close(g.kafkaCh) |
| 57 | // When enabled, with static membership (InstanceID) franz-go does not send |
| 58 | // LeaveGroup on Close(); explicitly leave by instance ID so the coordinator |
| 59 | // can rebalance immediately. When disabled, avoid two rebalances (leave then |
| 60 | // join) e.g. when all replicas go down then up together. |
| 61 | if g.cfg.LeaveConsumerGroupOnShutdown && g.cfg.InstanceID != "" { |
| 62 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 63 | defer cancel() |
| 64 | if err := g.leaveGroupFn(ctx); err != nil { |
| 65 | level.Warn(g.logger).Log( |
| 66 | "msg", "failed to leave Kafka consumer group by instance ID (partitions may reassign after session timeout)", |
| 67 | "err", err, |
| 68 | "instance_id", g.cfg.InstanceID, |
| 69 | "group", g.cfg.Ingest.Kafka.ConsumerGroup, |
| 70 | ) |
| 71 | } |
| 72 | } |
| 73 | g.kafkaClient.Close() |
| 74 | } |
| 75 | |
| 76 | func (g *Generator) listenKafka(ctx context.Context) { |
| 77 | defer g.kafkaWG.Done() |