()
| 1143 | } |
| 1144 | |
| 1145 | func (r *Reader) activateReadLag() { |
| 1146 | if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) { |
| 1147 | // read lag will only be calculated when not using consumer groups |
| 1148 | // todo discuss how capturing read lag should interact with rebalancing |
| 1149 | if !r.useConsumerGroup() { |
| 1150 | go r.readLag(r.stctx) |
| 1151 | } |
| 1152 | } |
| 1153 | } |
| 1154 | |
| 1155 | func (r *Reader) readLag(ctx context.Context) { |
| 1156 | ticker := time.NewTicker(r.config.ReadLagInterval) |
no test coverage detected