MCPcopy
hub / github.com/IBM/sarama / prepareTestTopics

Function prepareTestTopics

functional_test.go:335–441  ·  view source on GitHub ↗
(ctx context.Context, env *testEnvironment)

Source from the content-addressed store, hash-verified

333}
334
335func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
336 Logger.Println("creating test topics")
337 var testTopicNames []string
338 for topic := range testTopicDetails {
339 testTopicNames = append(testTopicNames, topic)
340 }
341
342 Logger.Println("Creating topics")
343 config := NewFunctionalTestConfig()
344 config.Metadata.Retry.Max = 5
345 config.Metadata.Retry.Backoff = 10 * time.Second
346 config.ClientID = "sarama-prepareTestTopics"
347
348 client, err := NewClient(env.KafkaBrokerAddrs, config)
349 if err != nil {
350 return fmt.Errorf("failed to connect to kafka: %w", err)
351 }
352 defer client.Close()
353
354 controller, err := client.Controller()
355 if err != nil {
356 return fmt.Errorf("failed to connect to kafka controller: %w", err)
357 }
358 defer controller.Close()
359
360 // Start by deleting the test topics (if they already exist)
361 {
362 request := NewDeleteTopicsRequest(config.Version, testTopicNames, time.Minute)
363 deleteRes, err := controller.DeleteTopics(request)
364 if err != nil {
365 return fmt.Errorf("failed to delete test topics: %w", err)
366 }
367 for topic, topicErr := range deleteRes.TopicErrorCodes {
368 if !isTopicNotExistsErrorOrOk(topicErr) {
369 return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
370 }
371 }
372 }
373
374 // wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
375 // synchronously
376 {
377 var topicsOk bool
378 request := NewMetadataRequest(config.Version, testTopicNames)
379 for i := 0; i < 600 && !topicsOk; i++ {
380 time.Sleep(100 * time.Millisecond)
381 md, err := controller.GetMetadata(request)
382 if err != nil {
383 return fmt.Errorf("failed to get metadata for test topics: %w", err)
384 }
385
386 if len(md.Topics) == len(testTopicNames) {
387 topicsOk = true
388 for _, topicsMd := range md.Topics {
389 if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
390 topicsOk = false
391 }
392 }

Callers 1

testMainFunction · 0.85

Calls 14

CloseMethod · 0.95
ControllerMethod · 0.95
NewFunctionalTestConfigFunction · 0.85
NewDeleteTopicsRequestFunction · 0.85
NewMetadataRequestFunction · 0.85
NewCreateTopicsRequestFunction · 0.85
isTopicExistsErrorOrOkFunction · 0.85
DeleteTopicsMethod · 0.80
GetMetadataMethod · 0.80
CreateTopicsMethod · 0.80
NewClientFunction · 0.70

Tested by

no test coverage detected