offsetCommit commits the specified topic partition offsets See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
(request offsetCommitRequestV2)
| 452 | // |
| 453 | // See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit |
| 454 | func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) { |
| 455 | var response offsetCommitResponseV2 |
| 456 | |
| 457 | err := c.writeOperation( |
| 458 | func(deadline time.Time, id int32) error { |
| 459 | return c.writeRequest(offsetCommit, v2, id, request) |
| 460 | }, |
| 461 | func(deadline time.Time, size int) error { |
| 462 | return expectZeroSize(func() (remain int, err error) { |
| 463 | return (&response).readFrom(&c.rbuf, size) |
| 464 | }()) |
| 465 | }, |
| 466 | ) |
| 467 | if err != nil { |
| 468 | return offsetCommitResponseV2{}, err |
| 469 | } |
| 470 | for _, r := range response.Responses { |
| 471 | for _, pr := range r.PartitionResponses { |
| 472 | if pr.ErrorCode != 0 { |
| 473 | return offsetCommitResponseV2{}, Error(pr.ErrorCode) |
| 474 | } |
| 475 | } |
| 476 | } |
| 477 | |
| 478 | return response, nil |
| 479 | } |
| 480 | |
| 481 | // offsetFetch fetches the offsets for the specified topic partitions. |
| 482 | // -1 indicates that there is no offset saved for the partition. |
nothing calls this directly
no test coverage detected