processBottomK implements TopKBottomK bottomk method
(input SeriesSet, valueLength, limit int)
| 2219 | |
| 2220 | // processBottomK implements TopKBottomK bottomk method |
| 2221 | func processBottomK(input SeriesSet, valueLength, limit int) SeriesSet { |
| 2222 | result := make(SeriesSet) |
| 2223 | |
| 2224 | // Max heap for bottom-k (largest values at top for easy replacement) |
| 2225 | h := &reverseSeriesHeap{} |
| 2226 | heap.Init(h) |
| 2227 | |
| 2228 | // Process each timestamp |
| 2229 | for i := 0; i < valueLength; i++ { |
| 2230 | // Process each series for this timestamp |
| 2231 | for key, series := range input { |
| 2232 | if i >= len(series.Values) { |
| 2233 | continue |
| 2234 | } |
| 2235 | |
| 2236 | value := series.Values[i] |
| 2237 | if math.IsNaN(value) { |
| 2238 | continue // Skip NaN values |
| 2239 | } |
| 2240 | |
| 2241 | // If heap not full yet, add the value |
| 2242 | if h.Len() < limit { |
| 2243 | heap.Push(h, seriesValue{ |
| 2244 | key: key, |
| 2245 | value: value, |
| 2246 | }) |
| 2247 | continue |
| 2248 | } |
| 2249 | |
| 2250 | // If new value is less than largest in heap, replace it |
| 2251 | // For ties (equal values), use series key comparison to maintain determinism |
| 2252 | largest := (*h)[0] |
| 2253 | if dataPointLessThan(seriesValue{key: key, value: value}, largest) { |
| 2254 | heap.Pop(h) |
| 2255 | heap.Push(h, seriesValue{ |
| 2256 | key: key, |
| 2257 | value: value, |
| 2258 | }) |
| 2259 | } |
| 2260 | } |
| 2261 | |
| 2262 | // we have iterated over all series for this timestamp |
| 2263 | // empty the heap and record these series in result set |
| 2264 | for h.Len() > 0 { |
| 2265 | sv := heap.Pop(h).(seriesValue) |
| 2266 | initSeriesInResult(result, sv.key, input, valueLength) |
| 2267 | // Set only this timestamp's value |
| 2268 | result[sv.key].Values[i] = input[sv.key].Values[i] |
| 2269 | } |
| 2270 | } |
| 2271 | |
| 2272 | return result |
| 2273 | } |
| 2274 | |
| 2275 | // initSeriesInResult ensures that a series exists in the result map, and |
| 2276 | // initializes it with NaN values if it doesn't exist in the result set. |