NewProducer returns a new KafkaProducer. The input prometheus.Registerer must be wrapped with a prefix (the names of metrics registered don't have a prefix).
(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer)
| 197 | // The input prometheus.Registerer must be wrapped with a prefix (the names of metrics |
| 198 | // registered don't have a prefix). |
| 199 | func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *Producer { |
| 200 | producer := &Producer{ |
| 201 | Client: client, |
| 202 | closeOnce: &sync.Once{}, |
| 203 | closed: make(chan struct{}), |
| 204 | bufferedBytes: atomic.NewInt64(0), |
| 205 | maxBufferedBytes: maxBufferedBytes, |
| 206 | |
| 207 | // Metrics. |
| 208 | bufferedProduceBytes: promauto.With(reg).NewSummary( |
| 209 | prometheus.SummaryOpts{ |
| 210 | Namespace: "tempo", |
| 211 | Subsystem: "distributor", |
| 212 | Name: "buffered_produce_bytes", |
| 213 | Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.", |
| 214 | Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001}, |
| 215 | MaxAge: time.Minute, |
| 216 | AgeBuckets: 6, |
| 217 | }), |
| 218 | bufferedProduceBytesLimit: promauto.With(reg).NewGauge( |
| 219 | prometheus.GaugeOpts{ |
| 220 | Namespace: "tempo", |
| 221 | Subsystem: "distributor", |
| 222 | Name: "buffered_produce_bytes_limit", |
| 223 | Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", |
| 224 | }), |
| 225 | |
| 226 | produceRecordsFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ |
| 227 | Namespace: "tempo", |
| 228 | Subsystem: "distributor", |
| 229 | Name: "produce_failures_total", |
| 230 | Help: "Total number of failed produce records issued to Kafka.", |
| 231 | }, []string{"reason"}), |
| 232 | } |
| 233 | |
| 234 | producer.bufferedProduceBytesLimit.Set(float64(maxBufferedBytes)) |
| 235 | |
| 236 | go producer.updateMetricsLoop() |
| 237 | |
| 238 | return producer |
| 239 | } |
| 240 | |
| 241 | func (c *Producer) Close() { |
| 242 | c.closeOnce.Do(func() { |
no test coverage detected