MCPcopy
hub / github.com/segmentio/kafka-go / partitions

Method partitions

writer.go:744–769  ·  view source on GitHub ↗
(ctx context.Context, topic string)

Source from the content-addressed store, hash-verified

742}
743
744func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
745 client := w.client(w.readTimeout())
746 // Here we use the transport directly as an optimization to avoid the
747 // construction of temporary request and response objects made by the
748 // (*Client).Metadata API.
749 //
750 // It is expected that the transport will optimize this request by
751 // caching recent results (the kafka.Transport types does).
752 r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
753 TopicNames: []string{topic},
754 AllowAutoTopicCreation: w.AllowAutoTopicCreation,
755 })
756 if err != nil {
757 return 0, err
758 }
759 for _, t := range r.(*metadataAPI.Response).Topics {
760 if t.Name == topic {
761 // This should always hit, unless kafka has a bug.
762 if t.ErrorCode != 0 {
763 return 0, Error(t.ErrorCode)
764 }
765 return len(t.Partitions), nil
766 }
767 }
768 return 0, UnknownTopicOrPartition
769}
770
771func (w *Writer) client(timeout time.Duration) *Client {
772 return &Client{

Callers 1

WriteMessagesMethod · 0.95

Calls 5

clientMethod · 0.95
readTimeoutMethod · 0.95
transportMethod · 0.80
ErrorTypeAlias · 0.70
RoundTripMethod · 0.65

Tested by

no test coverage detected