MCPcopy
hub / github.com/segmentio/kafka-go / ReadPartitions

Method ReadPartitions

conn.go:966–998  ·  view source on GitHub ↗

ReadPartitions returns the list of available partitions for the given list of topics. If the method is called with no topic, it uses the topic configured on the connection. If there are none, the method fetches all partitions of the kafka cluster.

(topics ...string)

Source from the content-addressed store, hash-verified

964// connection. If there are none, the method fetches all partitions of the kafka
965// cluster.
966func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
967
968 if len(topics) == 0 {
969 if len(c.topic) != 0 {
970 defaultTopics := [...]string{c.topic}
971 topics = defaultTopics[:]
972 } else {
973 // topics needs to be explicitly nil-ed out or the broker will
974 // interpret it as a request for 0 partitions instead of all.
975 topics = nil
976 }
977 }
978 metadataVersion, err := c.negotiateVersion(metadata, v1, v6)
979 if err != nil {
980 return nil, err
981 }
982
983 err = c.readOperation(
984 func(deadline time.Time, id int32) error {
985 switch metadataVersion {
986 case v6:
987 return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true})
988 default:
989 return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
990 }
991 },
992 func(deadline time.Time, size int) error {
993 partitions, err = c.readPartitionsResponse(metadataVersion, size)
994 return err
995 },
996 )
997 return
998}
999
1000func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) {
1001 switch metadataVersion {

Callers 6

testDeleteTopicsFunction · 0.45
LookupPartitionMethod · 0.45
LookupPartitionsMethod · 0.45
readPartitionsMethod · 0.45

Calls 5

negotiateVersionMethod · 0.95
readOperationMethod · 0.95
writeRequestMethod · 0.95
topicMetadataRequestV1TypeAlias · 0.85

Tested by 3

testDeleteTopicsFunction · 0.36