(t *testing.T)
| 504 | } |
| 505 | |
| 506 | func testWriterBatchSize(t *testing.T) { |
| 507 | topic := makeTopic() |
| 508 | createTopic(t, topic, 1) |
| 509 | defer deleteTopic(t, topic) |
| 510 | |
| 511 | offset, err := readOffset(topic, 0) |
| 512 | if err != nil { |
| 513 | t.Fatal(err) |
| 514 | } |
| 515 | |
| 516 | w := newTestWriter(WriterConfig{ |
| 517 | Topic: topic, |
| 518 | BatchSize: 2, |
| 519 | BatchTimeout: math.MaxInt32 * time.Second, |
| 520 | Balancer: &RoundRobin{}, |
| 521 | }) |
| 522 | defer w.Close() |
| 523 | |
| 524 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 525 | defer cancel() |
| 526 | if err := w.WriteMessages(ctx, []Message{ |
| 527 | {Value: []byte("Hi")}, // 24 Bytes |
| 528 | {Value: []byte("By")}, // 24 Bytes |
| 529 | }...); err != nil { |
| 530 | t.Error(err) |
| 531 | return |
| 532 | } |
| 533 | |
| 534 | if w.Stats().Writes > 1 { |
| 535 | t.Error("didn't batch messages") |
| 536 | return |
| 537 | } |
| 538 | msgs, err := readPartition(topic, 0, offset) |
| 539 | if err != nil { |
| 540 | t.Error("error reading partition", err) |
| 541 | return |
| 542 | } |
| 543 | |
| 544 | if len(msgs) != 2 { |
| 545 | t.Error("bad messages in partition", msgs) |
| 546 | return |
| 547 | } |
| 548 | |
| 549 | for _, m := range msgs { |
| 550 | if string(m.Value) == "Hi" || string(m.Value) == "By" { |
| 551 | continue |
| 552 | } |
| 553 | t.Error("bad messages in partition", msgs) |
| 554 | } |
| 555 | } |
| 556 | |
| 557 | func testWriterSmallBatchBytes(t *testing.T) { |
| 558 | topic := makeTopic() |
nothing calls this directly
no test coverage detected