MCPcopy
hub / github.com/segmentio/kafka-go / clientCreateTopic

Function clientCreateTopic

client_test.go:37–69  ·  view source on GitHub ↗
(client *Client, topic string, partitions int)

Source from the content-addressed store, hash-verified

35}
36
37func clientCreateTopic(client *Client, topic string, partitions int) error {
38 _, err := client.CreateTopics(context.Background(), &CreateTopicsRequest{
39 Topics: []TopicConfig{{
40 Topic: topic,
41 NumPartitions: partitions,
42 ReplicationFactor: 1,
43 }},
44 })
45 if err != nil {
46 return err
47 }
48
49 // Topic creation seems to be asynchronous. Metadata for the topic partition
50 // layout in the cluster is available in the controller before being synced
51 // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
52 // when sending requests to the partition leaders.
53 //
54 // This loop will wait up to 2 seconds polling the cluster until no errors
55 // are returned.
56 for i := 0; i < 20; i++ {
57 r, err := client.Fetch(context.Background(), &FetchRequest{
58 Topic: topic,
59 Partition: 0,
60 Offset: 0,
61 })
62 if err == nil && r.Error == nil {
63 break
64 }
65 time.Sleep(100 * time.Millisecond)
66 }
67
68 return nil
69}
70
71func clientEndTxn(client *Client, req *EndTxnRequest) error {
72 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)

Callers 7

TestClientJoinGroupFunction · 0.70
newLocalClientWithTopicFunction · 0.70
TestClientLeaveGroupFunction · 0.70
TestClientSyncGroupFunction · 0.70

Calls 2

FetchMethod · 0.80
CreateTopicsMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…