MCPcopy
hub / github.com/IBM/sarama / consumeWithSarama

Function consumeWithSarama

functional_java_interop_test.go:85–112  ·  view source on GitHub ↗
(t *testing.T, topic string, startOffset int64, count int)

Source from the content-addressed store, hash-verified

83}
84
85func consumeWithSarama(t *testing.T, topic string, startOffset int64, count int) []string {
86 t.Helper()
87 config := NewFunctionalTestConfig()
88 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
89 require.NoError(t, err)
90 defer consumer.Close()
91
92 partitionConsumer, err := consumer.ConsumePartition(topic, 0, startOffset)
93 require.NoError(t, err)
94 defer partitionConsumer.Close()
95
96 var messages []string
97 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
98 defer cancel()
99
100 for i := 0; i < count; i++ {
101 select {
102 case msg := <-partitionConsumer.Messages():
103 require.NotNil(t, msg)
104 messages = append(messages, string(msg.Value))
105 case err := <-partitionConsumer.Errors():
106 require.NoError(t, err)
107 case <-ctx.Done():
108 require.Fail(t, "timeout waiting for messages")
109 }
110 }
111 return messages
112}
113
114func produceWithSarama(t *testing.T, topic string, codec CompressionCodec, messages []string) {
115 t.Helper()

Callers 1

Calls 9

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
NewFunctionalTestConfigFunction · 0.85
HelperMethod · 0.80
NewConsumerFunction · 0.70
CloseMethod · 0.65
MessagesMethod · 0.65
ErrorsMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected