MCPcopy
hub / github.com/grafana/tempo / handlePartitionsAssigned

Method handlePartitionsAssigned

modules/generator/generator_kafka.go:186–198  ·  view source on GitHub ↗
(m map[string][]int32)

Source from the content-addressed store, hash-verified

184}
185
186func (g *Generator) handlePartitionsAssigned(m map[string][]int32) {
187 assigned := m[g.cfg.Ingest.Kafka.Topic]
188 level.Info(g.logger).Log("msg", "partitions assigned", "partitions", formatInt32Slice(assigned))
189 g.partitionMtx.Lock()
190 defer g.partitionMtx.Unlock()
191
192 // In cooperative (incremental) rebalancing this callback fires with only
193 // newly added partitions; stable partitions are not re-reported. Append
194 // rather than replace so we don't lose partitions that weren't moved.
195 g.assignedPartitions = append(g.assignedPartitions, assigned...)
196 sort.Slice(g.assignedPartitions, func(i, j int) bool { return g.assignedPartitions[i] < g.assignedPartitions[j] })
197 metricAssignedPartitions.Set(float64(len(g.assignedPartitions)))
198}
199
200func (g *Generator) handlePartitionsRevoked(partitions map[string][]int32) {
201 revoked := partitions[g.cfg.Ingest.Kafka.Topic]

Calls 3

formatInt32SliceFunction · 0.85
LogMethod · 0.65
SetMethod · 0.65