(ctx context.Context, env *testEnvironment)
| 333 | } |
| 334 | |
| 335 | func prepareTestTopics(ctx context.Context, env *testEnvironment) error { |
| 336 | Logger.Println("creating test topics") |
| 337 | var testTopicNames []string |
| 338 | for topic := range testTopicDetails { |
| 339 | testTopicNames = append(testTopicNames, topic) |
| 340 | } |
| 341 | |
| 342 | Logger.Println("Creating topics") |
| 343 | config := NewFunctionalTestConfig() |
| 344 | config.Metadata.Retry.Max = 5 |
| 345 | config.Metadata.Retry.Backoff = 10 * time.Second |
| 346 | config.ClientID = "sarama-prepareTestTopics" |
| 347 | |
| 348 | client, err := NewClient(env.KafkaBrokerAddrs, config) |
| 349 | if err != nil { |
| 350 | return fmt.Errorf("failed to connect to kafka: %w", err) |
| 351 | } |
| 352 | defer client.Close() |
| 353 | |
| 354 | controller, err := client.Controller() |
| 355 | if err != nil { |
| 356 | return fmt.Errorf("failed to connect to kafka controller: %w", err) |
| 357 | } |
| 358 | defer controller.Close() |
| 359 | |
| 360 | // Start by deleting the test topics (if they already exist) |
| 361 | { |
| 362 | request := NewDeleteTopicsRequest(config.Version, testTopicNames, time.Minute) |
| 363 | deleteRes, err := controller.DeleteTopics(request) |
| 364 | if err != nil { |
| 365 | return fmt.Errorf("failed to delete test topics: %w", err) |
| 366 | } |
| 367 | for topic, topicErr := range deleteRes.TopicErrorCodes { |
| 368 | if !isTopicNotExistsErrorOrOk(topicErr) { |
| 369 | return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr) |
| 370 | } |
| 371 | } |
| 372 | } |
| 373 | |
| 374 | // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed |
| 375 | // synchronously |
| 376 | { |
| 377 | var topicsOk bool |
| 378 | request := NewMetadataRequest(config.Version, testTopicNames) |
| 379 | for i := 0; i < 600 && !topicsOk; i++ { |
| 380 | time.Sleep(100 * time.Millisecond) |
| 381 | md, err := controller.GetMetadata(request) |
| 382 | if err != nil { |
| 383 | return fmt.Errorf("failed to get metadata for test topics: %w", err) |
| 384 | } |
| 385 | |
| 386 | if len(md.Topics) == len(testTopicNames) { |
| 387 | topicsOk = true |
| 388 | for _, topicsMd := range md.Topics { |
| 389 | if !isTopicNotExistsErrorOrOk(topicsMd.Err) { |
| 390 | topicsOk = false |
| 391 | } |
| 392 | } |
no test coverage detected