Brokers retrieves the broker list from the Kafka metadata.
()
| 277 | |
| 278 | // Brokers retrieve the broker list from the Kafka metadata. |
| 279 | func (c *Conn) Brokers() ([]Broker, error) { |
| 280 | var brokers []Broker |
| 281 | err := c.readOperation( |
| 282 | func(deadline time.Time, id int32) error { |
| 283 | return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{})) |
| 284 | }, |
| 285 | func(deadline time.Time, size int) error { |
| 286 | var res metadataResponseV1 |
| 287 | |
| 288 | if err := c.readResponse(size, &res); err != nil { |
| 289 | return err |
| 290 | } |
| 291 | |
| 292 | brokers = make([]Broker, len(res.Brokers)) |
| 293 | for i, brokerMeta := range res.Brokers { |
| 294 | brokers[i] = Broker{ |
| 295 | ID: int(brokerMeta.NodeID), |
| 296 | Port: int(brokerMeta.Port), |
| 297 | Host: brokerMeta.Host, |
| 298 | Rack: brokerMeta.Rack, |
| 299 | } |
| 300 | } |
| 301 | return nil |
| 302 | }, |
| 303 | ) |
| 304 | return brokers, err |
| 305 | } |
| 306 | |
| 307 | // DeleteTopics deletes the specified topics. |
| 308 | func (c *Conn) DeleteTopics(topics ...string) error { |