MCPcopy
hub / github.com/segmentio/kafka-go / waitForTopic

Function waitForTopic

reader_test.go:329–371  ·  view source on GitHub ↗

Block until topic exists.

(ctx context.Context, t *testing.T, topic string)

Source from the content-addressed store, hash-verified

327
328// Block until topic exists.
329func waitForTopic(ctx context.Context, t *testing.T, topic string) {
330 t.Helper()
331
332 for {
333 select {
334 case <-ctx.Done():
335 t.Fatalf("reached deadline before verifying topic existence")
336 default:
337 }
338
339 cli := &Client{
340 Addr: TCP("localhost:9092"),
341 Timeout: 5 * time.Second,
342 }
343
344 response, err := cli.Metadata(ctx, &MetadataRequest{
345 Addr: cli.Addr,
346 Topics: []string{topic},
347 })
348 if err != nil {
349 t.Fatalf("waitForTopic: error listing topics: %s", err.Error())
350 }
351
352 // Find a topic which has at least 1 partition in the metadata response
353 for _, top := range response.Topics {
354 if top.Name != topic {
355 continue
356 }
357
358 numPartitions := len(top.Partitions)
359 t.Logf("waitForTopic: found topic %q with %d partitions",
360 topic, numPartitions)
361
362 if numPartitions > 0 {
363 return
364 }
365 }
366
367 t.Logf("retrying after 100ms")
368 time.Sleep(100 * time.Millisecond)
369 continue
370 }
371}
372
373func deleteTopic(t *testing.T, topic ...string) {
374 t.Helper()

Callers 4

createTopicFunction · 0.85

Calls 4

MetadataMethod · 0.95
TCPFunction · 0.85
DoneMethod · 0.80
ErrorMethod · 0.45

Tested by

no test coverage detected