MCPcopy
hub / github.com/segmentio/kafka-go / TestClientJoinGroup

Function TestClientJoinGroup

joingroup_test.go:15–125  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

13)
14
15func TestClientJoinGroup(t *testing.T) {
16 topic := makeTopic()
17 client, shutdown := newLocalClient()
18 defer shutdown()
19
20 err := clientCreateTopic(client, topic, 3)
21 if err != nil {
22 t.Fatal(err)
23 }
24
25 groupID := makeGroupID()
26
27 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
28 defer cancel()
29 respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
30 Addr: client.Addr,
31 Key: groupID,
32 KeyType: CoordinatorKeyTypeConsumer,
33 })
34 if err != nil {
35 t.Fatal(err)
36 }
37
38 if respc.Error != nil {
39 t.Fatal(err)
40 }
41
42 groupInstanceID := "group-instance-id"
43 if !ktesting.KafkaIsAtLeast("2.4.1") {
44 groupInstanceID = ""
45 }
46 const userData = "user-data"
47
48 req := &JoinGroupRequest{
49 GroupID: groupID,
50 GroupInstanceID: groupInstanceID,
51 ProtocolType: "consumer",
52 SessionTimeout: time.Minute,
53 RebalanceTimeout: time.Minute,
54 Protocols: []GroupProtocol{
55 {
56 Name: RoundRobinGroupBalancer{}.ProtocolName(),
57 Metadata: GroupProtocolSubscription{
58 Topics: []string{topic},
59 UserData: []byte(userData),
60 OwnedPartitions: map[string][]int{
61 topic: {0, 1, 2},
62 },
63 },
64 },
65 },
66 }
67
68 var resp *JoinGroupResponse
69
70 for {
71 resp, err = client.JoinGroup(ctx, req)
72 if err != nil {

Callers

nothing calls this directly

Calls 7

makeGroupIDFunction · 0.85
JoinGroupMethod · 0.80
makeTopicFunction · 0.70
newLocalClientFunction · 0.70
clientCreateTopicFunction · 0.70
ProtocolNameMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…