(t *testing.T)
| 207 | } |
| 208 | |
| 209 | func TestClientPipeline(t *testing.T) { |
| 210 | client, topic, shutdown := newLocalClientAndTopic() |
| 211 | defer shutdown() |
| 212 | |
| 213 | const numBatches = 100 |
| 214 | const recordsPerBatch = 30 |
| 215 | |
| 216 | unixEpoch := time.Unix(0, 0) |
| 217 | records := make([]Record, recordsPerBatch) |
| 218 | content := []byte("1234567890") |
| 219 | |
| 220 | for i := 0; i < numBatches; i++ { |
| 221 | for j := range records { |
| 222 | records[j] = Record{Value: NewBytes(content)} |
| 223 | } |
| 224 | |
| 225 | _, err := client.Produce(context.Background(), &ProduceRequest{ |
| 226 | Topic: topic, |
| 227 | RequiredAcks: -1, |
| 228 | Records: NewRecordReader(records...), |
| 229 | Compression: Snappy, |
| 230 | }) |
| 231 | if err != nil { |
| 232 | t.Fatal(err) |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | offset := int64(0) |
| 237 | |
| 238 | for i := 0; i < (numBatches * recordsPerBatch); { |
| 239 | req := &FetchRequest{ |
| 240 | Topic: topic, |
| 241 | Offset: offset, |
| 242 | MinBytes: 1, |
| 243 | MaxBytes: 8192, |
| 244 | MaxWait: 500 * time.Millisecond, |
| 245 | } |
| 246 | |
| 247 | res, err := client.Fetch(context.Background(), req) |
| 248 | if err != nil { |
| 249 | t.Fatal(err) |
| 250 | } |
| 251 | |
| 252 | if res.Error != nil { |
| 253 | t.Fatal(res.Error) |
| 254 | } |
| 255 | |
| 256 | for { |
| 257 | r, err := res.Records.ReadRecord() |
| 258 | if err != nil { |
| 259 | if errors.Is(err, io.EOF) { |
| 260 | break |
| 261 | } |
| 262 | t.Fatal(err) |
| 263 | } |
| 264 | |
| 265 | if r.Key != nil { |
| 266 | r.Key.Close() |
nothing calls this directly
no test coverage detected