(ctx context.Context)
| 96 | } |
| 97 | |
| 98 | func (g *Generator) readKafka(ctx context.Context) error { |
| 99 | fetches := g.kafkaClient.PollFetches(ctx) |
| 100 | fetches.EachError(func(_ string, _ int32, err error) { |
| 101 | if !errors.Is(err, context.Canceled) { |
| 102 | level.Error(g.logger).Log("msg", "failed to fetch records", "err", err) |
| 103 | } |
| 104 | }) |
| 105 | if err := fetches.Err(); err != nil && !errors.Is(err, context.Canceled) { |
| 106 | return err |
| 107 | } |
| 108 | |
| 109 | // Metric lag based on first message in each partition. |
| 110 | // This balances overhead with granularity. |
| 111 | fetches.EachPartition(func(p kgo.FetchTopicPartition) { |
| 112 | if len(p.Records) > 0 { |
| 113 | lag := time.Since(p.Records[0].Timestamp) |
| 114 | ingest.SetPartitionLagSeconds(g.cfg.Ingest.Kafka.ConsumerGroup, p.Partition, lag) |
| 115 | } |
| 116 | }) |
| 117 | |
| 118 | start := time.Now() |
| 119 | |
| 120 | for iter := fetches.RecordIter(); !iter.Done(); { |
| 121 | select { |
| 122 | case g.kafkaCh <- iter.Next(): |
| 123 | case <-ctx.Done(): |
| 124 | return ctx.Err() |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | metricEnqueueTime.Add(time.Since(start).Seconds()) |
| 129 | |
| 130 | return nil |
| 131 | } |
| 132 | |
| 133 | // readCh reads records from the internal channel. |
| 134 | // This allows for offloading the expensive proto unmarshal |
no test coverage detected