MCPcopy
hub / github.com/RoaringBitmap/roaring / parallelExecutorBSIResults

Function parallelExecutorBSIResults

roaring64/bsi64.go:262–308  ·  view source on GitHub ↗
(parallelism int, input *BSI, e bsiAction, foundSet, filterSet *Bitmap, sumResults bool)

Source from the content-addressed store, hash-verified

260type bsiAction func(input *BSI, filterSet *Bitmap, batch []uint64, resultsChan chan *BSI, wg *sync.WaitGroup)
261
262func parallelExecutorBSIResults(parallelism int, input *BSI, e bsiAction, foundSet, filterSet *Bitmap, sumResults bool) *BSI {
263
264 var n int = parallelism
265 if n == 0 {
266 n = runtime.NumCPU()
267 }
268
269 resultsChan := make(chan *BSI, n)
270
271 card := foundSet.GetCardinality()
272 x := card / uint64(n)
273
274 remainder := card - (x * uint64(n))
275 var batch []uint64
276 var wg sync.WaitGroup
277 iter := foundSet.ManyIterator()
278 for i := 0; i < n; i++ {
279 if i == n-1 {
280 batch = make([]uint64, x+remainder)
281 } else {
282 batch = make([]uint64, x)
283 }
284 iter.NextMany(batch)
285 wg.Add(1)
286 go e(input, filterSet, batch, resultsChan, &wg)
287 }
288
289 wg.Wait()
290
291 close(resultsChan)
292
293 ba := make([]*BSI, 0)
294 for bm := range resultsChan {
295 ba = append(ba, bm)
296 }
297
298 results := NewDefaultBSI()
299 if sumResults {
300 for _, v := range ba {
301 results.Add(v)
302 }
303 } else {
304 results.ParOr(0, ba...)
305 }
306 return results
307
308}
309
// Operation is an integer identifier for an operation.
// NOTE(review): the valid values are declared outside this excerpt — confirm
// their meaning against the constants that follow in the full file.
type Operation int

Callers 1

TransposeWithCountsMethod · 0.70

Calls 6

NewDefaultBSIFunction · 0.70
NextManyMethod · 0.65
GetCardinalityMethod · 0.45
ManyIteratorMethod · 0.45
AddMethod · 0.45
ParOrMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…