MCPcopy
hub / github.com/segmentio/kafka-go / produceRecords

Function produceRecords

fetch_test.go:16–47  ·  view source on GitHub ↗
(t *testing.T, n int, addr net.Addr, topic string, compression compress.Codec)

Source from the content-addressed store, hash-verified

14)
15
16func produceRecords(t *testing.T, n int, addr net.Addr, topic string, compression compress.Codec) []Record {
17 conn, err := (&Dialer{
18 Resolver: &net.Resolver{},
19 }).DialLeader(context.Background(), addr.Network(), addr.String(), topic, 0)
20
21 if err != nil {
22 t.Fatal("failed to open a new kafka connection:", err)
23 }
24 defer conn.Close()
25
26 msgs := makeTestSequence(n)
27 if compression == nil {
28 _, err = conn.WriteMessages(msgs...)
29 } else {
30 _, err = conn.WriteCompressedMessages(compression, msgs...)
31 }
32 if err != nil {
33 t.Fatal(err)
34 }
35
36 records := make([]Record, len(msgs))
37 for offset, msg := range msgs {
38 records[offset] = Record{
39 Offset: int64(offset),
40 Key: NewBytes(msg.Key),
41 Value: NewBytes(msg.Value),
42 Headers: msg.Headers,
43 }
44 }
45
46 return records
47}
48
49func TestClientFetch(t *testing.T) {
50 client, topic, shutdown := newLocalClientAndTopic()

Callers 2

TestClientFetch · Function · 0.85

Calls 8

makeTestSequence · Function · 0.85
DialLeader · Method · 0.80
NewBytes · Function · 0.70
Network · Method · 0.45
String · Method · 0.45
Close · Method · 0.45
WriteMessages · Method · 0.45

Tested by

no test coverage detected