MCPcopy
hub / github.com/IBM/sarama / produceWithJava

Function produceWithJava

functional_java_interop_test.go:35–83  ·  view source on GitHub ↗
(t *testing.T, topic string, codec CompressionCodec, messages []string)

Source from the content-addressed store, hash-verified

33}
34
35func produceWithJava(t *testing.T, topic string, codec CompressionCodec, messages []string) {
36 t.Helper()
37 producerPath := fmt.Sprintf("/opt/kafka-%s/bin/kafka-console-producer.sh", FunctionalTestEnv.KafkaVersion)
38 args := append(
39 []string{"compose", "exec", "-T", brokerContainer, producerPath},
40 javaProducerArgs(topic, codec)...,
41 )
42 cmd := exec.Command("docker", args...)
43
44 stdin, err := cmd.StdinPipe()
45 require.NoError(t, err)
46
47 stderr, err := cmd.StderrPipe()
48 require.NoError(t, err)
49
50 var stderrOutput strings.Builder
51 var wg sync.WaitGroup
52 wg.Add(1)
53 go func() {
54 defer wg.Done()
55 s := bufio.NewScanner(stderr)
56 for s.Scan() {
57 stderrOutput.WriteString(s.Text() + "\n")
58 }
59 }()
60
61 require.NoError(t, cmd.Start())
62
63 for _, msg := range messages {
64 _, err := fmt.Fprintln(stdin, msg)
65 if err != nil {
66 stdin.Close()
67 waitErr := cmd.Wait()
68 wg.Wait()
69 if waitErr != nil {
70 err = fmt.Errorf("failed to write message: %w; Java producer failed: %w; stderr: %s", err, waitErr, stderrOutput.String())
71 }
72 }
73 require.NoError(t, err)
74 }
75 stdin.Close()
76
77 err = cmd.Wait()
78 wg.Wait()
79 if err != nil {
80 t.Logf("Java producer stderr: %s", stderrOutput.String())
81 require.NoError(t, err, "Java producer failed")
82 }
83}
84
85func consumeWithSarama(t *testing.T, topic string, startOffset int64, count int) []string {
86 t.Helper()

Callers 1

Calls 7

javaProducerArgsFunction · 0.85
HelperMethod · 0.80
DoneMethod · 0.65
CloseMethod · 0.65
ErrorfMethod · 0.65
AddMethod · 0.45
StringMethod · 0.45

Tested by

no test coverage detected