(t *testing.T)
| 13 | ) |
| 14 | |
| 15 | func TestClientJoinGroup(t *testing.T) { |
| 16 | topic := makeTopic() |
| 17 | client, shutdown := newLocalClient() |
| 18 | defer shutdown() |
| 19 | |
| 20 | err := clientCreateTopic(client, topic, 3) |
| 21 | if err != nil { |
| 22 | t.Fatal(err) |
| 23 | } |
| 24 | |
| 25 | groupID := makeGroupID() |
| 26 | |
| 27 | ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) |
| 28 | defer cancel() |
| 29 | respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{ |
| 30 | Addr: client.Addr, |
| 31 | Key: groupID, |
| 32 | KeyType: CoordinatorKeyTypeConsumer, |
| 33 | }) |
| 34 | if err != nil { |
| 35 | t.Fatal(err) |
| 36 | } |
| 37 | |
| 38 | if respc.Error != nil { |
| 39 | t.Fatal(err) |
| 40 | } |
| 41 | |
| 42 | groupInstanceID := "group-instance-id" |
| 43 | if !ktesting.KafkaIsAtLeast("2.4.1") { |
| 44 | groupInstanceID = "" |
| 45 | } |
| 46 | const userData = "user-data" |
| 47 | |
| 48 | req := &JoinGroupRequest{ |
| 49 | GroupID: groupID, |
| 50 | GroupInstanceID: groupInstanceID, |
| 51 | ProtocolType: "consumer", |
| 52 | SessionTimeout: time.Minute, |
| 53 | RebalanceTimeout: time.Minute, |
| 54 | Protocols: []GroupProtocol{ |
| 55 | { |
| 56 | Name: RoundRobinGroupBalancer{}.ProtocolName(), |
| 57 | Metadata: GroupProtocolSubscription{ |
| 58 | Topics: []string{topic}, |
| 59 | UserData: []byte(userData), |
| 60 | OwnedPartitions: map[string][]int{ |
| 61 | topic: {0, 1, 2}, |
| 62 | }, |
| 63 | }, |
| 64 | }, |
| 65 | }, |
| 66 | } |
| 67 | |
| 68 | var resp *JoinGroupResponse |
| 69 | |
| 70 | for { |
| 71 | resp, err = client.JoinGroup(ctx, req) |
| 72 | if err != nil { |
nothing calls this directly
no test coverage detected
searching dependent graphs…