MCPcopy
hub / github.com/segmentio/kafka-go / TestClientRawProduceCompressed

Function TestClientRawProduceCompressed

rawproduce_test.go:48–81  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

46}
47
48func TestClientRawProduceCompressed(t *testing.T) {
49 // The RawProduce request records are encoded in the format introduced in Kafka 0.11.0.
50 if !ktesting.KafkaIsAtLeast("0.11.0") {
51 t.Skip("Skipping because the RawProduce request is not supported by Kafka versions below 0.11.0")
52 }
53
54 client, topic, shutdown := newLocalClientAndTopic()
55 defer shutdown()
56
57 now := time.Now()
58
59 res, err := client.RawProduce(context.Background(), &RawProduceRequest{
60 Topic: topic,
61 Partition: 0,
62 RequiredAcks: -1,
63 RawRecords: NewRawRecordSet(NewRecordReader(
64 Record{Time: now, Value: NewBytes([]byte(`hello-1`))},
65 Record{Time: now, Value: NewBytes([]byte(`hello-2`))},
66 Record{Time: now, Value: NewBytes([]byte(`hello-3`))},
67 ), protocol.Gzip),
68 })
69
70 if err != nil {
71 t.Fatal(err)
72 }
73
74 if res.Error != nil {
75 t.Error(res.Error)
76 }
77
78 for index, err := range res.RecordErrors {
79 t.Errorf("record at index %d produced an error: %v", index, err)
80 }
81}
82
83func TestClientRawProduceNilRecords(t *testing.T) {
84 client, topic, shutdown := newLocalClientAndTopic()

Callers

nothing calls this directly

Calls 6

RawProduceMethod · 0.80
newLocalClientAndTopicFunction · 0.70
NewRawRecordSetFunction · 0.70
NewRecordReaderFunction · 0.70
NewBytesFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected