(t *testing.T)
| 12 | ) |
| 13 | |
| 14 | func TestClientSyncGroup(t *testing.T) { |
| 15 | // In order to get to a sync group call we need to first |
| 16 | // join a group. |
| 17 | topic := makeTopic() |
| 18 | client, shutdown := newLocalClient() |
| 19 | client.Timeout = time.Minute |
| 20 | // Although at higher api versions ClientID is nullable |
| 21 | // for some reason the SyncGroup API call errors |
| 22 | // when ClientID is null. |
| 23 | // The Java Kafka Consumer generates a ClientID if one is not |
| 24 | // present or if the provided ClientID is empty. |
| 25 | client.Transport.(*Transport).ClientID = "test-client" |
| 26 | defer shutdown() |
| 27 | |
| 28 | err := clientCreateTopic(client, topic, 3) |
| 29 | if err != nil { |
| 30 | t.Fatal(err) |
| 31 | } |
| 32 | |
| 33 | groupID := makeGroupID() |
| 34 | |
| 35 | ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
| 36 | defer cancel() |
| 37 | respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{ |
| 38 | Addr: client.Addr, |
| 39 | Key: groupID, |
| 40 | KeyType: CoordinatorKeyTypeConsumer, |
| 41 | }) |
| 42 | if err != nil { |
| 43 | t.Fatal(err) |
| 44 | } |
| 45 | |
| 46 | if respc.Error != nil { |
| 47 | t.Fatal(err) |
| 48 | } |
| 49 | |
| 50 | groupInstanceID := "group-instance-id" |
| 51 | userData := "user-data" |
| 52 | |
| 53 | var rrGroupBalancer RoundRobinGroupBalancer |
| 54 | |
| 55 | req := &JoinGroupRequest{ |
| 56 | GroupID: groupID, |
| 57 | GroupInstanceID: groupInstanceID, |
| 58 | ProtocolType: "consumer", |
| 59 | SessionTimeout: time.Minute, |
| 60 | RebalanceTimeout: time.Minute, |
| 61 | Protocols: []GroupProtocol{ |
| 62 | { |
| 63 | Name: rrGroupBalancer.ProtocolName(), |
| 64 | Metadata: GroupProtocolSubscription{ |
| 65 | Topics: []string{topic}, |
| 66 | UserData: []byte(userData), |
| 67 | OwnedPartitions: map[string][]int{ |
| 68 | topic: {0, 1, 2}, |
| 69 | }, |
| 70 | }, |
| 71 | }, |
nothing calls this directly
no test coverage detected