MCPcopy
hub / github.com/segmentio/kafka-go / TestClientHeartbeat

Function TestClientHeartbeat

heartbeat_test.go:14–57  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

12)
13
14func TestClientHeartbeat(t *testing.T) {
15 client, topic, shutdown := newLocalClientAndTopic()
16 defer shutdown()
17
18 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
19 defer cancel()
20
21 groupID := makeGroupID()
22
23 group, err := NewConsumerGroup(ConsumerGroupConfig{
24 ID: groupID,
25 Topics: []string{topic},
26 Brokers: []string{"localhost:9092"},
27 HeartbeatInterval: 2 * time.Second,
28 RebalanceTimeout: 2 * time.Second,
29 RetentionTime: time.Hour,
30 Logger: log.New(os.Stdout, "cg-test: ", 0),
31 })
32 if err != nil {
33 t.Fatal(err)
34 }
35 defer group.Close()
36
37 gen, err := group.Next(ctx)
38 if err != nil {
39 t.Fatal(err)
40 }
41
42 ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
43 defer cancel()
44
45 resp, err := client.Heartbeat(ctx, &HeartbeatRequest{
46 GroupID: groupID,
47 GenerationID: gen.ID,
48 MemberID: gen.MemberID,
49 })
50 if err != nil {
51 t.Fatal(err)
52 }
53
54 if resp.Error != nil {
55 t.Error(resp.Error)
56 }
57}
58
59func TestHeartbeatRequestV0(t *testing.T) {
60 item := heartbeatResponseV0{

Callers

nothing calls this directly

Calls 7

CloseMethod · 0.95
NextMethod · 0.95
makeGroupIDFunction · 0.85
NewConsumerGroupFunction · 0.85
HeartbeatMethod · 0.80
newLocalClientAndTopicFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected