MCPcopy
hub / github.com/segmentio/kafka-go / clientCreateTopic

Function clientCreateTopic

topics/list_topics_test.go:62–94  ·  view source on GitHub ↗
(client *kafka.Client, topic string, partitions int)

Source from the content-addressed store, hash-verified

60}
61
62func clientCreateTopic(client *kafka.Client, topic string, partitions int) error {
63 _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
64 Topics: []kafka.TopicConfig{{
65 Topic: topic,
66 NumPartitions: partitions,
67 ReplicationFactor: 1,
68 }},
69 })
70 if err != nil {
71 return err
72 }
73
74 // Topic creation seems to be asynchronous. Metadata for the topic partition
75 // layout in the cluster is available in the controller before being synced
76 // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
77 // when sending requests to the partition leaders.
78 //
79 // This loop will wait up to 2 seconds polling the cluster until no errors
80 // are returned.
81 for i := 0; i < 20; i++ {
82 r, err := client.Fetch(context.Background(), &kafka.FetchRequest{
83 Topic: topic,
84 Partition: 0,
85 Offset: 0,
86 })
87 if err == nil && r.Error == nil {
88 break
89 }
90 time.Sleep(100 * time.Millisecond)
91 }
92
93 return nil
94}
95
96func newLocalClient() (*kafka.Client, func()) {
97 return newClient(kafka.TCP("localhost"))

Callers 2

TestListReFunction · 0.70
newLocalClientWithTopicFunction · 0.70

Calls 2

FetchMethod · 0.80
CreateTopicsMethod · 0.45

Tested by

no test coverage detected