| 100 | } |
| 101 | |
| 102 | func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { |
| 103 | opts := []kgo.Opt{ |
| 104 | kgo.ClientID(cfg.ClientID), |
| 105 | kgo.SeedBrokers(cfg.Address), |
| 106 | kgo.DialTimeout(cfg.DialTimeout), |
| 107 | |
| 108 | // A cluster metadata update is a request sent to a broker and getting back the map of partitions and |
| 109 | // the leader broker for each partition. The cluster metadata can be updated (a) periodically or |
| 110 | // (b) when some events occur (e.g. backoff due to errors). |
| 111 | // |
| 112 | // MetadataMinAge() sets the minimum time between two cluster metadata updates due to events. |
| 113 | // MetadataMaxAge() sets how frequently the periodic update should occur. |
| 114 | // |
| 115 | // It's important to note that the periodic update is also used to discover new brokers (e.g. during a |
| 116 | // rolling update or after a scale up). For this reason, it's important to run the update frequently. |
| 117 | // |
| 118 | // The other two side effects of frequently updating the cluster metadata: |
| 119 | // 1. The "metadata" request may be expensive to run on the Kafka backend. |
| 120 | // 2. If the backend returns each time a different authoritative owner for a partition, then each time |
| 121 | // the cluster metadata is updated the Kafka client will create a new connection for each partition, |
| 122 | // leading to a high connections churn rate. |
| 123 | // |
| 124 | // We currently set min and max age to the same value to have constant load on the Kafka backend: regardless |
| 125 | // there are errors or not, the metadata requests frequency doesn't change. |
| 126 | kgo.MetadataMinAge(10 * time.Second), |
| 127 | kgo.MetadataMaxAge(10 * time.Second), |
| 128 | |
| 129 | kgo.WithLogger(newLogger(logger)), |
| 130 | |
| 131 | kgo.RetryTimeoutFn(func(key int16) time.Duration { |
| 132 | if key == ((*kmsg.ListOffsetsRequest)(nil)).Key() { |
| 133 | return cfg.LastProducedOffsetRetryTimeout |
| 134 | } |
| 135 | |
| 136 | // 30s is the default timeout in the Kafka client. |
| 137 | return 30 * time.Second |
| 138 | }), |
| 139 | } |
| 140 | |
| 141 | // Disable client metrics if explicitly disabled to reduce noise. |
| 142 | if cfg.DisableKafkaTelemetry { |
| 143 | opts = append(opts, kgo.DisableClientMetrics()) |
| 144 | } |
| 145 | |
| 146 | if cfg.AutoCreateTopicEnabled { |
| 147 | opts = append(opts, kgo.AllowAutoTopicCreation()) |
| 148 | } |
| 149 | |
| 150 | // SASL plain auth. |
| 151 | if cfg.SASLUsername != "" && cfg.SASLPassword.String() != "" { |
| 152 | opts = append(opts, kgo.SASL(plain.Plain(func(_ context.Context) (plain.Auth, error) { |
| 153 | return plain.Auth{ |
| 154 | User: cfg.SASLUsername, |
| 155 | Pass: cfg.SASLPassword.String(), |
| 156 | }, nil |
| 157 | }))) |
| 158 | } |
| 159 | |