(t *testing.T, n int, addr net.Addr, topic string, compression compress.Codec)
| 14 | ) |
| 15 | |
| 16 | func produceRecords(t *testing.T, n int, addr net.Addr, topic string, compression compress.Codec) []Record { |
| 17 | conn, err := (&Dialer{ |
| 18 | Resolver: &net.Resolver{}, |
| 19 | }).DialLeader(context.Background(), addr.Network(), addr.String(), topic, 0) |
| 20 | |
| 21 | if err != nil { |
| 22 | t.Fatal("failed to open a new kafka connection:", err) |
| 23 | } |
| 24 | defer conn.Close() |
| 25 | |
| 26 | msgs := makeTestSequence(n) |
| 27 | if compression == nil { |
| 28 | _, err = conn.WriteMessages(msgs...) |
| 29 | } else { |
| 30 | _, err = conn.WriteCompressedMessages(compression, msgs...) |
| 31 | } |
| 32 | if err != nil { |
| 33 | t.Fatal(err) |
| 34 | } |
| 35 | |
| 36 | records := make([]Record, len(msgs)) |
| 37 | for offset, msg := range msgs { |
| 38 | records[offset] = Record{ |
| 39 | Offset: int64(offset), |
| 40 | Key: NewBytes(msg.Key), |
| 41 | Value: NewBytes(msg.Value), |
| 42 | Headers: msg.Headers, |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | return records |
| 47 | } |
| 48 | |
| 49 | func TestClientFetch(t *testing.T) { |
| 50 | client, topic, shutdown := newLocalClientAndTopic() |
no test coverage detected