MCPcopy
hub / github.com/segmentio/kafka-go / TestOffsetFetchRequestWithOneTopic

Function TestOffsetFetchRequestWithOneTopic

offsetfetch_test.go:112–167  ·  offsetfetch_test.go::TestOffsetFetchRequestWithOneTopic
(t *testing.T)

Source from the content-addressed store, hash-verified

110}
111
112func TestOffsetFetchRequestWithOneTopic(t *testing.T) {
113 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
114 topic1 := makeTopic()
115 defer deleteTopic(t, topic1)
116 topic2 := makeTopic()
117 defer deleteTopic(t, topic2)
118 consumeGroup := makeGroupID()
119 numMsgs := 50
120 defer cancel()
121 r1 := NewReader(ReaderConfig{
122 Brokers: []string{"localhost:9092"},
123 Topic: topic1,
124 GroupID: consumeGroup,
125 MinBytes: 1,
126 MaxBytes: 100,
127 MaxWait: 100 * time.Millisecond,
128 })
129 defer r1.Close()
130 prepareReader(t, ctx, r1, makeTestSequence(numMsgs)...)
131 r2 := NewReader(ReaderConfig{
132 Brokers: []string{"localhost:9092"},
133 Topic: topic2,
134 GroupID: consumeGroup,
135 MinBytes: 1,
136 MaxBytes: 100,
137 MaxWait: 100 * time.Millisecond,
138 })
139 defer r2.Close()
140 prepareReader(t, ctx, r2, makeTestSequence(numMsgs)...)
141
142 for i := 0; i < numMsgs; i++ {
143 if _, err := r1.ReadMessage(ctx); err != nil {
144 t.Fatal(err)
145 }
146 }
147 for i := 0; i < numMsgs; i++ {
148 if _, err := r2.ReadMessage(ctx); err != nil {
149 t.Fatal(err)
150 }
151 }
152
153 client := Client{Addr: TCP("localhost:9092")}
154 topicOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{GroupID: consumeGroup, Topics: map[string][]int{
155 topic1: {0},
156 }})
157
158 if err != nil {
159 t.Error(err)
160 t.FailNow()
161 }
162
163 if len(topicOffsets.Topics) != 1 {
164 t.Error(err)
165 t.FailNow()
166 }
167}

Callers

nothing calls this directly

Calls 11

CloseMethod · 0.95
ReadMessageMethod · 0.95
OffsetFetchMethod · 0.95
deleteTopicFunction · 0.85
makeGroupIDFunction · 0.85
NewReaderFunction · 0.85
prepareReaderFunction · 0.85
makeTestSequenceFunction · 0.85
TCPFunction · 0.85
makeTopicFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected