MCPcopy
hub / github.com/segmentio/kafka-go / read

Method read

reader.go:1490–1544  ·  view source on GitHub ↗
(ctx context.Context, offset int64, conn *Conn)

Source from the content-addressed store, hash-verified

1488}
1489
1490func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1491 r.stats.fetches.observe(1)
1492 r.stats.offset.observe(offset)
1493
1494 t0 := time.Now()
1495 conn.SetReadDeadline(t0.Add(r.maxWait))
1496
1497 batch := conn.ReadBatchWith(ReadBatchConfig{
1498 MinBytes: r.minBytes,
1499 MaxBytes: r.maxBytes,
1500 IsolationLevel: r.isolationLevel,
1501 })
1502 highWaterMark := batch.HighWaterMark()
1503
1504 t1 := time.Now()
1505 r.stats.waitTime.observeDuration(t1.Sub(t0))
1506
1507 var msg Message
1508 var err error
1509 var size int64
1510 var bytes int64
1511
1512 for {
1513 conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
1514
1515 if msg, err = batch.ReadMessage(); err != nil {
1516 batch.Close()
1517 break
1518 }
1519
1520 n := int64(len(msg.Key) + len(msg.Value))
1521 r.stats.messages.observe(1)
1522 r.stats.bytes.observe(n)
1523
1524 if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
1525 batch.Close()
1526 break
1527 }
1528
1529 offset = msg.Offset + 1
1530 r.stats.offset.observe(offset)
1531 r.stats.lag.observe(highWaterMark - offset)
1532
1533 size++
1534 bytes += n
1535 }
1536
1537 conn.SetReadDeadline(time.Time{})
1538
1539 t2 := time.Now()
1540 r.stats.readTime.observeDuration(t2.Sub(t1))
1541 r.stats.fetchSize.observe(size)
1542 r.stats.fetchBytes.observe(bytes)
1543 return offset, err
1544}
1545
1546func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
1547 conn.SetDeadline(time.Now().Add(10 * time.Second))

Callers 1

runMethod · 0.95

Calls 8

sendMessageMethod · 0.95
ReadBatchWithMethod · 0.80
HighWaterMarkMethod · 0.80
observeDurationMethod · 0.80
observeMethod · 0.45
SetReadDeadlineMethod · 0.45
ReadMessageMethod · 0.45
CloseMethod · 0.45

Tested by

no test coverage detected