DeleteTopics sends a topic deletion request to a kafka broker and returns the response.
(ctx context.Context, req *DeleteTopicsRequest)
| 40 | // DeleteTopics sends a topic deletion request to a kafka broker and returns the |
| 41 | // response. |
| 42 | func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { |
| 43 | m, err := c.roundTrip(ctx, req.Addr, &deletetopics.Request{ |
| 44 | TopicNames: req.Topics, |
| 45 | TimeoutMs: c.timeoutMs(ctx, defaultDeleteTopicsTimeout), |
| 46 | }) |
| 47 | |
| 48 | if err != nil { |
| 49 | return nil, fmt.Errorf("kafka.(*Client).DeleteTopics: %w", err) |
| 50 | } |
| 51 | |
| 52 | res := m.(*deletetopics.Response) |
| 53 | ret := &DeleteTopicsResponse{ |
| 54 | Throttle: makeDuration(res.ThrottleTimeMs), |
| 55 | Errors: make(map[string]error, len(res.Responses)), |
| 56 | } |
| 57 | |
| 58 | for _, t := range res.Responses { |
| 59 | if t.ErrorCode == 0 { |
| 60 | ret.Errors[t.Name] = nil |
| 61 | } else { |
| 62 | ret.Errors[t.Name] = Error(t.ErrorCode) |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | return ret, nil |
| 67 | } |
| 68 | |
| 69 | // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics |
| 70 | type deleteTopicsRequest struct { |
nothing calls this directly
no test coverage detected