()
| 450 | } |
| 451 | |
| 452 | func newLocalClientAndTopic() (*kafka.Client, string, func()) { |
| 453 | topic := makeTopic() |
| 454 | client, shutdown := newLocalClient() |
| 455 | |
| 456 | _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{ |
| 457 | Topics: []kafka.TopicConfig{{ |
| 458 | Topic: topic, |
| 459 | NumPartitions: 1, |
| 460 | ReplicationFactor: 1, |
| 461 | }}, |
| 462 | }) |
| 463 | if err != nil { |
| 464 | shutdown() |
| 465 | panic(err) |
| 466 | } |
| 467 | |
| 468 | // Topic creation seems to be asynchronous. Metadata for the topic partition |
| 469 | // layout in the cluster is available in the controller before being synced |
| 470 | // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" |
| 471 | // when sending requests to the partition leaders. |
| 472 | for i := 0; i < 20; i++ { |
| 473 | r, err := client.Fetch(context.Background(), &kafka.FetchRequest{ |
| 474 | Topic: topic, |
| 475 | Partition: 0, |
| 476 | Offset: 0, |
| 477 | }) |
| 478 | if err == nil && r.Error == nil { |
| 479 | break |
| 480 | } |
| 481 | time.Sleep(100 * time.Millisecond) |
| 482 | } |
| 483 | |
| 484 | return client, topic, func() { |
| 485 | client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{ |
| 486 | Topics: []string{topic}, |
| 487 | }) |
| 488 | shutdown() |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | func newLocalClient() (*kafka.Client, func()) { |
| 493 | return newClient(kafka.TCP("127.0.0.1:9092")) |
no test coverage detected