MCPcopy
hub / github.com/segmentio/kafka-go / readLag

Method readLag

reader.go:1155–1179  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

1153}
1154
1155func (r *Reader) readLag(ctx context.Context) {
1156 ticker := time.NewTicker(r.config.ReadLagInterval)
1157 defer ticker.Stop()
1158
1159 for {
1160 timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
1161 lag, err := r.ReadLag(timeout)
1162 cancel()
1163
1164 if err != nil {
1165 r.stats.errors.observe(1)
1166 r.withErrorLogger(func(log Logger) {
1167 log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
1168 })
1169 } else {
1170 r.stats.lag.observe(lag)
1171 }
1172
1173 select {
1174 case <-ticker.C:
1175 case <-ctx.Done():
1176 return
1177 }
1178 }
1179}
1180
1181func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
1182 if r.closed {

Callers 1

activateReadLagMethod · 0.95

Calls 5

ReadLagMethod · 0.95
withErrorLoggerMethod · 0.95
DoneMethod · 0.80
PrintfMethod · 0.65
observeMethod · 0.45

Tested by

no test coverage detected