(t *testing.T)
| 396 | } |
| 397 | |
| 398 | func TestReaderOnNonZeroPartition(t *testing.T) { |
| 399 | tests := []struct { |
| 400 | scenario string |
| 401 | function func(*testing.T, context.Context, *Reader) |
| 402 | }{ |
| 403 | { |
| 404 | scenario: "topic and partition should now be included in header", |
| 405 | function: testReaderSetsTopicAndPartition, |
| 406 | }, |
| 407 | } |
| 408 | |
| 409 | for _, test := range tests { |
| 410 | testFunc := test.function |
| 411 | t.Run(test.scenario, func(t *testing.T) { |
| 412 | t.Parallel() |
| 413 | |
| 414 | topic := makeTopic() |
| 415 | createTopic(t, topic, 2) |
| 416 | defer deleteTopic(t, topic) |
| 417 | |
| 418 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 419 | defer cancel() |
| 420 | |
| 421 | r := NewReader(ReaderConfig{ |
| 422 | Brokers: []string{"localhost:9092"}, |
| 423 | Topic: topic, |
| 424 | Partition: 1, |
| 425 | MinBytes: 1, |
| 426 | MaxBytes: 10e6, |
| 427 | MaxWait: 100 * time.Millisecond, |
| 428 | }) |
| 429 | defer r.Close() |
| 430 | testFunc(t, ctx, r) |
| 431 | }) |
| 432 | } |
| 433 | } |
| 434 | |
| 435 | func testReaderSetsTopicAndPartition(t *testing.T, ctx context.Context, r *Reader) { |
| 436 | const N = 3 |
nothing calls this directly
no test coverage detected