MCPcopy
hub / github.com/segmentio/kafka-go / Metadata

Method Metadata

metadata.go:42–108  ·  view source on GitHub ↗

Metadata sends a metadata request to a kafka broker and returns the response.

(ctx context.Context, req *MetadataRequest)

Source from the content-addressed store, hash-verified

40
41// Metadata sends a metadata request to a kafka broker and returns the response.
42func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
43 m, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{
44 TopicNames: req.Topics,
45 })
46
47 if err != nil {
48 return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
49 }
50
51 res := m.(*metadataAPI.Response)
52 ret := &MetadataResponse{
53 Throttle: makeDuration(res.ThrottleTimeMs),
54 Brokers: make([]Broker, len(res.Brokers)),
55 Topics: make([]Topic, len(res.Topics)),
56 ClusterID: res.ClusterID,
57 }
58
59 brokers := make(map[int32]Broker, len(res.Brokers))
60
61 for i, b := range res.Brokers {
62 broker := Broker{
63 Host: b.Host,
64 Port: int(b.Port),
65 ID: int(b.NodeID),
66 Rack: b.Rack,
67 }
68
69 ret.Brokers[i] = broker
70 brokers[b.NodeID] = broker
71
72 if b.NodeID == res.ControllerID {
73 ret.Controller = broker
74 }
75 }
76
77 for i, t := range res.Topics {
78 ret.Topics[i] = Topic{
79 Name: t.Name,
80 Internal: t.IsInternal,
81 Partitions: make([]Partition, len(t.Partitions)),
82 Error: makeError(t.ErrorCode, ""),
83 }
84
85 for j, p := range t.Partitions {
86 partition := Partition{
87 Topic: t.Name,
88 ID: int(p.PartitionIndex),
89 Leader: brokers[p.LeaderID],
90 Replicas: make([]Broker, len(p.ReplicaNodes)),
91 Isr: make([]Broker, len(p.IsrNodes)),
92 Error: makeError(p.ErrorCode, ""),
93 }
94
95 for i, id := range p.ReplicaNodes {
96 partition.Replicas[i] = brokers[id]
97 }
98
99 for i, id := range p.IsrNodes {

Callers 6

waitForTopic (Function) · 0.95
ConsumerOffsets (Method) · 0.95
TestClientMetadata (Function) · 0.80
TestClientLeaveGroup (Function) · 0.80
TestClientSyncGroup (Function) · 0.80
List (Function) · 0.80

Calls 3

roundTrip (Method) · 0.95
makeDuration (Function) · 0.85
makeError (Function) · 0.85

Tested by 4

waitForTopic (Function) · 0.76
TestClientMetadata (Function) · 0.64
TestClientLeaveGroup (Function) · 0.64
TestClientSyncGroup (Function) · 0.64