MCPcopy
hub / github.com/segmentio/kafka-go / testWriterMaxBytes

Function testWriterMaxBytes

writer_test.go:351–404  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

349}
350
351func testWriterMaxBytes(t *testing.T) {
352 topic := makeTopic()
353 createTopic(t, topic, 1)
354 defer deleteTopic(t, topic)
355
356 w := newTestWriter(WriterConfig{
357 Topic: topic,
358 BatchBytes: 25,
359 })
360 defer w.Close()
361
362 if err := w.WriteMessages(context.Background(), Message{
363 Value: []byte("Hi"),
364 }); err != nil {
365 t.Error(err)
366 return
367 }
368
369 firstMsg := []byte("Hello World!")
370 secondMsg := []byte("LeftOver!")
371 msgs := []Message{
372 {
373 Value: firstMsg,
374 },
375 {
376 Value: secondMsg,
377 },
378 }
379 if err := w.WriteMessages(context.Background(), msgs...); err == nil {
380 t.Error("expected error")
381 return
382 } else if err != nil {
383 var e MessageTooLargeError
384 switch {
385 case errors.As(err, &e):
386 if string(e.Message.Value) != string(firstMsg) {
387 t.Errorf("unxpected returned message. Expected: %s, Got %s", firstMsg, e.Message.Value)
388 return
389 }
390 if len(e.Remaining) != 1 {
391 t.Error("expected remaining errors; found none")
392 return
393 }
394 if string(e.Remaining[0].Value) != string(secondMsg) {
395 t.Errorf("unxpected returned message. Expected: %s, Got %s", secondMsg, e.Message.Value)
396 return
397 }
398
399 default:
400 t.Errorf("unexpected error: %s", err)
401 return
402 }
403 }
404}
405
406// readOffset gets the latest offset for the given topic/partition.
407func readOffset(topic string, partition int) (offset int64, err error) {

Callers

nothing calls this directly

Calls 7

createTopicFunction · 0.85
deleteTopicFunction · 0.85
newTestWriterFunction · 0.85
makeTopicFunction · 0.70
CloseMethod · 0.45
WriteMessagesMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected