MCPcopy
hub / github.com/IBM/sarama / TestProduceSetIdempotentRequestBuilding

Function TestProduceSetIdempotentRequestBuilding

produce_set_test.go:290–376  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

288}
289
290func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
291 const pID = 1000
292 const pEpoch = 1234
293
294 config := NewTestConfig()
295 config.Producer.RequiredAcks = WaitForAll
296 config.Producer.Idempotent = true
297 config.Version = V0_11_0_0
298
299 parent := &asyncProducer{
300 conf: config,
301 txnmgr: &transactionManager{
302 producerID: pID,
303 producerEpoch: pEpoch,
304 },
305 }
306 ps := newProduceSet(parent)
307
308 now := time.Now()
309 msg := &ProducerMessage{
310 Topic: "t1",
311 Partition: 0,
312 Key: StringEncoder(TestMessage),
313 Value: StringEncoder(TestMessage),
314 Headers: []RecordHeader{
315 {
316 Key: []byte("header-1"),
317 Value: []byte("value-1"),
318 },
319 {
320 Key: []byte("header-2"),
321 Value: []byte("value-2"),
322 },
323 {
324 Key: []byte("header-3"),
325 Value: []byte("value-3"),
326 },
327 },
328 Timestamp: now,
329 sequenceNumber: 123,
330 }
331 for range 10 {
332 safeAddMessage(t, ps, msg)
333 msg.Timestamp = msg.Timestamp.Add(time.Second)
334 }
335
336 req := ps.buildRequest()
337
338 if req.Version != 3 {
339 t.Error("Wrong request version")
340 }
341
342 batch := req.records["t1"][0].RecordBatch
343 if !batch.FirstTimestamp.Equal(now.Truncate(time.Millisecond)) {
344 t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
345 }
346 if batch.ProducerID != pID {
347 t.Errorf("Wrong producerID: %v", batch.ProducerID)

Callers

nothing calls this directly

Calls 8

newProduceSetFunction · 0.85
StringEncoderTypeAlias · 0.85
safeAddMessageFunction · 0.85
buildRequestMethod · 0.80
NewTestConfigFunction · 0.70
ErrorMethod · 0.65
ErrorfMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected