run provides the main consumer group management loop. Each iteration performs the handshake to join the Reader to the consumer group. This function is responsible for closing the consumer group upon exit.
(cg *ConsumerGroup)
| 289 | // |
| 290 | // This function is responsible for closing the consumer group upon exit. |
| 291 | func (r *Reader) run(cg *ConsumerGroup) { |
| 292 | defer close(r.done) |
| 293 | defer cg.Close() |
| 294 | |
| 295 | r.withLogger(func(l Logger) { |
| 296 | l.Printf("entering loop for consumer group, %v\n", r.config.GroupID) |
| 297 | }) |
| 298 | |
| 299 | for { |
| 300 | // Limit the number of attempts at waiting for the next |
| 301 | // consumer generation. |
| 302 | var err error |
| 303 | var gen *Generation |
| 304 | for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ { |
| 305 | gen, err = cg.Next(r.stctx) |
| 306 | if err == nil { |
| 307 | break |
| 308 | } |
| 309 | if errors.Is(err, r.stctx.Err()) { |
| 310 | return |
| 311 | } |
| 312 | r.stats.errors.observe(1) |
| 313 | r.withErrorLogger(func(l Logger) { |
| 314 | l.Printf("%v", err) |
| 315 | }) |
| 316 | // Continue with next attempt... |
| 317 | } |
| 318 | if err != nil { |
| 319 | // All attempts have failed. |
| 320 | select { |
| 321 | case r.runError <- err: |
| 322 | // If somebody's receiving on the runError, let |
| 323 | // them know the error occurred. |
| 324 | default: |
| 325 | // Otherwise, don't block to allow healing. |
| 326 | } |
| 327 | continue |
| 328 | } |
| 329 | |
| 330 | r.stats.rebalances.observe(1) |
| 331 | |
| 332 | r.subscribe(gen.Assignments) |
| 333 | |
| 334 | gen.Start(func(ctx context.Context) { |
| 335 | r.commitLoop(ctx, gen) |
| 336 | }) |
| 337 | gen.Start(func(ctx context.Context) { |
| 338 | // wait for the generation to end and then unsubscribe. |
| 339 | select { |
| 340 | case <-ctx.Done(): |
| 341 | // continue to next generation |
| 342 | case <-r.stctx.Done(): |
| 343 | // this will be the last loop because the reader is closed. |
| 344 | } |
| 345 | r.unsubscribe() |
| 346 | }) |
| 347 | } |
| 348 | } |
no test coverage detected