MCPcopy
hub / github.com/grafana/tempo / NewProducer

Function NewProducer

pkg/ingest/writer_client.go:199–239  ·  view source on GitHub ↗

NewProducer returns a new KafkaProducer. The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).

(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer)

Source from the content-addressed store, hash-verified

197// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
198// registered don't have a prefix).
199func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *Producer {
200 producer := &Producer{
201 Client: client,
202 closeOnce: &sync.Once{},
203 closed: make(chan struct{}),
204 bufferedBytes: atomic.NewInt64(0),
205 maxBufferedBytes: maxBufferedBytes,
206
207 // Metrics.
208 bufferedProduceBytes: promauto.With(reg).NewSummary(
209 prometheus.SummaryOpts{
210 Namespace: "tempo",
211 Subsystem: "distributor",
212 Name: "buffered_produce_bytes",
213 Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.",
214 Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001},
215 MaxAge: time.Minute,
216 AgeBuckets: 6,
217 }),
218 bufferedProduceBytesLimit: promauto.With(reg).NewGauge(
219 prometheus.GaugeOpts{
220 Namespace: "tempo",
221 Subsystem: "distributor",
222 Name: "buffered_produce_bytes_limit",
223 Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.",
224 }),
225
226 produceRecordsFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
227 Namespace: "tempo",
228 Subsystem: "distributor",
229 Name: "produce_failures_total",
230 Help: "Total number of failed produce records issued to Kafka.",
231 }, []string{"reason"}),
232 }
233
234 producer.bufferedProduceBytesLimit.Set(float64(maxBufferedBytes))
235
236 go producer.updateMetricsLoop()
237
238 return producer
239}
240
241func (c *Producer) Close() {
242 c.closeOnce.Do(func() {

Callers 1

NewFunction · 0.92

Calls 3

updateMetricsLoopMethod · 0.95
NewGaugeMethod · 0.65
SetMethod · 0.65

Tested by

no test coverage detected