MCPcopy
hub / github.com/grafana/tempo / RunJobs

Method RunJobs

tempodb/pool/pool.go:81–136  ·  tempodb/pool/pool.go::Pool.RunJobs
(ctx context.Context, payloads []interface{}, fn JobFunc)

Source from the content-addressed store, hash-verified

79}
80
81func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]interface{}, []error, error) {
82 ctx, cancel := context.WithCancel(ctx)
83 defer cancel()
84
85 totalJobs := len(payloads)
86
87 // sanity check before we even attempt to start adding jobs
88 if int(p.size.Load())+totalJobs > p.cfg.QueueDepth {
89 return nil, nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads))
90 }
91
92 resultsCh := make(chan result, totalJobs) // way for jobs to send back results
93 stop := atomic.NewBool(false) // way to signal to the jobs to quit
94 wg := &sync.WaitGroup{} // way to wait for all jobs to complete
95
96 // add each job one at a time. even though we checked length above these might still fail
97 for _, payload := range payloads {
98 wg.Add(1)
99 j := &job{
100 ctx: ctx,
101 fn: fn,
102 payload: payload,
103 wg: wg,
104 resultsCh: resultsCh,
105 stop: stop,
106 }
107
108 select {
109 case p.workQueue <- j:
110 p.size.Inc()
111 default:
112 wg.Done()
113 stop.Store(true)
114 return nil, nil, fmt.Errorf("failed to add a job to work queue")
115 }
116 }
117
118 // wait for all jobs to finish
119 wg.Wait()
120
121 // close resultsCh
122 close(resultsCh)
123
124 // read all from results channel
125 var data []interface{}
126 var funcErrs []error
127 for result := range resultsCh {
128 if result.err != nil {
129 funcErrs = append(funcErrs, result.err)
130 } else {
131 data = append(data, result.data)
132 }
133 }
134
135 return data, funcErrs, nil
136}
137
138func (p *Pool) Shutdown() {

Callers 13

TestResultsFunction · 0.95
TestNoResultsFunction · 0.95
TestMultipleHitsFunction · 0.95
TestErrorFunction · 0.95
TestMultipleErrorsFunction · 0.95
TestTooManyJobsFunction · 0.95
TestOneWorkerFunction · 0.95
TestGoingHamFunction · 0.95
TestShutdownFunction · 0.95
TestDataEncodingsFunction · 0.95
TestCancellationFunction · 0.95

Calls 5

AddMethod · 0.65
IncMethod · 0.65
DoneMethod · 0.65
StoreMethod · 0.65
WaitMethod · 0.65

Tested by 12

TestResultsFunction · 0.76
TestNoResultsFunction · 0.76
TestMultipleHitsFunction · 0.76
TestErrorFunction · 0.76
TestMultipleErrorsFunction · 0.76
TestTooManyJobsFunction · 0.76
TestOneWorkerFunction · 0.76
TestGoingHamFunction · 0.76
TestShutdownFunction · 0.76
TestDataEncodingsFunction · 0.76
TestCancellationFunction · 0.76