setupToxiProxies configures the toxiproxy proxies with routes for the Kafka brokers, creating each proxy if it does not already exist.
(env *testEnvironment, endpoint string)
| 118 | // setupToxiProxies will configure the toxiproxy proxies with routes for the |
| 119 | // kafka brokers if they don't already exist |
| 120 | func setupToxiProxies(env *testEnvironment, endpoint string) error { |
| 121 | env.ToxiproxyClient = toxiproxy.NewClient(endpoint) |
| 122 | env.Proxies = map[string]*toxiproxy.Proxy{} |
| 123 | env.KafkaBrokerAddrs = nil |
| 124 | for i := 1; i <= 5; i++ { |
| 125 | proxyName := fmt.Sprintf("kafka%d", i) |
| 126 | proxy, err := env.ToxiproxyClient.Proxy(proxyName) |
| 127 | if err != nil { |
| 128 | proxy, err = env.ToxiproxyClient.CreateProxy( |
| 129 | proxyName, |
| 130 | fmt.Sprintf("0.0.0.0:%d", 29090+i), |
| 131 | fmt.Sprintf("kafka-%d:%d", i, 29090+i), |
| 132 | ) |
| 133 | if err != nil { |
| 134 | return fmt.Errorf("failed to create toxiproxy: %w", err) |
| 135 | } |
| 136 | } |
| 137 | env.Proxies[proxyName] = proxy |
| 138 | env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i)) |
| 139 | } |
| 140 | return nil |
| 141 | } |
| 142 | |
| 143 | func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error { |
| 144 | const expectedBrokers = 5 |
no test coverage detected