(t *testing.T, topic string, partitions int)
| 277 | } |
| 278 | |
| 279 | func createTopic(t *testing.T, topic string, partitions int) { |
| 280 | t.Helper() |
| 281 | |
| 282 | t.Logf("createTopic(%s, %d)", topic, partitions) |
| 283 | |
| 284 | conn, err := Dial("tcp", "localhost:9092") |
| 285 | if err != nil { |
| 286 | err = fmt.Errorf("createTopic, Dial: %w", err) |
| 287 | t.Fatal(err) |
| 288 | } |
| 289 | defer conn.Close() |
| 290 | |
| 291 | controller, err := conn.Controller() |
| 292 | if err != nil { |
| 293 | err = fmt.Errorf("createTopic, conn.Controller: %w", err) |
| 294 | t.Fatal(err) |
| 295 | } |
| 296 | |
| 297 | conn, err = Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) |
| 298 | if err != nil { |
| 299 | t.Fatal(err) |
| 300 | } |
| 301 | |
| 302 | conn.SetDeadline(time.Now().Add(10 * time.Second)) |
| 303 | |
| 304 | _, err = conn.createTopics(createTopicsRequest{ |
| 305 | Topics: []createTopicsRequestV0Topic{ |
| 306 | { |
| 307 | Topic: topic, |
| 308 | NumPartitions: int32(partitions), |
| 309 | ReplicationFactor: 1, |
| 310 | }, |
| 311 | }, |
| 312 | Timeout: milliseconds(5 * time.Second), |
| 313 | }) |
| 314 | if err != nil { |
| 315 | if !errors.Is(err, TopicAlreadyExists) { |
| 316 | err = fmt.Errorf("createTopic, conn.createTopics: %w", err) |
| 317 | t.Error(err) |
| 318 | t.FailNow() |
| 319 | } |
| 320 | } |
| 321 | |
| 322 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) |
| 323 | defer cancel() |
| 324 | |
| 325 | waitForTopic(ctx, t, topic) |
| 326 | } |
| 327 | |
| 328 | // Block until topic exists. |
| 329 | func waitForTopic(ctx context.Context, t *testing.T, topic string) { |
no test coverage detected