(ctx context.Context, offset int64)
| 1440 | } |
| 1441 | |
| 1442 | func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) { |
| 1443 | for i := 0; i != len(r.brokers) && conn == nil; i++ { |
| 1444 | broker := r.brokers[i] |
| 1445 | var first, last int64 |
| 1446 | |
| 1447 | t0 := time.Now() |
| 1448 | conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition) |
| 1449 | t1 := time.Now() |
| 1450 | r.stats.dials.observe(1) |
| 1451 | r.stats.dialTime.observeDuration(t1.Sub(t0)) |
| 1452 | |
| 1453 | if err != nil { |
| 1454 | continue |
| 1455 | } |
| 1456 | |
| 1457 | if first, last, err = r.readOffsets(conn); err != nil { |
| 1458 | conn.Close() |
| 1459 | conn = nil |
| 1460 | break |
| 1461 | } |
| 1462 | |
| 1463 | switch { |
| 1464 | case offset == FirstOffset: |
| 1465 | offset = first |
| 1466 | |
| 1467 | case offset == LastOffset: |
| 1468 | offset = last |
| 1469 | |
| 1470 | case offset < first: |
| 1471 | offset = first |
| 1472 | } |
| 1473 | |
| 1474 | r.withLogger(func(log Logger) { |
| 1475 | log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset)) |
| 1476 | }) |
| 1477 | |
| 1478 | if start, err = conn.Seek(offset, SeekAbsolute); err != nil { |
| 1479 | conn.Close() |
| 1480 | conn = nil |
| 1481 | break |
| 1482 | } |
| 1483 | |
| 1484 | conn.SetDeadline(time.Time{}) |
| 1485 | } |
| 1486 | |
| 1487 | return |
| 1488 | } |
| 1489 | |
| 1490 | func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) { |
| 1491 | r.stats.fetches.observe(1) |
no test coverage detected