MCPcopy
hub / github.com/segmentio/kafka-go / Produce

Method Produce

produce.go:145–203  ·  view source on GitHub ↗

Produce sends a produce request to a kafka broker and returns the response. If the request contained no records, an error wrapping protocol.ErrNoRecord is returned. When the request is configured with RequiredAcks=none, both the response and the error will be nil on success.

(ctx context.Context, req *ProduceRequest)

Source from the content-addressed store, hash-verified

143// When the request is configured with RequiredAcks=none, both the response and
144// the error will be nil on success.
145func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) {
146 attributes := protocol.Attributes(req.Compression) & 0x7
147
148 m, err := c.roundTrip(ctx, req.Addr, &produceAPI.Request{
149 TransactionalID: req.TransactionalID,
150 Acks: int16(req.RequiredAcks),
151 Timeout: c.timeoutMs(ctx, defaultProduceTimeout),
152 Topics: []produceAPI.RequestTopic{{
153 Topic: req.Topic,
154 Partitions: []produceAPI.RequestPartition{{
155 Partition: int32(req.Partition),
156 RecordSet: protocol.RecordSet{
157 Attributes: attributes,
158 Records: req.Records,
159 },
160 }},
161 }},
162 })
163
164 switch {
165 case err == nil:
166 case errors.Is(err, protocol.ErrNoRecord):
167 return new(ProduceResponse), nil
168 default:
169 return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
170 }
171
172 if req.RequiredAcks == RequireNone {
173 return nil, nil
174 }
175
176 res := m.(*produceAPI.Response)
177 if len(res.Topics) == 0 {
178 return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoTopic)
179 }
180 topic := &res.Topics[0]
181 if len(topic.Partitions) == 0 {
182 return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoPartition)
183 }
184 partition := &topic.Partitions[0]
185
186 ret := &ProduceResponse{
187 Throttle: makeDuration(res.ThrottleTimeMs),
188 Error: makeError(partition.ErrorCode, partition.ErrorMessage),
189 BaseOffset: partition.BaseOffset,
190 LogAppendTime: makeTime(partition.LogAppendTime),
191 LogStartOffset: partition.LogStartOffset,
192 }
193
194 if len(partition.RecordErrors) != 0 {
195 ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
196
197 for _, recErr := range partition.RecordErrors {
198 ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
199 }
200 }
201
202 return ret, nil

Callers 12

TestClientDeleteOffsetFunction · 0.80
TestClientPipelineFunction · 0.80
TestClientOffsetCommitFunction · 0.80
TestClientProduceFunction · 0.80
TestClientListOffsetsFunction · 0.80
produceMethod · 0.80

Calls 6

roundTripMethod · 0.95
timeoutMsMethod · 0.95
AttributesTypeAlias · 0.92
makeDurationFunction · 0.85
makeErrorFunction · 0.85
makeTimeFunction · 0.70

Tested by 11

TestClientDeleteOffsetFunction · 0.64
TestClientPipelineFunction · 0.64
TestClientOffsetCommitFunction · 0.64
TestClientProduceFunction · 0.64
TestClientListOffsetsFunction · 0.64