AddPartitionsToTxn sends an add partitions to txn request to a kafka broker and returns the response.
( ctx context.Context, req *AddPartitionsToTxnRequest, )
| 60 | |
| 61 | // AddPartitionsToTnx sends an add partitions to txn request to a kafka broker and returns the response. |
| 62 | func (c *Client) AddPartitionsToTxn( |
| 63 | ctx context.Context, |
| 64 | req *AddPartitionsToTxnRequest, |
| 65 | ) (*AddPartitionsToTxnResponse, error) { |
| 66 | protoReq := &addpartitionstotxn.Request{ |
| 67 | TransactionalID: req.TransactionalID, |
| 68 | ProducerID: int64(req.ProducerID), |
| 69 | ProducerEpoch: int16(req.ProducerEpoch), |
| 70 | } |
| 71 | protoReq.Topics = make([]addpartitionstotxn.RequestTopic, 0, len(req.Topics)) |
| 72 | |
| 73 | for topic, partitions := range req.Topics { |
| 74 | reqTopic := addpartitionstotxn.RequestTopic{ |
| 75 | Name: topic, |
| 76 | Partitions: make([]int32, len(partitions)), |
| 77 | } |
| 78 | for i, partition := range partitions { |
| 79 | reqTopic.Partitions[i] = int32(partition.Partition) |
| 80 | } |
| 81 | protoReq.Topics = append(protoReq.Topics, reqTopic) |
| 82 | } |
| 83 | |
| 84 | m, err := c.roundTrip(ctx, req.Addr, protoReq) |
| 85 | if err != nil { |
| 86 | return nil, fmt.Errorf("kafka.(*Client).AddPartitionsToTxn: %w", err) |
| 87 | } |
| 88 | |
| 89 | r := m.(*addpartitionstotxn.Response) |
| 90 | |
| 91 | res := &AddPartitionsToTxnResponse{ |
| 92 | Throttle: makeDuration(r.ThrottleTimeMs), |
| 93 | Topics: make(map[string][]AddPartitionToTxnPartition, len(r.Results)), |
| 94 | } |
| 95 | |
| 96 | for _, result := range r.Results { |
| 97 | partitions := make([]AddPartitionToTxnPartition, 0, len(result.Results)) |
| 98 | for _, rp := range result.Results { |
| 99 | partitions = append(partitions, AddPartitionToTxnPartition{ |
| 100 | Partition: int(rp.PartitionIndex), |
| 101 | Error: makeError(rp.ErrorCode, ""), |
| 102 | }) |
| 103 | } |
| 104 | res.Topics[result.Name] = partitions |
| 105 | } |
| 106 | |
| 107 | return res, nil |
| 108 | } |