MCPcopy
hub / github.com/segmentio/kafka-go / TestMessageSetReader

Function TestMessageSetReader

message_test.go:239–536  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

237}
238
239func TestMessageSetReader(t *testing.T) {
240 const startOffset = 1000
241 const highWatermark = 5000
242 const topic = "test-topic"
243 msgs := make([]Message, 100)
244 for i := 0; i < 100; i++ {
245 msgs[i] = Message{
246 Time: time.Now(),
247 Offset: int64(i + startOffset),
248 Key: []byte(fmt.Sprintf("key-%d", i)),
249 Value: []byte(fmt.Sprintf("val-%d", i)),
250 Headers: []Header{
251 {
252 Key: fmt.Sprintf("header-key-%d", i),
253 Value: []byte(fmt.Sprintf("header-value-%d", i)),
254 },
255 },
256 }
257 }
258 defaultHeader := fetchResponseHeader{
259 highWatermarkOffset: highWatermark,
260 lastStableOffset: highWatermark,
261 topic: topic,
262 }
263 for _, tc := range []struct {
264 name string
265 builder fetchResponseBuilder
266 err error
267 debug bool
268 }{
269 {
270 name: "empty",
271 builder: fetchResponseBuilder{
272 header: defaultHeader,
273 },
274 err: errShortRead,
275 },
276 {
277 name: "v0",
278 builder: fetchResponseBuilder{
279 header: defaultHeader,
280 msgSets: []messageSetBuilder{
281 v0MessageSetBuilder{
282 msgs: []Message{msgs[0]},
283 },
284 },
285 },
286 },
287 {
288 name: "v0 compressed",
289 builder: fetchResponseBuilder{
290 header: defaultHeader,
291 msgSets: []messageSetBuilder{
292 v0MessageSetBuilder{
293 codec: new(gzip.Codec),
294 msgs: []Message{msgs[0]},
295 },
296 },

Callers

nothing calls this directly

Calls 8

newReaderHelperFunction · 0.85
remainingMethod · 0.80
readMessageErrMethod · 0.80
bytesMethod · 0.65
messagesMethod · 0.65
LenMethod · 0.65
readMessageMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected