backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time to generate actual jobs.
(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32)
| 422 | // backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time |
| 423 | // to generate actual jobs. |
| 424 | func backendJobsFunc(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32) func(shardIterFn, jobIterFn) { |
| 425 | blocksPerShard := len(blocks) / maxShards |
| 426 | |
| 427 | // if we have fewer blocks than shards then every shard is one block |
| 428 | if blocksPerShard == 0 { |
| 429 | blocksPerShard = 1 |
| 430 | } |
| 431 | |
| 432 | return func(shardIterCallback shardIterFn, jobIterCallback jobIterFn) { |
| 433 | currentShard := 0 |
| 434 | jobsInShard := 0 |
| 435 | bytesInShard := uint64(0) |
| 436 | blocksInShard := 0 |
| 437 | |
| 438 | for _, b := range blocks { |
| 439 | pages := pagesPerRequest(b, targetBytesPerRequest) |
| 440 | jobsInBlock := 0 |
| 441 | |
| 442 | if pages == 0 { |
| 443 | continue |
| 444 | } |
| 445 | |
| 446 | // if jobIterCallBack is nil we can skip the loop and directly calc the jobsInBlock |
| 447 | if jobIterCallback == nil { |
| 448 | jobsInBlock = int(b.TotalRecords) / pages |
| 449 | if int(b.TotalRecords)%pages != 0 { |
| 450 | jobsInBlock++ |
| 451 | } |
| 452 | } else { |
| 453 | for startPage := 0; startPage < int(b.TotalRecords); startPage += pages { |
| 454 | jobIterCallback(b, currentShard, startPage, pages) |
| 455 | jobsInBlock++ |
| 456 | } |
| 457 | } |
| 458 | |
| 459 | // do we need to roll to a new shard? |
| 460 | jobsInShard += jobsInBlock |
| 461 | bytesInShard += b.Size_ |
| 462 | blocksInShard++ |
| 463 | |
| 464 | // -1 b/c we will likely add a final shard below |
| 465 | // end comparison b/c there's no point in ending a shard that can't release any results |
| 466 | if blocksInShard >= blocksPerShard && currentShard < maxShards-1 && b.EndTime.Unix() < int64(end) { |
| 467 | if shardIterCallback != nil { |
| 468 | shardIterCallback(jobsInShard, bytesInShard, uint32(b.EndTime.Unix())) |
| 469 | } |
| 470 | currentShard++ |
| 471 | |
| 472 | jobsInShard = 0 |
| 473 | bytesInShard = 0 |
| 474 | blocksInShard = 0 |
| 475 | } |
| 476 | } |
| 477 | |
| 478 | // final shard - note that we are overpacking the final shard due to the integer math as well as the limit of 200 shards total. if the search |
| 479 | // this is the least impactful shard to place extra jobs in as it is searched last. if we make it here the chances of this being an exhaustive search |
| 480 | // are higher |
| 481 | if shardIterCallback != nil && jobsInShard > 0 { |