(t *testing.T, conn *Conn, groupID string)
| 655 | } |
| 656 | |
| 657 | func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { |
| 658 | // ensure that kafka has allocated a group coordinator. oddly, issue doesn't |
| 659 | // appear to happen if the kafka been running for a while. |
| 660 | const maxAttempts = 20 |
| 661 | for attempt := 1; attempt <= maxAttempts; attempt++ { |
| 662 | _, err := conn.findCoordinator(findCoordinatorRequestV0{ |
| 663 | CoordinatorKey: groupID, |
| 664 | }) |
| 665 | if err != nil { |
| 666 | if errors.Is(err, GroupCoordinatorNotAvailable) { |
| 667 | time.Sleep(250 * time.Millisecond) |
| 668 | continue |
| 669 | } else { |
| 670 | t.Fatalf("unable to find coordinator for group: %v", err) |
| 671 | } |
| 672 | } else { |
| 673 | return |
| 674 | } |
| 675 | } |
| 676 | |
| 677 | t.Fatalf("unable to connect to coordinator after %v attempts", maxAttempts) |
| 678 | } |
| 679 | |
| 680 | func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) { |
| 681 | waitForCoordinator(t, conn, groupID) |
no test coverage detected