MCPcopy
hub / github.com/segmentio/kafka-go / OffsetDelete

Method OffsetDelete

offsetdelete.go:59–106  ·  view source on GitHub ↗

OffsetDelete sends a delete offset request to a kafka broker and returns the response.

(ctx context.Context, req *OffsetDeleteRequest)

Source from the content-addressed store, hash-verified

57// OffsetDelete sends a delete offset request to a kafka broker and returns the
58// response.
59func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
60 topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
61
62 for topicName, partitionIndexes := range req.Topics {
63 partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
64
65 for i, c := range partitionIndexes {
66 partitions[i] = offsetdelete.RequestPartition{
67 PartitionIndex: int32(c),
68 }
69 }
70
71 topics = append(topics, offsetdelete.RequestTopic{
72 Name: topicName,
73 Partitions: partitions,
74 })
75 }
76
77 m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
78 GroupID: req.GroupID,
79 Topics: topics,
80 })
81 if err != nil {
82 return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
83 }
84 r := m.(*offsetdelete.Response)
85
86 res := &OffsetDeleteResponse{
87 Error: makeError(r.ErrorCode, ""),
88 Throttle: makeDuration(r.ThrottleTimeMs),
89 Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
90 }
91
92 for _, topic := range r.Topics {
93 partitions := make([]OffsetDeletePartition, len(topic.Partitions))
94
95 for i, p := range topic.Partitions {
96 partitions[i] = OffsetDeletePartition{
97 Partition: int(p.PartitionIndex),
98 Error: makeError(p.ErrorCode, ""),
99 }
100 }
101
102 res.Topics[topic.Name] = partitions
103 }
104
105 return res, nil
106}

Callers 1

TestClientDeleteOffsetFunction · 0.80

Calls 3

roundTripMethod · 0.95
makeErrorFunction · 0.85
makeDurationFunction · 0.85

Tested by 1

TestClientDeleteOffsetFunction · 0.64