readOffset gets the latest offset for the given topic/partition.
(topic string, partition int)
| 405 | |
| 406 | // readOffset gets the latest offset for the given topic/partition. |
| 407 | func readOffset(topic string, partition int) (offset int64, err error) { |
| 408 | var conn *Conn |
| 409 | |
| 410 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 411 | defer cancel() |
| 412 | |
| 413 | if conn, err = DialLeader(ctx, "tcp", "localhost:9092", topic, partition); err != nil { |
| 414 | err = fmt.Errorf("readOffset, DialLeader: %w", err) |
| 415 | return |
| 416 | } |
| 417 | defer conn.Close() |
| 418 | |
| 419 | offset, err = conn.ReadLastOffset() |
| 420 | if err != nil { |
| 421 | err = fmt.Errorf("readOffset, conn.ReadLastOffset: %w", err) |
| 422 | } |
| 423 | return |
| 424 | } |
| 425 | |
| 426 | func readPartition(topic string, partition int, offset int64) (msgs []Message, err error) { |
| 427 | var conn *Conn |
no test coverage detected