readCh reads records from the internal channel. This allows for offloading the expensive proto unmarshal to multiple goroutines.
(ctx context.Context)
| 134 | // This allows for offloading the expensive proto unmarshal |
| 135 | // to multiple goroutines. |
| 136 | func (g *Generator) readCh(ctx context.Context) { |
| 137 | defer g.kafkaWG.Done() |
| 138 | |
| 139 | var c ingest.GeneratorCodec |
| 140 | switch g.cfg.Codec { |
| 141 | case codecPushBytes: |
| 142 | c = ingest.NewPushBytesDecoder() |
| 143 | case codecOTLP: |
| 144 | c = ingest.NewOTLPDecoder() |
| 145 | } |
| 146 | |
| 147 | for { |
| 148 | var r *kgo.Record |
| 149 | select { |
| 150 | case r = <-g.kafkaCh: |
| 151 | case <-ctx.Done(): |
| 152 | return |
| 153 | } |
| 154 | |
| 155 | tenant := string(r.Key) |
| 156 | |
| 157 | i, err := g.getOrCreateInstance(tenant) |
| 158 | if err != nil { |
| 159 | level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel getOrCreateInstance", "err", err) |
| 160 | continue |
| 161 | } |
| 162 | |
| 163 | iterator, err := c.Decode(r.Value) |
| 164 | if err != nil { |
| 165 | level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel decode", "err", err) |
| 166 | continue |
| 167 | } |
| 168 | |
| 169 | for resourceSpans, err := range iterator { |
| 170 | if err != nil { |
| 171 | level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel unmarshal", "err", err) |
| 172 | continue |
| 173 | } |
| 174 | |
| 175 | i.pushSpansFromQueue(ctx, r.Timestamp, resourceSpans) |
| 176 | } |
| 177 | } |
| 178 | } |
| 179 | |
| 180 | func (g *Generator) getAssignedActivePartitions() []int32 { |
| 181 | g.partitionMtx.Lock() |
no test coverage detected