MCPcopy
hub / github.com/segmentio/kafka-go / TestClientProduceAndConsume

Function TestClientProduceAndConsume

client_test.go:194–304  ·  client_test.go::TestClientProduceAndConsume
(t *testing.T)

Source from the content-addressed store, hash-verified

192}
193
194func TestClientProduceAndConsume(t *testing.T) {
195 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
196 defer cancel()
197 // Tests a typical kafka use case, data is produced to a partition,
198 // then consumed back sequentially. We use snappy compression because
199 // kafka stream are often compressed, and verify that each record
200 // produced is exposed to the consumer, and order is preserved.
201 client, topic, shutdown := newLocalClientAndTopic()
202 defer shutdown()
203
204 epoch := time.Now()
205 seed := int64(0) // deterministic
206 prng := rand.New(rand.NewSource(seed))
207 offset := int64(0)
208
209 const numBatches = 100
210 const recordsPerBatch = 320
211 t.Logf("producing %d batches of %d records...", numBatches, recordsPerBatch)
212
213 for i := 0; i < numBatches; i++ { // produce 100 batches
214 records := make([]Record, recordsPerBatch)
215
216 for i := range records {
217 v := make([]byte, prng.Intn(999)+1)
218 io.ReadFull(prng, v)
219 records[i].Time = epoch
220 records[i].Value = NewBytes(v)
221 }
222
223 res, err := client.Produce(ctx, &ProduceRequest{
224 Topic: topic,
225 Partition: 0,
226 RequiredAcks: -1,
227 Records: NewRecordReader(records...),
228 Compression: compress.Snappy,
229 })
230 if err != nil {
231 t.Fatal(err)
232 }
233 if res.Error != nil {
234 t.Fatal(res.Error)
235 }
236 if res.BaseOffset != offset {
237 t.Fatalf("records were produced at an unexpected offset, want %d but got %d", offset, res.BaseOffset)
238 }
239 offset += int64(len(records))
240 }
241
242 prng.Seed(seed)
243 offset = 0 // reset
244 numFetches := 0
245 numRecords := 0
246
247 for numRecords < (numBatches * recordsPerBatch) {
248 res, err := client.Fetch(ctx, &FetchRequest{
249 Topic: topic,
250 Partition: 0,
251 Offset: offset,

Callers

nothing calls this directly

Calls 8

ProduceMethod · 0.80
FetchMethod · 0.80
newLocalClientAndTopicFunction · 0.70
NewBytesFunction · 0.70
NewRecordReaderFunction · 0.70
ReadRecordMethod · 0.65
CloseMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected