| 184 | } |
| 185 | |
| 186 | func (g *Generator) handlePartitionsAssigned(m map[string][]int32) { |
| 187 | assigned := m[g.cfg.Ingest.Kafka.Topic] |
| 188 | level.Info(g.logger).Log("msg", "partitions assigned", "partitions", formatInt32Slice(assigned)) |
| 189 | g.partitionMtx.Lock() |
| 190 | defer g.partitionMtx.Unlock() |
| 191 | |
| 192 | // In cooperative (incremental) rebalancing this callback fires with only |
| 193 | // newly added partitions; stable partitions are not re-reported. Append |
| 194 | // rather than replace so we don't lose partitions that weren't moved. |
| 195 | g.assignedPartitions = append(g.assignedPartitions, assigned...) |
| 196 | sort.Slice(g.assignedPartitions, func(i, j int) bool { return g.assignedPartitions[i] < g.assignedPartitions[j] }) |
| 197 | metricAssignedPartitions.Set(float64(len(g.assignedPartitions))) |
| 198 | } |
| 199 | |
| 200 | func (g *Generator) handlePartitionsRevoked(partitions map[string][]int32) { |
| 201 | revoked := partitions[g.cfg.Ingest.Kafka.Topic] |