(ctx context.Context)
| 1153 | } |
| 1154 | |
| 1155 | func (r *Reader) readLag(ctx context.Context) { |
| 1156 | ticker := time.NewTicker(r.config.ReadLagInterval) |
| 1157 | defer ticker.Stop() |
| 1158 | |
| 1159 | for { |
| 1160 | timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2) |
| 1161 | lag, err := r.ReadLag(timeout) |
| 1162 | cancel() |
| 1163 | |
| 1164 | if err != nil { |
| 1165 | r.stats.errors.observe(1) |
| 1166 | r.withErrorLogger(func(log Logger) { |
| 1167 | log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err) |
| 1168 | }) |
| 1169 | } else { |
| 1170 | r.stats.lag.observe(lag) |
| 1171 | } |
| 1172 | |
| 1173 | select { |
| 1174 | case <-ticker.C: |
| 1175 | case <-ctx.Done(): |
| 1176 | return |
| 1177 | } |
| 1178 | } |
| 1179 | } |
| 1180 | |
| 1181 | func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { |
| 1182 | if r.closed { |
no test coverage detected