ReadPartitions returns the list of available partitions for the given list of topics. If the method is called with no topics, it uses the topic configured on the connection. If no topic is configured on the connection either, the method fetches all partitions of the Kafka cluster.
(topics ...string)
| 964 | // connection. If there are none, the method fetches all partitions of the kafka |
| 965 | // cluster. |
| 966 | func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) { |
| 967 | |
| 968 | if len(topics) == 0 { |
| 969 | if len(c.topic) != 0 { |
| 970 | defaultTopics := [...]string{c.topic} |
| 971 | topics = defaultTopics[:] |
| 972 | } else { |
| 973 | // topics needs to be explicitly nil-ed out or the broker will |
| 974 | // interpret it as a request for 0 partitions instead of all. |
| 975 | topics = nil |
| 976 | } |
| 977 | } |
| 978 | metadataVersion, err := c.negotiateVersion(metadata, v1, v6) |
| 979 | if err != nil { |
| 980 | return nil, err |
| 981 | } |
| 982 | |
| 983 | err = c.readOperation( |
| 984 | func(deadline time.Time, id int32) error { |
| 985 | switch metadataVersion { |
| 986 | case v6: |
| 987 | return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true}) |
| 988 | default: |
| 989 | return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics)) |
| 990 | } |
| 991 | }, |
| 992 | func(deadline time.Time, size int) error { |
| 993 | partitions, err = c.readPartitionsResponse(metadataVersion, size) |
| 994 | return err |
| 995 | }, |
| 996 | ) |
| 997 | return |
| 998 | } |
| 999 | |
| 1000 | func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) { |
| 1001 | switch metadataVersion { |