* ********************************************************* Data structures and functions/methods for running concurrent workers, which execute different operations, including `Read`, `Write` and `Delete`. ********************************************************* */
(t *testing.T, db *bolt.DB, workerCount int, conf concurrentConfig, testDuration time.Duration)
| 272 | ********************************************************* |
| 273 | */ |
| 274 | func runWorkers(t *testing.T, |
| 275 | db *bolt.DB, |
| 276 | workerCount int, |
| 277 | conf concurrentConfig, |
| 278 | testDuration time.Duration) historyRecords { |
| 279 | stopCh := make(chan struct{}, 1) |
| 280 | errCh := make(chan error, workerCount) |
| 281 | |
| 282 | var mu sync.Mutex |
| 283 | var rs historyRecords |
| 284 | |
| 285 | g := new(errgroup.Group) |
| 286 | for i := 0; i < workerCount; i++ { |
| 287 | w := &worker{ |
| 288 | id: i, |
| 289 | db: db, |
| 290 | |
| 291 | conf: conf, |
| 292 | |
| 293 | errCh: errCh, |
| 294 | stopCh: stopCh, |
| 295 | t: t, |
| 296 | } |
| 297 | g.Go(func() error { |
| 298 | wrs, err := runWorker(t, w, errCh) |
| 299 | mu.Lock() |
| 300 | rs = append(rs, wrs...) |
| 301 | mu.Unlock() |
| 302 | return err |
| 303 | }) |
| 304 | } |
| 305 | |
| 306 | t.Logf("Keep all workers running for about %s.", testDuration) |
| 307 | select { |
| 308 | case <-time.After(testDuration): |
| 309 | case <-errCh: |
| 310 | } |
| 311 | |
| 312 | close(stopCh) |
| 313 | t.Log("Waiting for all workers to finish.") |
| 314 | if err := g.Wait(); err != nil { |
| 315 | t.Errorf("Received error: %v", err) |
| 316 | } |
| 317 | |
| 318 | return rs |
| 319 | } |
| 320 | |
| 321 | func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) { |
| 322 | rs, err := w.run() |
no test coverage detected