(t int64)
| 924 | } |
| 925 | |
| 926 | func (c *Conn) readOffset(t int64) (offset int64, err error) { |
| 927 | err = c.readOperation( |
| 928 | func(deadline time.Time, id int32) error { |
| 929 | return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t) |
| 930 | }, |
| 931 | func(deadline time.Time, size int) error { |
| 932 | return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) { |
| 933 | // We skip the topic name because we've made a request for |
| 934 | // a single topic. |
| 935 | size, err := discardString(r, size) |
| 936 | if err != nil { |
| 937 | return size, err |
| 938 | } |
| 939 | |
| 940 | // Reading the array of partitions, there will be only one |
| 941 | // partition which gives the offset we're looking for. |
| 942 | return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) { |
| 943 | var p partitionOffsetV1 |
| 944 | size, err := p.readFrom(r, size) |
| 945 | if err != nil { |
| 946 | return size, err |
| 947 | } |
| 948 | if p.ErrorCode != 0 { |
| 949 | return size, Error(p.ErrorCode) |
| 950 | } |
| 951 | offset = p.Offset |
| 952 | return size, nil |
| 953 | }) |
| 954 | })) |
| 955 | }, |
| 956 | ) |
| 957 | return |
| 958 | } |
| 959 | |
| 960 | // ReadPartitions returns the list of available partitions for the given list of |
| 961 | // topics. |
no test coverage detected