OffsetDelete sends a delete offset request to a kafka broker and returns the response.
(ctx context.Context, req *OffsetDeleteRequest)
| 57 | // OffsetDelete sends a delete offset request to a kafka broker and returns the |
| 58 | // response. |
| 59 | func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) { |
| 60 | topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics)) |
| 61 | |
| 62 | for topicName, partitionIndexes := range req.Topics { |
| 63 | partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes)) |
| 64 | |
| 65 | for i, c := range partitionIndexes { |
| 66 | partitions[i] = offsetdelete.RequestPartition{ |
| 67 | PartitionIndex: int32(c), |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | topics = append(topics, offsetdelete.RequestTopic{ |
| 72 | Name: topicName, |
| 73 | Partitions: partitions, |
| 74 | }) |
| 75 | } |
| 76 | |
| 77 | m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{ |
| 78 | GroupID: req.GroupID, |
| 79 | Topics: topics, |
| 80 | }) |
| 81 | if err != nil { |
| 82 | return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err) |
| 83 | } |
| 84 | r := m.(*offsetdelete.Response) |
| 85 | |
| 86 | res := &OffsetDeleteResponse{ |
| 87 | Error: makeError(r.ErrorCode, ""), |
| 88 | Throttle: makeDuration(r.ThrottleTimeMs), |
| 89 | Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)), |
| 90 | } |
| 91 | |
| 92 | for _, topic := range r.Topics { |
| 93 | partitions := make([]OffsetDeletePartition, len(topic.Partitions)) |
| 94 | |
| 95 | for i, p := range topic.Partitions { |
| 96 | partitions[i] = OffsetDeletePartition{ |
| 97 | Partition: int(p.PartitionIndex), |
| 98 | Error: makeError(p.ErrorCode, ""), |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | res.Topics[topic.Name] = partitions |
| 103 | } |
| 104 | |
| 105 | return res, nil |
| 106 | } |