(ctx context.Context, offset int64, conn *Conn)
| 1488 | } |
| 1489 | |
| 1490 | func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { |
| 1491 | r.stats.fetches.observe(1) |
| 1492 | r.stats.offset.observe(offset) |
| 1493 | |
| 1494 | t0 := time.Now() |
| 1495 | conn.SetReadDeadline(t0.Add(r.maxWait)) |
| 1496 | |
| 1497 | batch := conn.ReadBatchWith(ReadBatchConfig{ |
| 1498 | MinBytes: r.minBytes, |
| 1499 | MaxBytes: r.maxBytes, |
| 1500 | IsolationLevel: r.isolationLevel, |
| 1501 | }) |
| 1502 | highWaterMark := batch.HighWaterMark() |
| 1503 | |
| 1504 | t1 := time.Now() |
| 1505 | r.stats.waitTime.observeDuration(t1.Sub(t0)) |
| 1506 | |
| 1507 | var msg Message |
| 1508 | var err error |
| 1509 | var size int64 |
| 1510 | var bytes int64 |
| 1511 | |
| 1512 | for { |
| 1513 | conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout)) |
| 1514 | |
| 1515 | if msg, err = batch.ReadMessage(); err != nil { |
| 1516 | batch.Close() |
| 1517 | break |
| 1518 | } |
| 1519 | |
| 1520 | n := int64(len(msg.Key) + len(msg.Value)) |
| 1521 | r.stats.messages.observe(1) |
| 1522 | r.stats.bytes.observe(n) |
| 1523 | |
| 1524 | if err = r.sendMessage(ctx, msg, highWaterMark); err != nil { |
| 1525 | batch.Close() |
| 1526 | break |
| 1527 | } |
| 1528 | |
| 1529 | offset = msg.Offset + 1 |
| 1530 | r.stats.offset.observe(offset) |
| 1531 | r.stats.lag.observe(highWaterMark - offset) |
| 1532 | |
| 1533 | size++ |
| 1534 | bytes += n |
| 1535 | } |
| 1536 | |
| 1537 | conn.SetReadDeadline(time.Time{}) |
| 1538 | |
| 1539 | t2 := time.Now() |
| 1540 | r.stats.readTime.observeDuration(t2.Sub(t1)) |
| 1541 | r.stats.fetchSize.observe(size) |
| 1542 | r.stats.fetchBytes.observe(bytes) |
| 1543 | return offset, err |
| 1544 | } |
| 1545 | |
| 1546 | func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) { |
| 1547 | conn.SetDeadline(time.Now().Add(10 * time.Second)) |
no test coverage detected