MCPcopy
hub / github.com/segmentio/kafka-go / partitionWatcher

Method partitionWatcher

consumergroup.go:500–550  ·  view source on GitHub ↗

partitionWatcher queries kafka and watches for partition changes, triggering a rebalance if changes are found. Similar to heartbeat, it's okay to return on error here: if you are unable to ask a broker for basic metadata, you're in a bad spot and should rebalance. Commonly you will see an error here if there is a problem with the connection to the coordinator, and a rebalance will establish a new connection to the coordinator.

(interval time.Duration, topic string)

Source from the content-addressed store, hash-verified

498// is a problem with the connection to the coordinator and a rebalance will
499// establish a new connection to the coordinator.
500func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
501 g.Start(func(ctx context.Context) {
502 g.log(func(l Logger) {
503 l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
504 })
505 defer g.log(func(l Logger) {
506 l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic)
507 })
508
509 ticker := time.NewTicker(interval)
510 defer ticker.Stop()
511
512 ops, err := g.conn.readPartitions(topic)
513 if err != nil {
514 g.logError(func(l Logger) {
515 l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
516 })
517 return
518 }
519 oParts := len(ops)
520 for {
521 select {
522 case <-ctx.Done():
523 return
524 case <-ticker.C:
525 ops, err := g.conn.readPartitions(topic)
526 switch {
527 case err == nil, errors.Is(err, UnknownTopicOrPartition):
528 if len(ops) != oParts {
529 g.log(func(l Logger) {
530 l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
531 })
532 return
533 }
534
535 default:
536 g.logError(func(l Logger) {
537 l.Printf("Problem getting partitions while checking for changes, %v", err)
538 })
539 var kafkaError Error
540 if errors.As(err, &kafkaError) {
541 continue
542 }
543 // other errors imply that we lost the connection to the coordinator, so we
544 // should abort and reconnect.
545 return
546 }
547 }
548 }
549 })
550}
551
552// coordinator is a subset of the functionality in Conn in order to facilitate
553// testing the consumer group...especially for error conditions that are

Callers 2

nextGenerationMethod · 0.95

Calls 5

StartMethod · 0.95
logMethod · 0.80
DoneMethod · 0.80
PrintfMethod · 0.65
readPartitionsMethod · 0.65

Tested by 1