MCPcopy
hub / github.com/segmentio/kafka-go / TestClientDeleteGroups

Function TestClientDeleteGroups

deletegroups_test.go:12–80  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

10)
11
12func TestClientDeleteGroups(t *testing.T) {
13 if !ktesting.KafkaIsAtLeast("1.1.0") {
14 t.Skip("Skipping test because kafka version is not high enough.")
15 }
16
17 client, shutdown := newLocalClient()
18 defer shutdown()
19
20 topic := makeTopic()
21 createTopic(t, topic, 1)
22
23 groupID := makeGroupID()
24
25 group, err := NewConsumerGroup(ConsumerGroupConfig{
26 ID: groupID,
27 Topics: []string{topic},
28 Brokers: []string{"localhost:9092"},
29 HeartbeatInterval: 2 * time.Second,
30 RebalanceTimeout: 2 * time.Second,
31 RetentionTime: time.Hour,
32 Logger: &testKafkaLogger{T: t},
33 })
34 if err != nil {
35 t.Fatal(err)
36 }
37 defer group.Close()
38
39 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
40 defer cancel()
41
42 gen, err := group.Next(ctx)
43 if gen == nil {
44 t.Fatalf("expected generation 1 not to be nil")
45 }
46 if err != nil {
47 t.Fatalf("expected no error, but got %+v", err)
48 }
49
50 // delete not empty group
51 res, err := client.DeleteGroups(ctx, &DeleteGroupsRequest{
52 GroupIDs: []string{groupID},
53 })
54
55 if err != nil {
56 t.Fatal(err)
57 }
58
59 if !errors.Is(res.Errors[groupID], NonEmptyGroup) {
60 t.Fatalf("expected NonEmptyGroup error, but got %+v", res.Errors[groupID])
61 }
62
63 err = group.Close()
64 if err != nil {
65 t.Fatal(err)
66 }
67
68 // delete empty group
69 res, err = client.DeleteGroups(ctx, &DeleteGroupsRequest{

Callers

nothing calls this directly

Calls 9

CloseMethod · 0.95
NextMethod · 0.95
createTopicFunction · 0.85
makeGroupIDFunction · 0.85
NewConsumerGroupFunction · 0.85
DeleteGroupsMethod · 0.80
newLocalClientFunction · 0.70
makeTopicFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected