CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking. Because Kafka consumer groups track a single offset per partition, the highest message offset passed to CommitMessages for a given partition moves that partition's offset forward, effectively committing all previous messages in the partition.
(ctx context.Context, msgs ...Message)
| 876 | // topic/partition it belonged to forward, effectively committing all previous |
| 877 | // messages in the partition. |
| 878 | func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error { |
| 879 | if !r.useConsumerGroup() { |
| 880 | return errOnlyAvailableWithGroup |
| 881 | } |
| 882 | |
| 883 | var errch <-chan error |
| 884 | creq := commitRequest{ |
| 885 | commits: makeCommits(msgs...), |
| 886 | } |
| 887 | |
| 888 | if r.useSyncCommits() { |
| 889 | ch := make(chan error, 1) |
| 890 | errch, creq.errch = ch, ch |
| 891 | } |
| 892 | |
| 893 | select { |
| 894 | case r.commits <- creq: |
| 895 | case <-ctx.Done(): |
| 896 | return ctx.Err() |
| 897 | case <-r.stctx.Done(): |
| 898 | // This context is used to ensure we don't allow commits after the |
| 899 | // reader was closed. |
| 900 | return io.ErrClosedPipe |
| 901 | } |
| 902 | |
| 903 | if !r.useSyncCommits() { |
| 904 | return nil |
| 905 | } |
| 906 | |
| 907 | select { |
| 908 | case <-ctx.Done(): |
| 909 | return ctx.Err() |
| 910 | case err := <-errch: |
| 911 | return err |
| 912 | } |
| 913 | } |
| 914 | |
| 915 | // ReadLag returns the current lag of the reader by fetching the last offset of |
| 916 | // the topic and partition and computing the difference between that value and |