heartbeatLoop checks in with the consumer group coordinator at the provided interval. It exits if it ever encounters an error, which would signal the end of the generation.
(interval time.Duration)
| 462 | // interval. It exits if it ever encounters an error, which would signal the |
| 463 | // end of the generation. |
| 464 | func (g *Generation) heartbeatLoop(interval time.Duration) { |
| 465 | g.Start(func(ctx context.Context) { |
| 466 | g.log(func(l Logger) { |
| 467 | l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval) |
| 468 | }) |
| 469 | defer g.log(func(l Logger) { |
| 470 | l.Printf("stopped heartbeat for group %s\n", g.GroupID) |
| 471 | }) |
| 472 | |
| 473 | ticker := time.NewTicker(interval) |
| 474 | defer ticker.Stop() |
| 475 | |
| 476 | for { |
| 477 | select { |
| 478 | case <-ctx.Done(): |
| 479 | return |
| 480 | case <-ticker.C: |
| 481 | _, err := g.conn.heartbeat(heartbeatRequestV0{ |
| 482 | GroupID: g.GroupID, |
| 483 | GenerationID: g.ID, |
| 484 | MemberID: g.MemberID, |
| 485 | }) |
| 486 | if err != nil { |
| 487 | return |
| 488 | } |
| 489 | } |
| 490 | } |
| 491 | }) |
| 492 | } |
| 493 | |
| 494 | // partitionWatcher queries kafka and watches for partition changes, triggering |
| 495 | // a rebalance if changes are found. Similar to heartbeat it's okay to return on |