MCPcopy
hub / github.com/segmentio/kafka-go / RawProduce

Method RawProduce

rawproduce.go:50–103  ·  view source on GitHub ↗

RawProduce sends a raw produce request to a kafka broker and returns the response. If the request contained no records, an error wrapping protocol.ErrNoRecord is returned. When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.

(ctx context.Context, req *RawProduceRequest)

Source from the content-addressed store, hash-verified

48// When the request is configured with RequiredAcks=none, both the response and
49// the error will be nil on success.
50func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) {
51 m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{
52 TransactionalID: req.TransactionalID,
53 Acks: int16(req.RequiredAcks),
54 Timeout: c.timeoutMs(ctx, defaultProduceTimeout),
55 Topics: []rawproduce.RequestTopic{{
56 Topic: req.Topic,
57 Partitions: []rawproduce.RequestPartition{{
58 Partition: int32(req.Partition),
59 RecordSet: req.RawRecords,
60 }},
61 }},
62 })
63
64 switch {
65 case err == nil:
66 case errors.Is(err, protocol.ErrNoRecord):
67 return new(ProduceResponse), nil
68 default:
69 return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err)
70 }
71
72 if req.RequiredAcks == RequireNone {
73 return nil, nil
74 }
75
76 res := m.(*produceAPI.Response)
77 if len(res.Topics) == 0 {
78 return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic)
79 }
80 topic := &res.Topics[0]
81 if len(topic.Partitions) == 0 {
82 return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition)
83 }
84 partition := &topic.Partitions[0]
85
86 ret := &ProduceResponse{
87 Throttle: makeDuration(res.ThrottleTimeMs),
88 Error: makeError(partition.ErrorCode, partition.ErrorMessage),
89 BaseOffset: partition.BaseOffset,
90 LogAppendTime: makeTime(partition.LogAppendTime),
91 LogStartOffset: partition.LogStartOffset,
92 }
93
94 if len(partition.RecordErrors) != 0 {
95 ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
96
97 for _, recErr := range partition.RecordErrors {
98 ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
99 }
100 }
101
102 return ret, nil
103}

Callers 3

TestClientRawProduceFunction · 0.80

Calls 5

roundTripMethod · 0.95
timeoutMsMethod · 0.95
makeDurationFunction · 0.85
makeErrorFunction · 0.85
makeTimeFunction · 0.70

Tested by 3

TestClientRawProduceFunction · 0.64