deleteTopics deletes the specified topics. See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
(request deleteTopicsRequest)
| 163 | // |
| 164 | // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics |
| 165 | func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) { |
| 166 | version, err := c.negotiateVersion(deleteTopics, v0, v1) |
| 167 | if err != nil { |
| 168 | return deleteTopicsResponse{}, err |
| 169 | } |
| 170 | |
| 171 | response := deleteTopicsResponse{ |
| 172 | v: version, |
| 173 | } |
| 174 | |
| 175 | err = c.writeOperation( |
| 176 | func(deadline time.Time, id int32) error { |
| 177 | if request.Timeout == 0 { |
| 178 | now := time.Now() |
| 179 | deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) |
| 180 | request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) |
| 181 | } |
| 182 | return c.writeRequest(deleteTopics, version, id, request) |
| 183 | }, |
| 184 | func(deadline time.Time, size int) error { |
| 185 | return expectZeroSize(func() (remain int, err error) { |
| 186 | return (&response).readFrom(&c.rbuf, size) |
| 187 | }()) |
| 188 | }, |
| 189 | ) |
| 190 | if err != nil { |
| 191 | return deleteTopicsResponse{}, err |
| 192 | } |
| 193 | for _, c := range response.TopicErrorCodes { |
| 194 | if c.ErrorCode != 0 { |
| 195 | return response, Error(c.ErrorCode) |
| 196 | } |
| 197 | } |
| 198 | return response, nil |
| 199 | } |
no test coverage detected