MCPcopy
hub / github.com/segmentio/kafka-go / readOffset

Method readOffset

conn.go:926–958  ·  view source on GitHub ↗
(t int64)

Source from the content-addressed store, hash-verified

924}
925
926func (c *Conn) readOffset(t int64) (offset int64, err error) {
927 err = c.readOperation(
928 func(deadline time.Time, id int32) error {
929 return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
930 },
931 func(deadline time.Time, size int) error {
932 return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
933 // We skip the topic name because we've made a request for
934 // a single topic.
935 size, err := discardString(r, size)
936 if err != nil {
937 return size, err
938 }
939
940 // Reading the array of partitions, there will be only one
941 // partition which gives the offset we're looking for.
942 return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
943 var p partitionOffsetV1
944 size, err := p.readFrom(r, size)
945 if err != nil {
946 return size, err
947 }
948 if p.ErrorCode != 0 {
949 return size, Error(p.ErrorCode)
950 }
951 offset = p.Offset
952 return size, nil
953 })
954 }))
955 },
956 )
957 return
958}
959
960// ReadPartitions returns the list of available partitions for the given list of
961// topics.

Callers 3

ReadOffsetMethod · 0.95
ReadFirstOffsetMethod · 0.95
ReadLastOffsetMethod · 0.95

Calls 7

readOperationMethod · 0.95
readFromMethod · 0.95
expectZeroSizeFunction · 0.85
readArrayWithFunction · 0.85
discardStringFunction · 0.85
ErrorTypeAlias · 0.70

Tested by

no test coverage detected