MCPcopy
hub / github.com/segmentio/kafka-go / CreatePartitions

Method CreatePartitions

createpartitions.go:42–74  ·  view source on GitHub ↗

CreatePartitions sends a partitions creation request to a kafka broker and returns the response.

(ctx context.Context, req *CreatePartitionsRequest)

Source from the content-addressed store, hash-verified

40// CreatePartitions sends a partitions creation request to a kafka broker and returns the
41// response.
42func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
43 topics := make([]createpartitions.RequestTopic, len(req.Topics))
44
45 for i, t := range req.Topics {
46 topics[i] = createpartitions.RequestTopic{
47 Name: t.Name,
48 Count: t.Count,
49 Assignments: t.assignments(),
50 }
51 }
52
53 m, err := c.roundTrip(ctx, req.Addr, &createpartitions.Request{
54 Topics: topics,
55 TimeoutMs: c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
56 ValidateOnly: req.ValidateOnly,
57 })
58
59 if err != nil {
60 return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
61 }
62
63 res := m.(*createpartitions.Response)
64 ret := &CreatePartitionsResponse{
65 Throttle: makeDuration(res.ThrottleTimeMs),
66 Errors: make(map[string]error, len(res.Results)),
67 }
68
69 for _, t := range res.Results {
70 ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
71 }
72
73 return ret, nil
74}
75
76type TopicPartitionsConfig struct {
77 // Topic name

Calls 5

roundTripMethod · 0.95
timeoutMsMethod · 0.95
makeDurationFunction · 0.85
makeErrorFunction · 0.85
assignmentsMethod · 0.45