NewWriterClient returns the kgo.Client that should be used by the Writer. The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).
(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer)
| 27 | // The input prometheus.Registerer must be wrapped with a prefix (the names of metrics |
| 28 | // registered don't have a prefix). |
| 29 | func NewWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { |
| 30 | // Do not export the client ID, because we use it to specify options to the backend. |
| 31 | metrics := kprom.NewMetrics( |
| 32 | "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. |
| 33 | kprom.Registerer(reg), |
| 34 | kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) |
| 35 | |
| 36 | opts := append( |
| 37 | commonKafkaClientOptions(kafkaCfg, metrics, logger), |
| 38 | kgo.RequiredAcks(kgo.AllISRAcks()), |
| 39 | kgo.DefaultProduceTopic(kafkaCfg.Topic), |
| 40 | |
| 41 | // We set the partition field in each record. |
| 42 | kgo.RecordPartitioner(kgo.ManualPartitioner()), |
| 43 | |
| 44 | // Set the upper bounds the size of a record batch. |
| 45 | kgo.ProducerBatchMaxBytes(producerBatchMaxBytes), |
| 46 | |
| 47 | // By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency |
| 48 | // (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher |
| 49 | // number of in-flight requests, in addition to short buffering ("linger") in client side before firing the |
| 50 | // next Produce request allows us to reduce the end-to-end latency. |
| 51 | // |
| 52 | // The result of the multiplication of producer linger and max in-flight requests should match the maximum |
| 53 | // Produce latency expected by the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s, |
| 54 | // which means the Kafka client will keep issuing a Produce request every 50ms as far as the Kafka backend |
| 55 | // doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop |
| 56 | // issuing new Produce requests until some previous ones complete). |
| 57 | kgo.DisableIdempotentWrite(), |
| 58 | kgo.ProducerLinger(50*time.Millisecond), |
| 59 | kgo.MaxProduceRequestsInflightPerBroker(maxInflightProduceRequests), |
| 60 | |
| 61 | // Unlimited number of Produce retries but a deadline on the max time a record can take to be delivered. |
| 62 | // With the default config it would retry infinitely. |
| 63 | // |
| 64 | // Details of the involved timeouts: |
| 65 | // - RecordDeliveryTimeout: how long a Kafka client Produce() call can take for a given record. The overhead |
| 66 | // timeout is NOT applied. |
| 67 | // - ProduceRequestTimeout: how long to wait for the response to the Produce request (the Kafka protocol message) |
| 68 | // after being sent on the network. The actual timeout is increased by the configured overhead. |
| 69 | // |
| 70 | // When a Produce request to Kafka fail, the client will retry up until the RecordDeliveryTimeout is reached. |
| 71 | // Once the timeout is reached, the Produce request will fail and all other buffered requests in the client |
| 72 | // (for the same partition) will fail too. See kgo.RecordDeliveryTimeout() documentation for more info. |
| 73 | kgo.RecordRetries(math.MaxInt), |
| 74 | kgo.RecordDeliveryTimeout(kafkaCfg.WriteTimeout), |
| 75 | kgo.ProduceRequestTimeout(kafkaCfg.WriteTimeout), |
| 76 | kgo.RequestTimeoutOverhead(writerRequestTimeoutOverhead), |
| 77 | |
| 78 | // Unlimited number of buffered records because we limit on bytes in Writer. The reason why we don't use |
| 79 | // kgo.MaxBufferedBytes() is because it suffers a deadlock issue: |
| 80 | // https://github.com/twmb/franz-go/issues/777 |
| 81 | kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited". |
| 82 | kgo.MaxBufferedBytes(0), |
| 83 | ) |
| 84 | if kafkaCfg.AutoCreateTopicEnabled { |
| 85 | kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger) |
| 86 | } |
no test coverage detected