MCPcopy
hub / github.com/segmentio/kafka-go / CommitOffsets

Method CommitOffsets

consumergroup.go:418–459  ·  view source on GitHub ↗

CommitOffsets commits the provided topic+partition+offset combos to the consumer group coordinator. This can be used to reset the consumer to explicit offsets.

(offsets map[string]map[int]int64)

Source from the content-addressed store, hash-verified

416// consumer group coordinator. This can be used to reset the consumer to
417// explicit offsets.
418func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
419 if len(offsets) == 0 {
420 return nil
421 }
422
423 topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
424 for topic, partitions := range offsets {
425 t := offsetCommitRequestV2Topic{Topic: topic}
426 for partition, offset := range partitions {
427 t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
428 Partition: int32(partition),
429 Offset: offset,
430 })
431 }
432 topics = append(topics, t)
433 }
434
435 request := offsetCommitRequestV2{
436 GroupID: g.GroupID,
437 GenerationID: g.ID,
438 MemberID: g.MemberID,
439 RetentionTime: g.retentionMillis,
440 Topics: topics,
441 }
442
443 _, err := g.conn.offsetCommit(request)
444 if err == nil {
445 // if logging is enabled, print out the partitions that were committed.
446 g.log(func(l Logger) {
447 var report []string
448 for _, t := range request.Topics {
449 report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic))
450 for _, p := range t.Partitions {
451 report = append(report, fmt.Sprintf("\t\tpartition %d: %d", p.Partition, p.Offset))
452 }
453 }
454 l.Printf("committed offsets for group %s: \n%s", g.GroupID, strings.Join(report, "\n"))
455 })
456 }
457
458 return err
459}
460
461// heartbeatLoop checks in with the consumer group coordinator at the provided
462// interval. It exits if it ever encounters an error, which would signal the

Calls 3

logMethod · 0.80
offsetCommitMethod · 0.65
PrintfMethod · 0.65