MCPcopy
hub / github.com/segmentio/kafka-go / CreateTopics

Method CreateTopics

createtopics.go:49–82  ·  view source on GitHub ↗

CreateTopics sends a topic creation request to a kafka broker and returns the response.

(ctx context.Context, req *CreateTopicsRequest)

Source from the content-addressed store, hash-verified

47// CreateTopics sends a topic creation request to a kafka broker and returns the
48// response.
49func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error) {
50 topics := make([]createtopics.RequestTopic, len(req.Topics))
51
52 for i, t := range req.Topics {
53 topics[i] = createtopics.RequestTopic{
54 Name: t.Topic,
55 NumPartitions: int32(t.NumPartitions),
56 ReplicationFactor: int16(t.ReplicationFactor),
57 Assignments: t.assignments(),
58 Configs: t.configs(),
59 }
60 }
61
62 m, err := c.roundTrip(ctx, req.Addr, &createtopics.Request{
63 Topics: topics,
64 TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout),
65 ValidateOnly: req.ValidateOnly,
66 })
67 if err != nil {
68 return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err)
69 }
70
71 res := m.(*createtopics.Response)
72 ret := &CreateTopicsResponse{
73 Throttle: makeDuration(res.ThrottleTimeMs),
74 Errors: make(map[string]error, len(res.Topics)),
75 }
76
77 for _, t := range res.Topics {
78 ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
79 }
80
81 return ret, nil
82}
83
84type ConfigEntry struct {
85 ConfigName string

Callers

nothing calls this directly

Calls 6

roundTrip · Method · 0.95
timeoutMs · Method · 0.95
makeDuration · Function · 0.85
makeError · Function · 0.85
configs · Method · 0.80
assignments · Method · 0.45

Tested by

no test coverage detected