MCPcopy
hub / github.com/segmentio/kafka-go / TestOffsetFetchRequestWithNoTopic

Function TestOffsetFetchRequestWithNoTopic

offsetfetch_test.go:51–110  ·  offsetfetch_test.go::TestOffsetFetchRequestWithNoTopic
(t *testing.T)

Source from the content-addressed store, hash-verified

49}
50
51func TestOffsetFetchRequestWithNoTopic(t *testing.T) {
52 if !ktesting.KafkaIsAtLeast("0.10.2.0") {
53 t.Logf("Test %s is not applicable for kafka versions below 0.10.2.0", t.Name())
54 t.SkipNow()
55 }
56 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
57 topic1 := makeTopic()
58 defer deleteTopic(t, topic1)
59 topic2 := makeTopic()
60 defer deleteTopic(t, topic2)
61 consumeGroup := makeGroupID()
62 numMsgs := 50
63 defer cancel()
64 r1 := NewReader(ReaderConfig{
65 Brokers: []string{"localhost:9092"},
66 Topic: topic1,
67 GroupID: consumeGroup,
68 MinBytes: 1,
69 MaxBytes: 100,
70 MaxWait: 100 * time.Millisecond,
71 })
72 defer r1.Close()
73 prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
74 r2 := NewReader(ReaderConfig{
75 Brokers: []string{"localhost:9092"},
76 Topic: topic2,
77 GroupID: consumeGroup,
78 MinBytes: 1,
79 MaxBytes: 100,
80 MaxWait: 100 * time.Millisecond,
81 })
82 defer r2.Close()
83 prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)
84
85 for i := 0; i < numMsgs; i++ {
86 if _, err := r1.ReadMessage(ctx); err != nil {
87 t.Fatal(err)
88 }
89 }
90 for i := 0; i < numMsgs; i++ {
91 if _, err := r2.ReadMessage(ctx); err != nil {
92 t.Fatal(err)
93 }
94 }
95
96 client := Client{Addr: TCP("localhost:9092")}
97
98 topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup})
99
100 if err != nil {
101 t.Error(err)
102 t.FailNow()
103 }
104
105 if len(topicOffsets.Topics) != 2 {
106 t.Error(err)
107 t.FailNow()
108 }

Callers

nothing calls this directly

Calls 12

CloseMethod · 0.95
ReadMessageMethod · 0.95
OffsetFetchMethod · 0.95
deleteTopicFunction · 0.85
makeGroupIDFunction · 0.85
NewReaderFunction · 0.85
prepareReaderFunction · 0.85
makeTestSequenceFunction · 0.85
TCPFunction · 0.85
makeTopicFunction · 0.70
NameMethod · 0.65
ErrorMethod · 0.45

Tested by

no test coverage detected