ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage. This method is intended to be used in cases where a program may be unable to call ReadMessage but still needs an up-to-date estimate of how far behind the reader is.
(ctx context.Context)
| 924 | // The function returns a lag of zero when the reader's current offset is |
| 925 | // LastOffset, and the span from first to last offset when it is FirstOffset. |
| 926 | func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) { |
| 927 | if r.useConsumerGroup() { |
| 928 | return 0, errNotAvailableWithGroup |
| 929 | } |
| 930 | |
| 931 | type offsets struct { |
| 932 | first int64 |
| 933 | last int64 |
| 934 | } |
| 935 | |
| 936 | offch := make(chan offsets, 1) |
| 937 | errch := make(chan error, 1) |
| 938 | |
| 939 | go func() { |
| 940 | var off offsets |
| 941 | var err error |
| 942 | |
| 943 | for _, broker := range r.config.Brokers { |
| 944 | var conn *Conn |
| 945 | |
| 946 | if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil { |
| 947 | continue |
| 948 | } |
| 949 | |
| 950 | deadline, _ := ctx.Deadline() |
| 951 | conn.SetDeadline(deadline) |
| 952 | |
| 953 | off.first, off.last, err = conn.ReadOffsets() |
| 954 | conn.Close() |
| 955 | |
| 956 | if err == nil { |
| 957 | break |
| 958 | } |
| 959 | } |
| 960 | |
| 961 | if err != nil { |
| 962 | errch <- err |
| 963 | } else { |
| 964 | offch <- off |
| 965 | } |
| 966 | }() |
| 967 | |
| 968 | select { |
| 969 | case off := <-offch: |
| 970 | switch cur := r.Offset(); { |
| 971 | case cur == FirstOffset: |
| 972 | lag = off.last - off.first |
| 973 | |
| 974 | case cur == LastOffset: |
| 975 | lag = 0 |
| 976 | |
| 977 | default: |
| 978 | lag = off.last - cur |
| 979 | } |
| 980 | case err = <-errch: |
| 981 | case <-ctx.Done(): |
| 982 | err = ctx.Err() |
| 983 | } |