(t *testing.T)
| 451 | } |
| 452 | |
| 453 | func testWriterBatchBytes(t *testing.T) { |
| 454 | topic := makeTopic() |
| 455 | createTopic(t, topic, 1) |
| 456 | defer deleteTopic(t, topic) |
| 457 | |
| 458 | offset, err := readOffset(topic, 0) |
| 459 | if err != nil { |
| 460 | t.Fatal(err) |
| 461 | } |
| 462 | |
| 463 | w := newTestWriter(WriterConfig{ |
| 464 | Topic: topic, |
| 465 | BatchBytes: 50, |
| 466 | BatchTimeout: math.MaxInt32 * time.Second, |
| 467 | Balancer: &RoundRobin{}, |
| 468 | }) |
| 469 | defer w.Close() |
| 470 | |
| 471 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 472 | defer cancel() |
| 473 | if err := w.WriteMessages(ctx, []Message{ |
| 474 | {Value: []byte("M0")}, // 25 Bytes |
| 475 | {Value: []byte("M1")}, // 25 Bytes |
| 476 | {Value: []byte("M2")}, // 25 Bytes |
| 477 | {Value: []byte("M3")}, // 25 Bytes |
| 478 | }...); err != nil { |
| 479 | t.Error(err) |
| 480 | return |
| 481 | } |
| 482 | |
| 483 | if w.Stats().Writes != 2 { |
| 484 | t.Error("didn't create expected batches") |
| 485 | return |
| 486 | } |
| 487 | msgs, err := readPartition(topic, 0, offset) |
| 488 | if err != nil { |
| 489 | t.Error("error reading partition", err) |
| 490 | return |
| 491 | } |
| 492 | |
| 493 | if len(msgs) != 4 { |
| 494 | t.Error("bad messages in partition", msgs) |
| 495 | return |
| 496 | } |
| 497 | |
| 498 | for i, m := range msgs { |
| 499 | if string(m.Value) == "M"+strconv.Itoa(i) { |
| 500 | continue |
| 501 | } |
| 502 | t.Error("bad messages in partition", string(m.Value)) |
| 503 | } |
| 504 | } |
| 505 | |
| 506 | func testWriterBatchSize(t *testing.T) { |
| 507 | topic := makeTopic() |
nothing calls this directly
no test coverage detected