hub / github.com/grafana/tempo / NewWriterClient

Function NewWriterClient

pkg/ingest/writer_client.go:29–88

NewWriterClient returns the kgo.Client that should be used by the Writer. The input prometheus.Registerer must already be wrapped with a prefix, because the metrics this client registers are named without one.

(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer)

Source from the content-addressed store, hash-verified

// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
// registered don't have a prefix).
func NewWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) {
	// Do not export the client ID, because we use it to specify options to the backend.
	metrics := kprom.NewMetrics(
		"", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix.
		kprom.Registerer(reg),
		kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))

	opts := append(
		commonKafkaClientOptions(kafkaCfg, metrics, logger),
		kgo.RequiredAcks(kgo.AllISRAcks()),
		kgo.DefaultProduceTopic(kafkaCfg.Topic),

		// We set the partition field in each record.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),

		// Set the upper bounds the size of a record batch.
		kgo.ProducerBatchMaxBytes(producerBatchMaxBytes),

		// By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency
		// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
		// number of in-flight requests, in addition to short buffering ("linger") in client side before firing the
		// next Produce request allows us to reduce the end-to-end latency.
		//
		// The result of the multiplication of producer linger and max in-flight requests should match the maximum
		// Produce latency expected by the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s,
		// which means the Kafka client will keep issuing a Produce request every 50ms as far as the Kafka backend
		// doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop
		// issuing new Produce requests until some previous ones complete).
		kgo.DisableIdempotentWrite(),
		kgo.ProducerLinger(50*time.Millisecond),
		kgo.MaxProduceRequestsInflightPerBroker(maxInflightProduceRequests),

		// Unlimited number of Produce retries but a deadline on the max time a record can take to be delivered.
		// With the default config it would retry infinitely.
		//
		// Details of the involved timeouts:
		// - RecordDeliveryTimeout: how long a Kafka client Produce() call can take for a given record. The overhead
		//   timeout is NOT applied.
		// - ProduceRequestTimeout: how long to wait for the response to the Produce request (the Kafka protocol message)
		//   after being sent on the network. The actual timeout is increased by the configured overhead.
		//
		// When a Produce request to Kafka fail, the client will retry up until the RecordDeliveryTimeout is reached.
		// Once the timeout is reached, the Produce request will fail and all other buffered requests in the client
		// (for the same partition) will fail too. See kgo.RecordDeliveryTimeout() documentation for more info.
		kgo.RecordRetries(math.MaxInt),
		kgo.RecordDeliveryTimeout(kafkaCfg.WriteTimeout),
		kgo.ProduceRequestTimeout(kafkaCfg.WriteTimeout),
		kgo.RequestTimeoutOverhead(writerRequestTimeoutOverhead),

		// Unlimited number of buffered records because we limit on bytes in Writer. The reason why we don't use
		// kgo.MaxBufferedBytes() is because it suffers a deadlock issue:
		// https://github.com/twmb/franz-go/issues/777
		kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited".
		kgo.MaxBufferedBytes(0),
	)
	if kafkaCfg.AutoCreateTopicEnabled {
		kafkaCfg.SetDefaultNumberOfPartitionsForAutocreatedTopics(logger)
	}

Callers 1

NewFunction · 0.92

Tested by

no test coverage detected