RawProduce sends a raw produce request to a kafka broker and returns the response. If the request contained no records, an error wrapping protocol.ErrNoRecord is returned. When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.
(ctx context.Context, req *RawProduceRequest)
| 48 | // When the request is configured with RequiredAcks=none, both the response and |
| 49 | // the error will be nil on success. |
| 50 | func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) { |
| 51 | m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{ |
| 52 | TransactionalID: req.TransactionalID, |
| 53 | Acks: int16(req.RequiredAcks), |
| 54 | Timeout: c.timeoutMs(ctx, defaultProduceTimeout), |
| 55 | Topics: []rawproduce.RequestTopic{{ |
| 56 | Topic: req.Topic, |
| 57 | Partitions: []rawproduce.RequestPartition{{ |
| 58 | Partition: int32(req.Partition), |
| 59 | RecordSet: req.RawRecords, |
| 60 | }}, |
| 61 | }}, |
| 62 | }) |
| 63 | |
| 64 | switch { |
| 65 | case err == nil: |
| 66 | case errors.Is(err, protocol.ErrNoRecord): |
| 67 | return new(ProduceResponse), nil |
| 68 | default: |
| 69 | return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err) |
| 70 | } |
| 71 | |
| 72 | if req.RequiredAcks == RequireNone { |
| 73 | return nil, nil |
| 74 | } |
| 75 | |
| 76 | res := m.(*produceAPI.Response) |
| 77 | if len(res.Topics) == 0 { |
| 78 | return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic) |
| 79 | } |
| 80 | topic := &res.Topics[0] |
| 81 | if len(topic.Partitions) == 0 { |
| 82 | return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition) |
| 83 | } |
| 84 | partition := &topic.Partitions[0] |
| 85 | |
| 86 | ret := &ProduceResponse{ |
| 87 | Throttle: makeDuration(res.ThrottleTimeMs), |
| 88 | Error: makeError(partition.ErrorCode, partition.ErrorMessage), |
| 89 | BaseOffset: partition.BaseOffset, |
| 90 | LogAppendTime: makeTime(partition.LogAppendTime), |
| 91 | LogStartOffset: partition.LogStartOffset, |
| 92 | } |
| 93 | |
| 94 | if len(partition.RecordErrors) != 0 { |
| 95 | ret.RecordErrors = make(map[int]error, len(partition.RecordErrors)) |
| 96 | |
| 97 | for _, recErr := range partition.RecordErrors { |
| 98 | ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage) |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | return ret, nil |
| 103 | } |