consumeKeyedFromPartition consumes up to count messages from a specific partition, returning them as key+value pairs.
(t *testing.T, topic string, partition int32, startOffset int64, count int)
| 366 | // consumeKeyedFromPartition consumes up to count messages from a specific partition, |
| 367 | // returning them as key+value pairs. |
| 368 | func consumeKeyedFromPartition(t *testing.T, topic string, partition int32, startOffset int64, count int) map[string]string { |
| 369 | t.Helper() |
| 370 | config := NewFunctionalTestConfig() |
| 371 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 372 | require.NoError(t, err) |
| 373 | defer consumer.Close() |
| 374 | |
| 375 | pc, err := consumer.ConsumePartition(topic, partition, startOffset) |
| 376 | require.NoError(t, err) |
| 377 | defer pc.Close() |
| 378 | |
| 379 | result := make(map[string]string, count) |
| 380 | ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second) |
| 381 | defer cancel() |
| 382 | |
| 383 | for len(result) < count { |
| 384 | select { |
| 385 | case msg := <-pc.Messages(): |
| 386 | require.NotNil(t, msg) |
| 387 | result[string(msg.Key)] = string(msg.Value) |
| 388 | case err := <-pc.Errors(): |
| 389 | require.NoError(t, err) |
| 390 | case <-ctx.Done(): |
| 391 | return result |
| 392 | } |
| 393 | } |
| 394 | return result |
| 395 | } |
| 396 | |
| 397 | // TestJavaMurmur2PartitionerInterop verifies that Sarama's NewMurmur2Partitioner |
| 398 | // routes keyed messages to the same partitions as the Apache Kafka Java client's |
no test coverage detected