(ctx context.Context, topic string)
| 742 | } |
| 743 | |
| 744 | func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { |
| 745 | client := w.client(w.readTimeout()) |
| 746 | // Here we use the transport directly as an optimization to avoid the |
| 747 | // construction of temporary request and response objects made by the |
| 748 | // (*Client).Metadata API. |
| 749 | // |
| 750 | // It is expected that the transport will optimize this request by |
| 751 | // caching recent results (the kafka.Transport types does). |
| 752 | r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{ |
| 753 | TopicNames: []string{topic}, |
| 754 | AllowAutoTopicCreation: w.AllowAutoTopicCreation, |
| 755 | }) |
| 756 | if err != nil { |
| 757 | return 0, err |
| 758 | } |
| 759 | for _, t := range r.(*metadataAPI.Response).Topics { |
| 760 | if t.Name == topic { |
| 761 | // This should always hit, unless kafka has a bug. |
| 762 | if t.ErrorCode != 0 { |
| 763 | return 0, Error(t.ErrorCode) |
| 764 | } |
| 765 | return len(t.Partitions), nil |
| 766 | } |
| 767 | } |
| 768 | return 0, UnknownTopicOrPartition |
| 769 | } |
| 770 | |
| 771 | func (w *Writer) client(timeout time.Duration) *Client { |
| 772 | return &Client{ |
no test coverage detected