Benchmark multiple concurrent streams.
(results, num_streams=10, chunks_per_stream=100)
| 328 | |
| 329 | |
| 330 | async def benchmark_concurrent_streams(results, num_streams=10, chunks_per_stream=100): |
| 331 | """Benchmark multiple concurrent streams.""" |
| 332 | arbiter = create_arbiter() |
| 333 | |
| 334 | async def run_stream(stream_id): |
| 335 | messages = [] |
| 336 | for i in range(chunks_per_stream): |
| 337 | messages.append(make_chunk_message(f"stream-{stream_id}", f"chunk-{i}")) |
| 338 | messages.append(make_end_message(f"stream-{stream_id}")) |
| 339 | |
| 340 | mock_reader = MockStreamReader(messages) |
| 341 | async def mock_get_connection(pid): |
| 342 | return mock_reader, MockStreamWriter() |
| 343 | |
| 344 | arbiter._get_worker_connection = mock_get_connection |
| 345 | client_writer = MockStreamWriter() |
| 346 | |
| 347 | request = make_request(f"bench-concurrent-{stream_id}", "benchmark:App", "stream") |
| 348 | await arbiter._execute_on_worker(1234, request, client_writer) |
| 349 | return len(client_writer.messages) |
| 350 | |
| 351 | gc.collect() |
| 352 | start = time.perf_counter() |
| 353 | |
| 354 | # Run streams concurrently |
| 355 | tasks = [run_stream(i) for i in range(num_streams)] |
| 356 | results_list = await asyncio.gather(*tasks) |
| 357 | |
| 358 | duration = time.perf_counter() - start |
| 359 | |
| 360 | total_chunks = sum(results_list) |
| 361 | |
| 362 | results.add( |
| 363 | f"Concurrent streams ({num_streams} streams, {chunks_per_stream} chunks each)", |
| 364 | iterations=num_streams, |
| 365 | duration=duration, |
| 366 | chunks=total_chunks |
| 367 | ) |
| 368 | |
| 369 | arbiter._cleanup_sync() |
| 370 | |
| 371 | |
| 372 | async def benchmark_memory_stability(results, iterations=10, chunks=1000): |
no test coverage detected