MCPcopy
hub / github.com/segmentio/kafka-go / TestProduceRequest

Function TestProduceRequest

protocol/produce/produce_test.go:19–154  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

17)
18
19func TestProduceRequest(t *testing.T) {
20 t0 := time.Now().Truncate(time.Millisecond)
21 t1 := t0.Add(1 * time.Millisecond)
22 t2 := t0.Add(2 * time.Millisecond)
23
24 prototest.TestRequest(t, v0, &produce.Request{
25 Acks: 1,
26 Timeout: 500,
27 Topics: []produce.RequestTopic{
28 {
29 Topic: "topic-1",
30 Partitions: []produce.RequestPartition{
31 {
32 Partition: 0,
33 RecordSet: protocol.RecordSet{
34 Version: 1,
35 Records: protocol.NewRecordReader(
36 protocol.Record{Offset: 0, Time: t0, Key: nil, Value: nil},
37 ),
38 },
39 },
40 {
41 Partition: 1,
42 RecordSet: protocol.RecordSet{
43 Version: 1,
44 Records: protocol.NewRecordReader(
45 protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
46 protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
47 protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
48 ),
49 },
50 },
51 },
52 },
53
54 {
55 Topic: "topic-2",
56 Partitions: []produce.RequestPartition{
57 {
58 Partition: 0,
59 RecordSet: protocol.RecordSet{
60 Version: 1,
61 Attributes: protocol.Gzip,
62 Records: protocol.NewRecordReader(
63 protocol.Record{Offset: 0, Time: t0, Key: nil, Value: prototest.String("msg-0")},
64 protocol.Record{Offset: 1, Time: t1, Key: nil, Value: prototest.String("msg-1")},
65 protocol.Record{Offset: 2, Time: t2, Key: prototest.Bytes([]byte{1}), Value: prototest.String("msg-2")},
66 ),
67 },
68 },
69 },
70 },
71 },
72 })
73
74 prototest.TestRequest(t, v3, &produce.Request{
75 TransactionalID: "1234",
76 Acks: 1,

Callers

nothing calls this directly

Calls 5

TestRequestFunction · 0.92
NewRecordReaderFunction · 0.92
StringFunction · 0.92
BytesFunction · 0.92
TruncateMethod · 0.45

Tested by

no test coverage detected