MCPcopy
hub / github.com/grafana/tempo / commonKafkaClientOptions

Function commonKafkaClientOptions

pkg/ingest/writer_client.go:102–173  ·  view source on GitHub ↗
(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger)

Source from the content-addressed store, hash-verified

100}
101
102func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt {
103 opts := []kgo.Opt{
104 kgo.ClientID(cfg.ClientID),
105 kgo.SeedBrokers(cfg.Address),
106 kgo.DialTimeout(cfg.DialTimeout),
107
108 // A cluster metadata update is a request sent to a broker and getting back the map of partitions and
109 // the leader broker for each partition. The cluster metadata can be updated (a) periodically or
110 // (b) when some events occur (e.g. backoff due to errors).
111 //
112 // MetadataMinAge() sets the minimum time between two cluster metadata updates due to events.
113 // MetadataMaxAge() sets how frequently the periodic update should occur.
114 //
115 // It's important to note that the periodic update is also used to discover new brokers (e.g. during a
116 // rolling update or after a scale up). For this reason, it's important to run the update frequently.
117 //
118 // The other two side effects of frequently updating the cluster metadata:
119 // 1. The "metadata" request may be expensive to run on the Kafka backend.
120 // 2. If the backend returns each time a different authoritative owner for a partition, then each time
121 // the cluster metadata is updated the Kafka client will create a new connection for each partition,
122 // leading to a high connections churn rate.
123 //
124 // We currently set min and max age to the same value to have constant load on the Kafka backend: regardless
125 // of whether there are errors or not, the frequency of metadata requests doesn't change.
126 kgo.MetadataMinAge(10 * time.Second),
127 kgo.MetadataMaxAge(10 * time.Second),
128
129 kgo.WithLogger(newLogger(logger)),
130
131 kgo.RetryTimeoutFn(func(key int16) time.Duration {
132 if key == ((*kmsg.ListOffsetsRequest)(nil)).Key() {
133 return cfg.LastProducedOffsetRetryTimeout
134 }
135
136 // 30s is the default timeout in the Kafka client.
137 return 30 * time.Second
138 }),
139 }
140
141 // Disable client metrics if explicitly disabled to reduce noise.
142 if cfg.DisableKafkaTelemetry {
143 opts = append(opts, kgo.DisableClientMetrics())
144 }
145
146 if cfg.AutoCreateTopicEnabled {
147 opts = append(opts, kgo.AllowAutoTopicCreation())
148 }
149
150 // SASL plain auth.
151 if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" {
152 opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) {
153 return plain.Auth{
154 User: cfg.SASLUsername,
155 Pass: cfg.SASLPassword.String(),
156 }, nil
157 })))
158 }
159

Callers 4

NewReaderClientFunction · 0.85
NewWriterClientFunction · 0.85
createTestKafkaClientFunction · 0.85

Calls 3

newLoggerFunction · 0.70
KeyMethod · 0.65
StringMethod · 0.45

Tested by 1

createTestKafkaClientFunction · 0.68