(t *testing.T)
| 192 | } |
| 193 | |
| 194 | func TestClientProduceAndConsume(t *testing.T) { |
| 195 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 196 | defer cancel() |
| 197 | // Tests a typical kafka use case: data is produced to a partition, |
| 198 | // then consumed back sequentially. We use snappy compression because |
| 199 | // kafka streams are often compressed, and verify that each record |
| 200 | // produced is exposed to the consumer and that order is preserved. |
| 201 | client, topic, shutdown := newLocalClientAndTopic() |
| 202 | defer shutdown() |
| 203 | |
| 204 | epoch := time.Now() |
| 205 | seed := int64(0) // deterministic |
| 206 | prng := rand.New(rand.NewSource(seed)) |
| 207 | offset := int64(0) |
| 208 | |
| 209 | const numBatches = 100 |
| 210 | const recordsPerBatch = 320 |
| 211 | t.Logf("producing %d batches of %d records...", numBatches, recordsPerBatch) |
| 212 | |
| 213 | for i := 0; i < numBatches; i++ { // produce 100 batches |
| 214 | records := make([]Record, recordsPerBatch) |
| 215 | |
| 216 | for i := range records { |
| 217 | v := make([]byte, prng.Intn(999)+1) |
| 218 | io.ReadFull(prng, v) |
| 219 | records[i].Time = epoch |
| 220 | records[i].Value = NewBytes(v) |
| 221 | } |
| 222 | |
| 223 | res, err := client.Produce(ctx, &ProduceRequest{ |
| 224 | Topic: topic, |
| 225 | Partition: 0, |
| 226 | RequiredAcks: -1, |
| 227 | Records: NewRecordReader(records...), |
| 228 | Compression: compress.Snappy, |
| 229 | }) |
| 230 | if err != nil { |
| 231 | t.Fatal(err) |
| 232 | } |
| 233 | if res.Error != nil { |
| 234 | t.Fatal(res.Error) |
| 235 | } |
| 236 | if res.BaseOffset != offset { |
| 237 | t.Fatalf("records were produced at an unexpected offset, want %d but got %d", offset, res.BaseOffset) |
| 238 | } |
| 239 | offset += int64(len(records)) |
| 240 | } |
| 241 | |
| 242 | prng.Seed(seed) |
| 243 | offset = 0 // reset |
| 244 | numFetches := 0 |
| 245 | numRecords := 0 |
| 246 | |
| 247 | for numRecords < (numBatches * recordsPerBatch) { |
| 248 | res, err := client.Fetch(ctx, &FetchRequest{ |
| 249 | Topic: topic, |
| 250 | Partition: 0, |
| 251 | Offset: offset, |
nothing calls this directly
no test coverage detected