MCPcopy
hub / github.com/segmentio/kafka-go / AddPartitionsToTxn

Method AddPartitionsToTxn

addpartitionstotxn.go:62–108  ·  view source on GitHub ↗

AddPartitionsToTxn sends an add partitions to txn request to a kafka broker and returns the response.

(
	ctx context.Context,
	req *AddPartitionsToTxnRequest,
)

Source from the content-addressed store, hash-verified

60
61// AddPartitionsToTnx sends an add partitions to txn request to a kafka broker and returns the response.
62func (c *Client) AddPartitionsToTxn(
63 ctx context.Context,
64 req *AddPartitionsToTxnRequest,
65) (*AddPartitionsToTxnResponse, error) {
66 protoReq := &addpartitionstotxn.Request{
67 TransactionalID: req.TransactionalID,
68 ProducerID: int64(req.ProducerID),
69 ProducerEpoch: int16(req.ProducerEpoch),
70 }
71 protoReq.Topics = make([]addpartitionstotxn.RequestTopic, 0, len(req.Topics))
72
73 for topic, partitions := range req.Topics {
74 reqTopic := addpartitionstotxn.RequestTopic{
75 Name: topic,
76 Partitions: make([]int32, len(partitions)),
77 }
78 for i, partition := range partitions {
79 reqTopic.Partitions[i] = int32(partition.Partition)
80 }
81 protoReq.Topics = append(protoReq.Topics, reqTopic)
82 }
83
84 m, err := c.roundTrip(ctx, req.Addr, protoReq)
85 if err != nil {
86 return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err)
87 }
88
89 r := m.(*addpartitionstotxn.Response)
90
91 res := &AddPartitionsToTxnResponse{
92 Throttle: makeDuration(r.ThrottleTimeMs),
93 Topics: make(map[string][]AddPartitionToTxnPartition, len(r.Results)),
94 }
95
96 for _, result := range r.Results {
97 partitions := make([]AddPartitionToTxnPartition, 0, len(result.Results))
98 for _, rp := range result.Results {
99 partitions = append(partitions, AddPartitionToTxnPartition{
100 Partition: int(rp.PartitionIndex),
101 Error: makeError(rp.ErrorCode, ""),
102 })
103 }
104 res.Topics[result.Name] = partitions
105 }
106
107 return res, nil
108}

Callers 2

Calls 3

roundTripMethod · 0.95
makeDurationFunction · 0.85
makeErrorFunction · 0.85

Tested by 2