MCPcopy
hub / github.com/segmentio/kafka-go / offsetCommit

Method offsetCommit

conn.go:454–479  ·  view source on GitHub ↗

offsetCommit commits the specified topic partition offsets. See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit

(request offsetCommitRequestV2)

Source from the content-addressed store, hash-verified

452//
453// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
454func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
455 var response offsetCommitResponseV2
456
457 err := c.writeOperation(
458 func(deadline time.Time, id int32) error {
459 return c.writeRequest(offsetCommit, v2, id, request)
460 },
461 func(deadline time.Time, size int) error {
462 return expectZeroSize(func() (remain int, err error) {
463 return (&response).readFrom(&c.rbuf, size)
464 }())
465 },
466 )
467 if err != nil {
468 return offsetCommitResponseV2{}, err
469 }
470 for _, r := range response.Responses {
471 for _, pr := range r.PartitionResponses {
472 if pr.ErrorCode != 0 {
473 return offsetCommitResponseV2{}, Error(pr.ErrorCode)
474 }
475 }
476 }
477
478 return response, nil
479}
480
481// offsetFetch fetches the offsets for the specified topic partitions.
482// -1 indicates that there is no offset saved for the partition.

Callers

nothing calls this directly

Calls 5

writeOperationMethod · 0.95
writeRequestMethod · 0.95
expectZeroSizeFunction · 0.85
ErrorTypeAlias · 0.70
readFromMethod · 0.45

Tested by

no test coverage detected