(n=DEFAULT_ITS)
| 65 | |
| 66 | |
| 67 | def bench_apply(n=DEFAULT_ITS): |
| 68 | time_start = time.monotonic() |
| 69 | task = it._get_current_object() |
| 70 | with app.producer_or_acquire() as producer: |
| 71 | [task.apply_async((i, n), producer=producer) for i in range(n)] |
| 72 | print(f'-- apply {n} tasks: {time.monotonic() - time_start}s') |
| 73 | |
| 74 | |
| 75 | def bench_work(n=DEFAULT_ITS, loglevel='CRITICAL'): |
no test coverage detected