| 79 | } |
| 80 | |
| 81 | func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]interface{}, []error, error) { |
| 82 | ctx, cancel := context.WithCancel(ctx) |
| 83 | defer cancel() |
| 84 | |
| 85 | totalJobs := len(payloads) |
| 86 | |
| 87 | // sanity check before we even attempt to start adding jobs |
| 88 | if int(p.size.Load())+totalJobs > p.cfg.QueueDepth { |
| 89 | return nil, nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads)) |
| 90 | } |
| 91 | |
| 92 | resultsCh := make(chan result, totalJobs) // way for jobs to send back results |
| 93 | stop := atomic.NewBool(false) // way to signal to the jobs to quit |
| 94 | wg := &sync.WaitGroup{} // way to wait for all jobs to complete |
| 95 | |
| 96 | // add each job one at a time. even though we checked length above these might still fail |
| 97 | for _, payload := range payloads { |
| 98 | wg.Add(1) |
| 99 | j := &job{ |
| 100 | ctx: ctx, |
| 101 | fn: fn, |
| 102 | payload: payload, |
| 103 | wg: wg, |
| 104 | resultsCh: resultsCh, |
| 105 | stop: stop, |
| 106 | } |
| 107 | |
| 108 | select { |
| 109 | case p.workQueue <- j: |
| 110 | p.size.Inc() |
| 111 | default: |
| 112 | wg.Done() |
| 113 | stop.Store(true) |
| 114 | return nil, nil, fmt.Errorf("failed to add a job to work queue") |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | // wait for all jobs to finish |
| 119 | wg.Wait() |
| 120 | |
| 121 | // close resultsCh |
| 122 | close(resultsCh) |
| 123 | |
| 124 | // read all from results channel |
| 125 | var data []interface{} |
| 126 | var funcErrs []error |
| 127 | for result := range resultsCh { |
| 128 | if result.err != nil { |
| 129 | funcErrs = append(funcErrs, result.err) |
| 130 | } else { |
| 131 | data = append(data, result.data) |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | return data, funcErrs, nil |
| 136 | } |
| 137 | |
| 138 | func (p *Pool) Shutdown() { |