Push pushes data onto a queue. If the queue is full, the data is dropped and an error is returned.
(ctx context.Context, data T)
| 78 | // Push pushes data onto a queue. |
| 79 | // If the queue is full, the data is dropped |
| 80 | func (m *Queue[T]) Push(ctx context.Context, data T) error { |
| 81 | if m.readOnly.Load() { |
| 82 | return fmt.Errorf("queue is read-only") |
| 83 | } |
| 84 | |
| 85 | m.pushesTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() |
| 86 | |
| 87 | select { |
| 88 | case <-ctx.Done(): |
| 89 | m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() |
| 90 | return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: %w", m.tenantID, m.name, ctx.Err()) |
| 91 | default: |
| 92 | } |
| 93 | |
| 94 | select { |
| 95 | case m.reqChan <- data: |
| 96 | m.lengthMetric.WithLabelValues(m.name, m.tenantID).Inc() |
| 97 | return nil |
| 98 | default: |
| 99 | } |
| 100 | |
| 101 | // Fail fast if the queue is full |
| 102 | m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() |
| 103 | |
| 104 | return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: queue is full", m.tenantID, m.name) |
| 105 | } |
| 106 | |
| 107 | func (m *Queue[T]) StartWorkers() { |
| 108 | for i := 0; i < m.workerCount; i++ { |