(stream_id)
| 332 | arbiter = create_arbiter() |
| 333 | |
| 334 | async def run_stream(stream_id): |
| 335 | messages = [] |
| 336 | for i in range(chunks_per_stream): |
| 337 | messages.append(make_chunk_message(f"stream-{stream_id}", f"chunk-{i}")) |
| 338 | messages.append(make_end_message(f"stream-{stream_id}")) |
| 339 | |
| 340 | mock_reader = MockStreamReader(messages) |
| 341 | async def mock_get_connection(pid): |
| 342 | return mock_reader, MockStreamWriter() |
| 343 | |
| 344 | arbiter._get_worker_connection = mock_get_connection |
| 345 | client_writer = MockStreamWriter() |
| 346 | |
| 347 | request = make_request(f"bench-concurrent-{stream_id}", "benchmark:App", "stream") |
| 348 | await arbiter._execute_on_worker(1234, request, client_writer) |
| 349 | return len(client_writer.messages) |
| 350 | |
| 351 | gc.collect() |
| 352 | start = time.perf_counter() |
no test coverage detected