MCPcopy
hub / github.com/grafana/tempo / backendJobsFunc

Function backendJobsFunc

modules/frontend/search_sharder.go:424–485  ·  view source on GitHub ↗

backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time to generate actual jobs.

(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32)

Source from the content-addressed store, hash-verified

422// backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time
423// to generate actual jobs.
424func backendJobsFunc(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32) func(shardIterFn, jobIterFn) {
425 blocksPerShard := len(blocks) / maxShards
426
427 // if we have fewer blocks than shards then every shard is one block
428 if blocksPerShard == 0 {
429 blocksPerShard = 1
430 }
431
432 return func(shardIterCallback shardIterFn, jobIterCallback jobIterFn) {
433 currentShard := 0
434 jobsInShard := 0
435 bytesInShard := uint64(0)
436 blocksInShard := 0
437
438 for _, b := range blocks {
439 pages := pagesPerRequest(b, targetBytesPerRequest)
440 jobsInBlock := 0
441
442 if pages == 0 {
443 continue
444 }
445
446 // if jobIterCallBack is nil we can skip the loop and directly calc the jobsInBlock
447 if jobIterCallback == nil {
448 jobsInBlock = int(b.TotalRecords) / pages
449 if int(b.TotalRecords)%pages != 0 {
450 jobsInBlock++
451 }
452 } else {
453 for startPage := 0; startPage < int(b.TotalRecords); startPage += pages {
454 jobIterCallback(b, currentShard, startPage, pages)
455 jobsInBlock++
456 }
457 }
458
459 // do we need to roll to a new shard?
460 jobsInShard += jobsInBlock
461 bytesInShard += b.Size_
462 blocksInShard++
463
464 // -1 b/c we will likely add a final shard below
465 // end comparison b/c there's no point in ending a shard that can't release any results
466 if blocksInShard >= blocksPerShard && currentShard < maxShards-1 && b.EndTime.Unix() < int64(end) {
467 if shardIterCallback != nil {
468 shardIterCallback(jobsInShard, bytesInShard, uint32(b.EndTime.Unix()))
469 }
470 currentShard++
471
472 jobsInShard = 0
473 bytesInShard = 0
474 blocksInShard = 0
475 }
476 }
477
478 // final shard - note that we are overpacking the final shard due to the integer math as well as the limit of 200 shards total. if the search
479 // this is the least impactful shard to place extra jobs in as it is searched last. if we make it here the chances of this being an exhaustive search
480 // are higher
481 if shardIterCallback != nil && jobsInShard > 0 {

Callers 6

TestBuildBackendRequestsFunction · 0.85
TestBackendShardsFunction · 0.85
backendRequestsMethod · 0.85
backendRequestsMethod · 0.85

Calls 1

pagesPerRequestFunction · 0.85