Benchmark arbiter message forwarding throughput.
(results, num_chunks=1000)
| 246 | |
| 247 | |
| 248 | async def benchmark_arbiter_forwarding(results, num_chunks=1000): |
| 249 | """Benchmark arbiter message forwarding throughput.""" |
| 250 | arbiter = create_arbiter() |
| 251 | |
| 252 | messages = [] |
| 253 | for i in range(num_chunks): |
| 254 | messages.append(make_chunk_message(f"bench-{i}", f"data-{i}")) |
| 255 | messages.append(make_end_message(f"bench-{num_chunks}")) |
| 256 | |
| 257 | mock_reader = MockStreamReader(messages) |
| 258 | |
| 259 | async def mock_get_connection(pid): |
| 260 | return mock_reader, MockStreamWriter() |
| 261 | |
| 262 | arbiter._get_worker_connection = mock_get_connection |
| 263 | |
| 264 | client_writer = MockStreamWriter() |
| 265 | |
| 266 | gc.collect() |
| 267 | start = time.perf_counter() |
| 268 | |
| 269 | request = make_request("bench-forward", "benchmark:App", "stream") |
| 270 | await arbiter._execute_on_worker(1234, request, client_writer) |
| 271 | |
| 272 | duration = time.perf_counter() - start |
| 273 | |
| 274 | results.add( |
| 275 | f"Arbiter forwarding ({num_chunks} chunks)", |
| 276 | iterations=1, |
| 277 | duration=duration, |
| 278 | chunks=num_chunks, |
| 279 | bytes_total=client_writer.bytes_written |
| 280 | ) |
| 281 | |
| 282 | arbiter._cleanup_sync() |
| 283 | |
| 284 | |
| 285 | async def benchmark_streaming_latency(results, iterations=100): |
no test coverage detected