TestReadTruncatedMessages uses a configuration designed to get the Broker to return truncated messages. It exercises the case where an earlier bug caused reading to time out by attempting to read beyond the current response. This test is not perfect, but it is pretty reliable about reproducing the
(t *testing.T)
| 466 | // NOTE : it currently only succeeds against kafka 0.10.1.0, so it will be |
| 467 | // skipped. It's here so that it can be manually run. |
| 468 | func TestReadTruncatedMessages(t *testing.T) { |
| 469 | // todo : it would be great to get it to work against 0.11.0.0 so we could |
| 470 | // include it in CI unit tests. |
| 471 | t.Skip() |
| 472 | |
| 473 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 474 | defer cancel() |
| 475 | r := NewReader(ReaderConfig{ |
| 476 | Brokers: []string{"localhost:9092"}, |
| 477 | Topic: makeTopic(), |
| 478 | MinBytes: 1, |
| 479 | MaxBytes: 100, |
| 480 | MaxWait: 100 * time.Millisecond, |
| 481 | }) |
| 482 | defer r.Close() |
| 483 | n := 500 |
| 484 | prepareReader(t, ctx, r, makeTestSequence(n)...) |
| 485 | for i := 0; i < n; i++ { |
| 486 | if _, err := r.ReadMessage(ctx); err != nil { |
| 487 | t.Fatal(err) |
| 488 | } |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | func makeTestSequence(n int) []Message { |
| 493 | base := time.Now() |
nothing calls this directly
no test coverage detected