MCPcopy
hub / github.com/segmentio/kafka-go / initialize

Method initialize

reader.go:1442–1488  ·  view source on GitHub ↗
(ctx context.Context, offset int64)

Source from the content-addressed store, hash-verified

1440}
1441
1442func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
1443 for i := 0; i != len(r.brokers) && conn == nil; i++ {
1444 broker := r.brokers[i]
1445 var first, last int64
1446
1447 t0 := time.Now()
1448 conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
1449 t1 := time.Now()
1450 r.stats.dials.observe(1)
1451 r.stats.dialTime.observeDuration(t1.Sub(t0))
1452
1453 if err != nil {
1454 continue
1455 }
1456
1457 if first, last, err = r.readOffsets(conn); err != nil {
1458 conn.Close()
1459 conn = nil
1460 break
1461 }
1462
1463 switch {
1464 case offset == FirstOffset:
1465 offset = first
1466
1467 case offset == LastOffset:
1468 offset = last
1469
1470 case offset < first:
1471 offset = first
1472 }
1473
1474 r.withLogger(func(log Logger) {
1475 log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
1476 })
1477
1478 if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
1479 conn.Close()
1480 conn = nil
1481 break
1482 }
1483
1484 conn.SetDeadline(time.Time{})
1485 }
1486
1487 return
1488}
1489
1490func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
1491 r.stats.fetches.observe(1)

Callers 1

runMethod · 0.95

Calls 10

readOffsetsMethod · 0.95
withLoggerMethod · 0.95
toHumanOffsetFunction · 0.85
DialLeaderMethod · 0.80
observeDurationMethod · 0.80
PrintfMethod · 0.65
observeMethod · 0.45
CloseMethod · 0.45
SeekMethod · 0.45
SetDeadlineMethod · 0.45

Tested by

no test coverage detected