AddOffsetsToTxn sends an add offsets to txn request to a kafka broker and returns the response.
( ctx context.Context, req *AddOffsetsToTxnRequest, )
| 43 | |
| 44 | // AddOffsetsToTnx sends an add offsets to txn request to a kafka broker and returns the response. |
| 45 | func (c *Client) AddOffsetsToTxn( |
| 46 | ctx context.Context, |
| 47 | req *AddOffsetsToTxnRequest, |
| 48 | ) (*AddOffsetsToTxnResponse, error) { |
| 49 | m, err := c.roundTrip(ctx, req.Addr, &addoffsetstotxn.Request{ |
| 50 | TransactionalID: req.TransactionalID, |
| 51 | ProducerID: int64(req.ProducerID), |
| 52 | ProducerEpoch: int16(req.ProducerEpoch), |
| 53 | GroupID: req.GroupID, |
| 54 | }) |
| 55 | if err != nil { |
| 56 | return nil, fmt.Errorf("kafka.(*Client).AddOffsetsToTxn: %w", err) |
| 57 | } |
| 58 | |
| 59 | r := m.(*addoffsetstotxn.Response) |
| 60 | |
| 61 | res := &AddOffsetsToTxnResponse{ |
| 62 | Throttle: makeDuration(r.ThrottleTimeMs), |
| 63 | Error: makeError(r.ErrorCode, ""), |
| 64 | } |
| 65 | |
| 66 | return res, nil |
| 67 | } |