| 224 | } |
| 225 | |
| 226 | func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) { |
| 227 | var ( |
| 228 | now = time.Now() |
| 229 | numRecords = 0 |
| 230 | ) |
| 231 | |
| 232 | fetches.EachRecord(func(record *kgo.Record) { |
| 233 | numRecords++ |
| 234 | r.metrics.receiveDelay.Observe(now.Sub(record.Timestamp).Seconds()) |
| 235 | }) |
| 236 | |
| 237 | r.metrics.recordsPerFetch.Observe(float64(numRecords)) |
| 238 | } |
| 239 | |
| 240 | func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Context) (offset kgo.Offset, err error) { |
| 241 | retry := backoff.New(ctx, backoff.Config{ |