MCPcopy
hub / github.com/segmentio/kafka-go / waitForCoordinator

Function waitForCoordinator

conn_test.go:657–678  ·  view source on GitHub ↗
(t *testing.T, conn *Conn, groupID string)

Source from the content-addressed store, hash-verified

655}
656
657func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
658 // ensure that kafka has allocated a group coordinator. oddly, issue doesn't
659 // appear to happen if the kafka been running for a while.
660 const maxAttempts = 20
661 for attempt := 1; attempt <= maxAttempts; attempt++ {
662 _, err := conn.findCoordinator(findCoordinatorRequestV0{
663 CoordinatorKey: groupID,
664 })
665 if err != nil {
666 if errors.Is(err, GroupCoordinatorNotAvailable) {
667 time.Sleep(250 * time.Millisecond)
668 continue
669 } else {
670 t.Fatalf("unable to find coordinator for group: %v", err)
671 }
672 } else {
673 return
674 }
675 }
676
677 t.Fatalf("unable to connect to coordinator after %v attempts", maxAttempts)
678}
679
680func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
681 waitForCoordinator(t, conn, groupID)

Callers 5

createGroupFunction · 0.85
testConnLeaveGroupErrFunction · 0.85
testConnSyncGroupErrFunction · 0.85

Calls 1

findCoordinatorMethod · 0.65

Tested by

no test coverage detected