Metadata sends a metadata request to a Kafka broker and returns the response.
(ctx context.Context, req *MetadataRequest)
| 40 | |
| 41 | // Metadata sends a metadata request to a Kafka broker and returns the response. |
| 42 | func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) { |
| 43 | m, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{ |
| 44 | TopicNames: req.Topics, |
| 45 | }) |
| 46 | |
| 47 | if err != nil { |
| 48 | return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err) |
| 49 | } |
| 50 | |
| 51 | res := m.(*metadataAPI.Response) |
| 52 | ret := &MetadataResponse{ |
| 53 | Throttle: makeDuration(res.ThrottleTimeMs), |
| 54 | Brokers: make([]Broker, len(res.Brokers)), |
| 55 | Topics: make([]Topic, len(res.Topics)), |
| 56 | ClusterID: res.ClusterID, |
| 57 | } |
| 58 | |
| 59 | brokers := make(map[int32]Broker, len(res.Brokers)) |
| 60 | |
| 61 | for i, b := range res.Brokers { |
| 62 | broker := Broker{ |
| 63 | Host: b.Host, |
| 64 | Port: int(b.Port), |
| 65 | ID: int(b.NodeID), |
| 66 | Rack: b.Rack, |
| 67 | } |
| 68 | |
| 69 | ret.Brokers[i] = broker |
| 70 | brokers[b.NodeID] = broker |
| 71 | |
| 72 | if b.NodeID == res.ControllerID { |
| 73 | ret.Controller = broker |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | for i, t := range res.Topics { |
| 78 | ret.Topics[i] = Topic{ |
| 79 | Name: t.Name, |
| 80 | Internal: t.IsInternal, |
| 81 | Partitions: make([]Partition, len(t.Partitions)), |
| 82 | Error: makeError(t.ErrorCode, ""), |
| 83 | } |
| 84 | |
| 85 | for j, p := range t.Partitions { |
| 86 | partition := Partition{ |
| 87 | Topic: t.Name, |
| 88 | ID: int(p.PartitionIndex), |
| 89 | Leader: brokers[p.LeaderID], |
| 90 | Replicas: make([]Broker, len(p.ReplicaNodes)), |
| 91 | Isr: make([]Broker, len(p.IsrNodes)), |
| 92 | Error: makeError(p.ErrorCode, ""), |
| 93 | } |
| 94 | |
| 95 | for i, id := range p.ReplicaNodes { |
| 96 | partition.Replicas[i] = brokers[id] |
| 97 | } |
| 98 | |
| 99 | for i, id := range p.IsrNodes { |