MCPcopy
hub / github.com/benoitc/gunicorn / benchmark_concurrent_streams

Function benchmark_concurrent_streams

benchmarks/dirty_streaming.py:330–369  ·  view source on GitHub ↗

Benchmark multiple concurrent streams.

(results, num_streams=10, chunks_per_stream=100)

Source from the content-addressed store, hash-verified

328
329
async def benchmark_concurrent_streams(results, num_streams=10, chunks_per_stream=100):
    """Benchmark multiple concurrent streams.

    Drives ``num_streams`` mock streams through a single arbiter using
    ``asyncio.gather`` and records the wall-clock duration of the batch.

    Args:
        results: Collector whose ``add(name, iterations=..., duration=...,
            chunks=...)`` method receives the measurement.
        num_streams: Number of streams run concurrently.
        chunks_per_stream: Number of chunk messages queued for each stream;
            one end message is appended after them.
    """
    arbiter = create_arbiter()

    async def run_stream(stream_id):
        """Run one mock stream; return the client writer's message count."""
        stream_name = f"stream-{stream_id}"
        messages = [
            make_chunk_message(stream_name, f"chunk-{i}")
            for i in range(chunks_per_stream)
        ]
        messages.append(make_end_message(stream_name))

        mock_reader = MockStreamReader(messages)

        async def mock_get_connection(pid):
            return mock_reader, MockStreamWriter()

        # NOTE(review): every stream rebinds this attribute on the one shared
        # arbiter, so which reader a stream receives depends on where
        # _execute_on_worker first yields to the event loop -- confirm that
        # the connection is fetched before the first suspension point.
        arbiter._get_worker_connection = mock_get_connection
        client_writer = MockStreamWriter()

        request = make_request(f"bench-concurrent-{stream_id}", "benchmark:App", "stream")
        await arbiter._execute_on_worker(1234, request, client_writer)
        return len(client_writer.messages)

    try:
        # Collect garbage before the timer starts.
        gc.collect()
        start = time.perf_counter()

        # Run streams concurrently
        tasks = [run_stream(i) for i in range(num_streams)]
        results_list = await asyncio.gather(*tasks)

        duration = time.perf_counter() - start

        total_chunks = sum(results_list)

        results.add(
            f"Concurrent streams ({num_streams} streams, {chunks_per_stream} chunks each)",
            iterations=num_streams,
            duration=duration,
            chunks=total_chunks,
        )
    finally:
        # Clean up the arbiter even when a stream or the collector raises.
        arbiter._cleanup_sync()
370
371
372async def benchmark_memory_stability(results, iterations=10, chunks=1000):

Callers 1

run_full_benchmarksFunction · 0.85

Calls 4

run_streamFunction · 0.85
addMethod · 0.80
_cleanup_syncMethod · 0.80
create_arbiterFunction · 0.70

Tested by

no test coverage detected