hub / github.com/segmentio/kafka-go / CommitMessages

Method CommitMessages

reader.go:878–913

CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking. Because Kafka consumer groups track a single offset per partition, committing the highest message offset passed to CommitMessages for a topic/partition moves that partition's offset forward, effectively committing all previous messages in the partition.
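The single-offset-per-partition rule can be illustrated without a broker. The sketch below is not the library's code: `Message`, `partitionKey`, and `highestOffsets` are hypothetical names introduced here, and the sketch ignores how kafka-go encodes the committed offset. It only shows that reducing a batch to its highest offset per topic/partition yields the same result as passing just the last message seen for each partition.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for kafka.Message, reduced to the
// fields that identify a position in a partition.
type Message struct {
	Topic     string
	Partition int
	Offset    int64
}

// partitionKey identifies one topic/partition pair (hypothetical helper).
type partitionKey struct {
	Topic     string
	Partition int
}

// highestOffsets keeps only the largest offset seen for each
// topic/partition, since a consumer group records one offset per partition.
func highestOffsets(msgs ...Message) map[partitionKey]int64 {
	out := make(map[partitionKey]int64)
	for _, m := range msgs {
		k := partitionKey{m.Topic, m.Partition}
		if cur, ok := out[k]; !ok || m.Offset > cur {
			out[k] = m.Offset
		}
	}
	return out
}

func main() {
	batch := []Message{
		{"orders", 0, 10},
		{"orders", 0, 12},
		{"orders", 1, 7},
		{"orders", 0, 11},
	}

	// Reducing the whole batch...
	all := highestOffsets(batch...)
	// ...gives the same offsets as keeping only the highest message of
	// each partition.
	last := highestOffsets(batch[1], batch[2])

	for k, off := range all {
		fmt.Println(k.Topic, k.Partition, off, last[k] == off)
	}
}
```

In practice this means a consumer that processes messages in order only needs to hand the most recent message of each partition to CommitMessages.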

(ctx context.Context, msgs ...Message)

Source from the content-addressed store, hash-verified

// topic/partition it belonged to forward, effectively committing all previous
// messages in the partition.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	var errch <-chan error
	creq := commitRequest{
		commits: makeCommits(msgs...),
	}

	if r.useSyncCommits() {
		ch := make(chan error, 1)
		errch, creq.errch = ch, ch
	}

	select {
	case r.commits <- creq:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !r.useSyncCommits() {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errch:
		return err
	}
}

// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and

Calls 5

useConsumerGroup · Method · 0.95
useSyncCommits · Method · 0.95
makeCommits · Function · 0.85
Done · Method · 0.80
Err · Method · 0.45