Block until topic exists.
(ctx context.Context, t *testing.T, topic string)
| 327 | |
| 328 | // Block until topic exists. |
| 329 | func waitForTopic(ctx context.Context, t *testing.T, topic string) { |
| 330 | t.Helper() |
| 331 | |
| 332 | for { |
| 333 | select { |
| 334 | case <-ctx.Done(): |
| 335 | t.Fatalf("reached deadline before verifying topic existence") |
| 336 | default: |
| 337 | } |
| 338 | |
| 339 | cli := &Client{ |
| 340 | Addr: TCP("localhost:9092"), |
| 341 | Timeout: 5 * time.Second, |
| 342 | } |
| 343 | |
| 344 | response, err := cli.Metadata(ctx, &MetadataRequest{ |
| 345 | Addr: cli.Addr, |
| 346 | Topics: []string{topic}, |
| 347 | }) |
| 348 | if err != nil { |
| 349 | t.Fatalf("waitForTopic: error listing topics: %s", err.Error()) |
| 350 | } |
| 351 | |
| 352 | // Find a topic which has at least 1 partition in the metadata response |
| 353 | for _, top := range response.Topics { |
| 354 | if top.Name != topic { |
| 355 | continue |
| 356 | } |
| 357 | |
| 358 | numPartitions := len(top.Partitions) |
| 359 | t.Logf("waitForTopic: found topic %q with %d partitions", |
| 360 | topic, numPartitions) |
| 361 | |
| 362 | if numPartitions > 0 { |
| 363 | return |
| 364 | } |
| 365 | } |
| 366 | |
| 367 | t.Logf("retrying after 100ms") |
| 368 | time.Sleep(100 * time.Millisecond) |
| 369 | continue |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | func deleteTopic(t *testing.T, topic ...string) { |
| 374 | t.Helper() |
no test coverage detected