rebuildPendingIndexes rebuilds the pendingBlocks, pendingByTenant, and runningBlocks indexes from all shards' Pending and Jobs maps. Caller must hold w.mtx (e.g. during LoadFromLocal).
()
| 600 | // rebuildPendingIndexes rebuilds the pendingBlocks, pendingByTenant, and runningBlocks indexes |
| 601 | // from all shards' Pending and Jobs maps. Caller must hold w.mtx (e.g. during LoadFromLocal). |
| 602 | func (w *Work) rebuildPendingIndexes() { |
| 603 | w.pendingMtx.Lock() |
| 604 | defer w.pendingMtx.Unlock() |
| 605 | |
| 606 | w.pendingBlocks = make(map[string]string) |
| 607 | w.pendingByTenant = make(map[string]map[tempopb.JobType][]string) |
| 608 | |
| 609 | for i := range ShardCount { |
| 610 | shard := w.Shards[i] |
| 611 | shard.mtx.Lock() |
| 612 | for _, j := range shard.Pending { |
| 613 | if key := j.PendingBlockKey(); key != "" { |
| 614 | w.pendingBlocks[key] = j.ID |
| 615 | } |
| 616 | tenant := j.JobDetail.Tenant |
| 617 | if w.pendingByTenant[tenant] == nil { |
| 618 | w.pendingByTenant[tenant] = make(map[tempopb.JobType][]string) |
| 619 | } |
| 620 | w.pendingByTenant[tenant][j.Type] = append(w.pendingByTenant[tenant][j.Type], j.ID) |
| 621 | } |
| 622 | shard.mtx.Unlock() |
| 623 | } |
| 624 | |
| 625 | w.runningBlocks = make(map[string]*Job) |
| 626 | for i := range ShardCount { |
| 627 | shard := w.Shards[i] |
| 628 | shard.mtx.Lock() |
| 629 | for _, j := range shard.Jobs { |
| 630 | switch j.GetStatus() { |
| 631 | case tempopb.JobStatus_JOB_STATUS_UNSPECIFIED, |
| 632 | tempopb.JobStatus_JOB_STATUS_RUNNING: |
| 633 | for _, key := range runningBlockKeys(j) { |
| 634 | w.runningBlocks[key] = j |
| 635 | } |
| 636 | } |
| 637 | } |
| 638 | shard.mtx.Unlock() |
| 639 | } |
| 640 | } |
| 641 | |
| 642 | // AddPendingJobs adds jobs to the appropriate shards' Pending maps and updates the blocks-pending index. |
| 643 | func (w *Work) AddPendingJobs(jobs []*Job) error { |
no test coverage detected