MCPcopy
hub / github.com/IBM/sarama / prepareDockerTestEnvironment

Function prepareDockerTestEnvironment

functional_test.go:143–262  ·  view source on GitHub ↗
(ctx context.Context, env *testEnvironment)

Source from the content-addressed store, hash-verified

141}
142
143func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
144 const expectedBrokers = 5
145
146 Logger.Println("bringing up docker-based test environment")
147
148 // Always (try to) tear down first.
149 if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
150 return fmt.Errorf("failed to tear down existing env: %w", err)
151 }
152
153 if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
154 env.KafkaVersion = version
155 } else {
156 env.KafkaVersion = "3.5.1"
157 }
158 // docker compose v2.17.0 or newer required for `--wait-timeout` support
159 args := []string{"compose", "up", "-d", "--quiet-pull", "--timestamps", "--wait", "--wait-timeout", "600"}
160 v, _ := ParseKafkaVersion(env.KafkaVersion)
161 // use zookeeper for kafka < 4
162 if !v.IsAtLeast(V4_0_0_0) {
163 args = append([]string{"compose", "--profile", "zookeeper"}, args[1:]...)
164 }
165 c := exec.Command("docker", args...)
166 c.Stdout = os.Stdout
167 c.Stderr = os.Stderr
168 c.Env = append(os.Environ(), fmt.Sprintf("KAFKA_VERSION=%s", env.KafkaVersion))
169 err := c.Run()
170 if err != nil {
171 return fmt.Errorf("failed to run docker compose to start test environment: %w", err)
172 }
173
174 if err := setupToxiProxies(env, "http://localhost:8474"); err != nil {
175 return fmt.Errorf("failed to setup toxiproxies: %w", err)
176 }
177
178 dialCheck := func(addr string, timeout time.Duration) error {
179 conn, err := net.DialTimeout("tcp", addr, timeout)
180 if err != nil {
181 return err
182 }
183 return conn.Close()
184 }
185
186 config := NewFunctionalTestConfig()
187 config.Net.DialTimeout = 1 * time.Second
188 config.Net.ReadTimeout = 1 * time.Second
189 config.Net.WriteTimeout = 1 * time.Second
190 config.ClientID = "sarama-tests"
191
192 // wait for the kafka brokers to come up
193 allBrokersUp := false
194
195 Logger.Printf("waiting for kafka %s brokers to come up...\n", env.KafkaVersion)
196
197mainLoop:
198 for i := 0; i < 90 && !allBrokersUp; i++ {
199 if i > 0 {
200 Logger.Printf("still waiting for kafka %s brokers to come up...\n", env.KafkaVersion)

Callers 1

testMain (Function) · 0.85

Calls 15

Close (Method) · 0.95
RefreshMetadata (Method) · 0.95
Brokers (Method) · 0.95
Config (Method) · 0.95
ParseKafkaVersion (Function) · 0.85
setupToxiProxies (Function) · 0.85
NewFunctionalTestConfig (Function) · 0.85
IsAtLeast (Method) · 0.80
Run (Method) · 0.80
Open (Method) · 0.80
Is (Method) · 0.80

Tested by

no test coverage detected