MCPcopy
hub / github.com/segmentio/kafka-go / TestClientRawProduce

Function TestClientRawProduce

rawproduce_test.go:13–46  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

11)
12
13func TestClientRawProduce(t *testing.T) {
14 // The RawProduce request records are encoded in the format introduced in Kafka 0.11.0.
15 if !ktesting.KafkaIsAtLeast("0.11.0") {
16 t.Skip("Skipping because the RawProduce request is not supported by Kafka versions below 0.11.0")
17 }
18
19 client, topic, shutdown := newLocalClientAndTopic()
20 defer shutdown()
21
22 now := time.Now()
23
24 res, err := client.RawProduce(context.Background(), &RawProduceRequest{
25 Topic: topic,
26 Partition: 0,
27 RequiredAcks: -1,
28 RawRecords: NewRawRecordSet(NewRecordReader(
29 Record{Time: now, Value: NewBytes([]byte(`hello-1`))},
30 Record{Time: now, Value: NewBytes([]byte(`hello-2`))},
31 Record{Time: now, Value: NewBytes([]byte(`hello-3`))},
32 ), 0),
33 })
34
35 if err != nil {
36 t.Fatal(err)
37 }
38
39 if res.Error != nil {
40 t.Error(res.Error)
41 }
42
43 for index, err := range res.RecordErrors {
44 t.Errorf("record at index %d produced an error: %v", index, err)
45 }
46}
47
48func TestClientRawProduceCompressed(t *testing.T) {
49 // The RawProduce request records are encoded in the format introduced in Kafka 0.11.0.

Callers

nothing calls this directly

Calls 6

RawProduceMethod · 0.80
newLocalClientAndTopicFunction · 0.70
NewRawRecordSetFunction · 0.70
NewRecordReaderFunction · 0.70
NewBytesFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected