| 55 | } |
| 56 | |
| 57 | func NewPool(cfg *Config) *Pool { |
| 58 | if cfg == nil { |
| 59 | cfg = defaultConfig() |
| 60 | } |
| 61 | |
| 62 | q := make(chan *job, cfg.QueueDepth) |
| 63 | p := &Pool{ |
| 64 | cfg: cfg, |
| 65 | workQueue: q, |
| 66 | size: atomic.NewInt32(0), |
| 67 | shutdownCh: make(chan struct{}), |
| 68 | } |
| 69 | |
| 70 | for i := 0; i < cfg.MaxWorkers; i++ { |
| 71 | go p.worker(q) |
| 72 | } |
| 73 | |
| 74 | p.reportQueueLength() |
| 75 | |
| 76 | metricQueryQueueMax.Set(float64(cfg.QueueDepth)) |
| 77 | |
| 78 | return p |
| 79 | } |
| 80 | |
| 81 | func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]interface{}, []error, error) { |
| 82 | ctx, cancel := context.WithCancel(ctx) |