(t *testing.T, topic string, codec CompressionCodec, messages []string)
| 33 | } |
| 34 | |
| 35 | func produceWithJava(t *testing.T, topic string, codec CompressionCodec, messages []string) { |
| 36 | t.Helper() |
| 37 | producerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-producer.sh", FunctionalTestEnv.KafkaVersion) |
| 38 | args := append( |
| 39 | []string{"compose", "exec", "-T", brokerContainer, producerPath}, |
| 40 | javaProducerArgs(topic, codec)..., |
| 41 | ) |
| 42 | cmd := exec.Command("docker", args...) |
| 43 | |
| 44 | stdin, err := cmd.StdinPipe() |
| 45 | require.NoError(t, err) |
| 46 | |
| 47 | stderr, err := cmd.StderrPipe() |
| 48 | require.NoError(t, err) |
| 49 | |
| 50 | var stderrOutput strings.Builder |
| 51 | var wg sync.WaitGroup |
| 52 | wg.Add(1) |
| 53 | go func() { |
| 54 | defer wg.Done() |
| 55 | s := bufio.NewScanner(stderr) |
| 56 | for s.Scan() { |
| 57 | stderrOutput.WriteString(s.Text() + "\n") |
| 58 | } |
| 59 | }() |
| 60 | |
| 61 | require.NoError(t, cmd.Start()) |
| 62 | |
| 63 | for _, msg := range messages { |
| 64 | _, err := fmt.Fprintln(stdin, msg) |
| 65 | if err != nil { |
| 66 | stdin.Close() |
| 67 | waitErr := cmd.Wait() |
| 68 | wg.Wait() |
| 69 | if waitErr != nil { |
| 70 | err = fmt.Errorf("failed to write message: %w; Java producer failed: %w; stderr: %s", err, waitErr, stderrOutput.String()) |
| 71 | } |
| 72 | } |
| 73 | require.NoError(t, err) |
| 74 | } |
| 75 | stdin.Close() |
| 76 | |
| 77 | err = cmd.Wait() |
| 78 | wg.Wait() |
| 79 | if err != nil { |
| 80 | t.Logf("Java producer stderr: %s", stderrOutput.String()) |
| 81 | require.NoError(t, err, "Java producer failed") |
| 82 | } |
| 83 | } |
| 84 | |
| 85 | func consumeWithSarama(t *testing.T, topic string, startOffset int64, count int) []string { |
| 86 | t.Helper() |
no test coverage detected