MCPcopy
hub / github.com/IBM/sarama / consumeKeyedFromPartition

Function consumeKeyedFromPartition

functional_java_interop_test.go:368–395  ·  view source on GitHub ↗

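consumeKeyedFromPartition consumes messages from a specific partition, starting at the given offset, until it has collected count distinct keys or a 15-second timeout expires, and returns them as a map of message key to message value (a later message with the same key overwrites the earlier value).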

(t *testing.T, topic string, partition int32, startOffset int64, count int)

Source from the content-addressed store, hash-verified

366// consumeKeyedFromPartition consumes up to count messages from a specific partition,
367// returning them as key+value pairs.
368func consumeKeyedFromPartition(t *testing.T, topic string, partition int32, startOffset int64, count int) map[string]string {
369 t.Helper()
370 config := NewFunctionalTestConfig()
371 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
372 require.NoError(t, err)
373 defer consumer.Close()
374
375 pc, err := consumer.ConsumePartition(topic, partition, startOffset)
376 require.NoError(t, err)
377 defer pc.Close()
378
379 result := make(map[string]string, count)
380 ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second)
381 defer cancel()
382
383 for len(result) < count {
384 select {
385 case msg := <-pc.Messages():
386 require.NotNil(t, msg)
387 result[string(msg.Key)] = string(msg.Value)
388 case err := <-pc.Errors():
389 require.NoError(t, err)
390 case <-ctx.Done():
391 return result
392 }
393 }
394 return result
395}
396
397// TestJavaMurmur2PartitionerInterop verifies that Sarama's NewMurmur2Partitioner
398// routes keyed messages to the same partitions as the Apache Kafka Java client's

Callers 1

Calls 10

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
NewFunctionalTestConfigFunction · 0.85
HelperMethod · 0.80
NewConsumerFunction · 0.70
CloseMethod · 0.65
ContextMethod · 0.65
MessagesMethod · 0.65
ErrorsMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected