(ctx context.Context, env *testEnvironment)
| 141 | } |
| 142 | |
| 143 | func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { |
| 144 | const expectedBrokers = 5 |
| 145 | |
| 146 | Logger.Println("bringing up docker-based test environment") |
| 147 | |
| 148 | // Always (try to) tear down first. |
| 149 | if err := tearDownDockerTestEnvironment(ctx, env); err != nil { |
| 150 | return fmt.Errorf("failed to tear down existing env: %w", err) |
| 151 | } |
| 152 | |
| 153 | if version, ok := os.LookupEnv("KAFKA_VERSION"); ok { |
| 154 | env.KafkaVersion = version |
| 155 | } else { |
| 156 | env.KafkaVersion = "3.5.1" |
| 157 | } |
| 158 | // docker compose v2.17.0 or newer required for `--wait-timeout` support |
| 159 | args := []string{"compose", "up", "-d", "--quiet-pull", "--timestamps", "--wait", "--wait-timeout", "600"} |
| 160 | v, _ := ParseKafkaVersion(env.KafkaVersion) |
| 161 | // use zookeeper for kafka < 4 |
| 162 | if !v.IsAtLeast(V4_0_0_0) { |
| 163 | args = append([]string{"compose", "--profile", "zookeeper"}, args[1:]...) |
| 164 | } |
| 165 | c := exec.Command("docker", args...) |
| 166 | c.Stdout = os.Stdout |
| 167 | c.Stderr = os.Stderr |
| 168 | c.Env = append(os.Environ(), fmt.Sprintf("KAFKA_VERSION=%s", env.KafkaVersion)) |
| 169 | err := c.Run() |
| 170 | if err != nil { |
| 171 | return fmt.Errorf("failed to run docker compose to start test environment: %w", err) |
| 172 | } |
| 173 | |
| 174 | if err := setupToxiProxies(env, "http://localhost:8474"); err != nil { |
| 175 | return fmt.Errorf("failed to setup toxiproxies: %w", err) |
| 176 | } |
| 177 | |
| 178 | dialCheck := func(addr string, timeout time.Duration) error { |
| 179 | conn, err := net.DialTimeout("tcp", addr, timeout) |
| 180 | if err != nil { |
| 181 | return err |
| 182 | } |
| 183 | return conn.Close() |
| 184 | } |
| 185 | |
| 186 | config := NewFunctionalTestConfig() |
| 187 | config.Net.DialTimeout = 1 * time.Second |
| 188 | config.Net.ReadTimeout = 1 * time.Second |
| 189 | config.Net.WriteTimeout = 1 * time.Second |
| 190 | config.ClientID = "sarama-tests" |
| 191 | |
| 192 | // wait for the kafka brokers to come up |
| 193 | allBrokersUp := false |
| 194 | |
| 195 | Logger.Printf("waiting for kafka %s brokers to come up...\n", env.KafkaVersion) |
| 196 | |
| 197 | mainLoop: |
| 198 | for i := 0; i < 90 && !allBrokersUp; i++ { |
| 199 | if i > 0 { |
| 200 | Logger.Printf("still waiting for kafka %s brokers to come up...\n", env.KafkaVersion) |
no test coverage detected