partitionWatcher queries Kafka and watches for partition changes, triggering a rebalance if any changes are found. As with heartbeat, it is okay to return on error here: if you are unable to ask a broker for basic metadata, you are in a bad spot and should rebalance. Commonly you will see an error here if there is a problem with the connection to the coordinator, and a rebalance will establish a new connection to the coordinator.
(interval time.Duration, topic string)
| 498 | // is a problem with the connection to the coordinator and a rebalance will |
| 499 | // establish a new connection to the coordinator. |
| 500 | func (g *Generation) partitionWatcher(interval time.Duration, topic string) { |
| 501 | g.Start(func(ctx context.Context) { |
| 502 | g.log(func(l Logger) { |
| 503 | l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval) |
| 504 | }) |
| 505 | defer g.log(func(l Logger) { |
| 506 | l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic) |
| 507 | }) |
| 508 | |
| 509 | ticker := time.NewTicker(interval) |
| 510 | defer ticker.Stop() |
| 511 | |
| 512 | ops, err := g.conn.readPartitions(topic) |
| 513 | if err != nil { |
| 514 | g.logError(func(l Logger) { |
| 515 | l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err) |
| 516 | }) |
| 517 | return |
| 518 | } |
| 519 | oParts := len(ops) |
| 520 | for { |
| 521 | select { |
| 522 | case <-ctx.Done(): |
| 523 | return |
| 524 | case <-ticker.C: |
| 525 | ops, err := g.conn.readPartitions(topic) |
| 526 | switch { |
| 527 | case err == nil, errors.Is(err, UnknownTopicOrPartition): |
| 528 | if len(ops) != oParts { |
| 529 | g.log(func(l Logger) { |
| 530 | l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID) |
| 531 | }) |
| 532 | return |
| 533 | } |
| 534 | |
| 535 | default: |
| 536 | g.logError(func(l Logger) { |
| 537 | l.Printf("Problem getting partitions while checking for changes, %v", err) |
| 538 | }) |
| 539 | var kafkaError Error |
| 540 | if errors.As(err, &kafkaError) { |
| 541 | continue |
| 542 | } |
| 543 | // other errors imply that we lost the connection to the coordinator, so we |
| 544 | // should abort and reconnect. |
| 545 | return |
| 546 | } |
| 547 | } |
| 548 | } |
| 549 | }) |
| 550 | } |
| 551 | |
| 552 | // coordinator is a subset of the functionality in Conn in order to facilitate |
| 553 | // testing the consumer group...especially for error conditions that are |