MCPcopy
hub / github.com/IBM/sarama / buildRequest

Method buildRequest

produce_set.go:180–278  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

178}
179
180func (ps *produceSet) buildRequest() *ProduceRequest {
181 req := &ProduceRequest{
182 RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
183 Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
184 }
185 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
186 req.Version = 2
187 }
188 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
189 req.Version = 3
190 if ps.parent.IsTransactional() {
191 req.TransactionalID = &ps.parent.conf.Producer.Transaction.ID
192 }
193 }
194 if ps.parent.conf.Version.IsAtLeast(V1_0_0_0) {
195 req.Version = 5
196 }
197 if ps.parent.conf.Version.IsAtLeast(V2_0_0_0) {
198 req.Version = 6
199 }
200 if ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
201 req.Version = 7
202 }
203 if ps.parent.conf.Version.IsAtLeast(V2_4_0_0) {
204 req.Version = 8
205 }
206 if ps.parent.conf.Version.IsAtLeast(V2_8_0_0) {
207 req.Version = 9
208 }
209
210 for topic, partitionSets := range ps.msgs {
211 for partition, set := range partitionSets {
212 if req.Version >= 3 {
213 // If the API version we're hitting is 3 or greater, we need to calculate
214 // offsets for each record in the batch relative to FirstOffset.
215 // Additionally, we must set LastOffsetDelta to the value of the last offset
216 // in the batch. Since the OffsetDelta of the first record is 0, we know that the
217 // final record of any batch will have an offset of (# of records in batch) - 1.
218 // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
219 // under the RecordBatch section for details.)
220 rb := set.recordsToSend.RecordBatch
221 if len(rb.Records) > 0 {
222 rb.LastOffsetDelta = int32(len(rb.Records) - 1)
223 var maxTimestampDelta time.Duration
224 for i, record := range rb.Records {
225 record.OffsetDelta = int64(i)
226 maxTimestampDelta = max(maxTimestampDelta, record.TimestampDelta)
227 }
228 // Also set the MaxTimestamp similar to other clients.
229 rb.MaxTimestamp = rb.FirstTimestamp.Add(maxTimestampDelta)
230 }
231
232 // Set the batch as transactional when a transactionalID is set
233 rb.IsTransactional = ps.parent.IsTransactional()
234
235 req.AddBatch(topic, partition, rb)
236 continue
237 }

Calls 8

AddBatchMethod · 0.95
AddSetMethod · 0.95
AddMessageMethod · 0.95
encodeFunction · 0.85
IsAtLeastMethod · 0.80
IsTransactionalMethod · 0.65
PrintlnMethod · 0.65
AddMethod · 0.45