(t *testing.T, topic string, startOffset int64, count int)
| 83 | } |
| 84 | |
| 85 | func consumeWithSarama(t *testing.T, topic string, startOffset int64, count int) []string { |
| 86 | t.Helper() |
| 87 | config := NewFunctionalTestConfig() |
| 88 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 89 | require.NoError(t, err) |
| 90 | defer consumer.Close() |
| 91 | |
| 92 | partitionConsumer, err := consumer.ConsumePartition(topic, 0, startOffset) |
| 93 | require.NoError(t, err) |
| 94 | defer partitionConsumer.Close() |
| 95 | |
| 96 | var messages []string |
| 97 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 98 | defer cancel() |
| 99 | |
| 100 | for i := 0; i < count; i++ { |
| 101 | select { |
| 102 | case msg := <-partitionConsumer.Messages(): |
| 103 | require.NotNil(t, msg) |
| 104 | messages = append(messages, string(msg.Value)) |
| 105 | case err := <-partitionConsumer.Errors(): |
| 106 | require.NoError(t, err) |
| 107 | case <-ctx.Done(): |
| 108 | require.Fail(t, "timeout waiting for messages") |
| 109 | } |
| 110 | } |
| 111 | return messages |
| 112 | } |
| 113 | |
| 114 | func produceWithSarama(t *testing.T, topic string, codec CompressionCodec, messages []string) { |
| 115 | t.Helper() |
no test coverage detected