MCPcopy
hub / github.com/segmentio/kafka-go / heartbeatLoop

Method heartbeatLoop

consumergroup.go:464–492  ·  view source on GitHub ↗

heartbeatLoop checks in with the consumer group coordinator at the provided interval. It exits if it ever encounters an error, which would signal the end of the generation.

(interval time.Duration)

Source from the content-addressed store, hash-verified

462// interval. It exits if it ever encounters an error, which would signal the
463// end of the generation.
464func (g *Generation) heartbeatLoop(interval time.Duration) {
465 g.Start(func(ctx context.Context) {
466 g.log(func(l Logger) {
467 l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval)
468 })
469 defer g.log(func(l Logger) {
470 l.Printf("stopped heartbeat for group %s\n", g.GroupID)
471 })
472
473 ticker := time.NewTicker(interval)
474 defer ticker.Stop()
475
476 for {
477 select {
478 case <-ctx.Done():
479 return
480 case <-ticker.C:
481 _, err := g.conn.heartbeat(heartbeatRequestV0{
482 GroupID: g.GroupID,
483 GenerationID: g.ID,
484 MemberID: g.MemberID,
485 })
486 if err != nil {
487 return
488 }
489 }
490 }
491 })
492}
493
494// partitionWatcher queries kafka and watches for partition changes, triggering
495// a rebalance if changes are found. Similar to heartbeat it's okay to return on

Callers 1

nextGenerationMethod · 0.95

Calls 5

StartMethod · 0.95
logMethod · 0.80
DoneMethod · 0.80
PrintfMethod · 0.65
heartbeatMethod · 0.65

Tested by

no test coverage detected