MCPcopy
hub / github.com/segmentio/kafka-go / ReadLag

Method ReadLag

reader.go:926–986  ·  view source on GitHub ↗

ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage. This method is intended to be used in cases where a program may be unable to call ReadMess

(ctx context.Context)

Source from the content-addressed store, hash-verified

924// The function returns a lag of zero when the reader's current offset is
925// negative.
926func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
927 if r.useConsumerGroup() {
928 return 0, errNotAvailableWithGroup
929 }
930
931 type offsets struct {
932 first int64
933 last int64
934 }
935
936 offch := make(chan offsets, 1)
937 errch := make(chan error, 1)
938
939 go func() {
940 var off offsets
941 var err error
942
943 for _, broker := range r.config.Brokers {
944 var conn *Conn
945
946 if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
947 continue
948 }
949
950 deadline, _ := ctx.Deadline()
951 conn.SetDeadline(deadline)
952
953 off.first, off.last, err = conn.ReadOffsets()
954 conn.Close()
955
956 if err == nil {
957 break
958 }
959 }
960
961 if err != nil {
962 errch <- err
963 } else {
964 offch <- off
965 }
966 }()
967
968 select {
969 case off := <-offch:
970 switch cur := r.Offset(); {
971 case cur == FirstOffset:
972 lag = off.last - off.first
973
974 case cur == LastOffset:
975 lag = 0
976
977 default:
978 lag = off.last - cur
979 }
980 case err = <-errch:
981 case <-ctx.Done():
982 err = ctx.Err()
983 }

Callers 3

readLagMethod · 0.95
testReaderReadLagFunction · 0.80

Calls 9

useConsumerGroupMethod · 0.95
SetDeadlineMethod · 0.95
ReadOffsetsMethod · 0.95
CloseMethod · 0.95
OffsetMethod · 0.95
DialLeaderMethod · 0.80
DeadlineMethod · 0.80
DoneMethod · 0.80
ErrMethod · 0.45