Benchmark time-to-first-chunk and time-to-last-chunk.
(results, iterations=100)
| 283 | |
| 284 | |
async def benchmark_streaming_latency(results, iterations=100):
    """Benchmark time-to-first-chunk and time-to-last-chunk.

    Issues ``iterations`` streaming requests through ``worker.handle_request``.
    The worker's ``execute`` is patched to return an async generator of a
    fixed set of chunks. For each request two latencies are measured from the
    start of ``handle_request``:

    * time-to-first-chunk: until the worker resumes the generator after
      receiving the first chunk, i.e. once it has finished handling that chunk;
    * time-to-last-chunk: until ``handle_request`` returns.

    Averages are printed, and the run is recorded with ``results.add``.

    Args:
        results: Collector whose ``add`` method receives the benchmark name
            plus ``iterations``, ``duration`` (total seconds across all
            requests) and ``chunks`` (total chunks streamed).
        iterations: Number of timed requests. With ``0`` nothing is timed and
            both averages are reported as 0.
    """
    chunks = ("first", "second", "third")
    worker = create_worker()

    first_chunk_times = []
    total_times = []

    for _ in range(iterations):
        writer = MockStreamWriter()
        # Build the request before timing starts so only request handling is measured.
        request = make_request("bench-latency", "benchmark:App", "stream")
        # Filled in by the generator below once the first chunk has been consumed.
        first_chunk_at = []

        async def gen_chunks():
            for index, chunk in enumerate(chunks):
                yield chunk
                # Execution resumes here only when the consumer asks for the
                # next item, i.e. after it has finished handling ``chunk``.
                # NOTE(review): assumes handle_request processes each chunk
                # before pulling the next one (no prefetching) — confirm.
                if index == 0:
                    first_chunk_at.append(time.perf_counter())

        async def mock_execute(app_path, action, args, kwargs):
            return gen_chunks()

        with mock.patch.object(worker, 'execute', side_effect=mock_execute):
            # Time only the request itself, not mock patch setup/teardown.
            start = time.perf_counter()
            await worker.handle_request(request, writer)
            end = time.perf_counter()

        if first_chunk_at:
            first_chunk_times.append(first_chunk_at[0] - start)
        total_times.append(end - start)

    avg_first_chunk = sum(first_chunk_times) / len(first_chunk_times) if first_chunk_times else 0
    # Guard against iterations == 0, which previously raised ZeroDivisionError.
    avg_total = sum(total_times) / len(total_times) if total_times else 0

    print(f"\nLatency Results ({iterations} iterations):")
    print(f" Avg time-to-first-chunk: {avg_first_chunk * 1000:.3f}ms")
    print(f" Avg time-to-last-chunk: {avg_total * 1000:.3f}ms")

    results.add(
        f"Streaming latency ({iterations} iterations)",
        iterations=iterations,
        duration=sum(total_times),
        chunks=len(total_times) * len(chunks)
    )
| 328 | |
| 329 | |
| 330 | async def benchmark_concurrent_streams(results, num_streams=10, chunks_per_stream=100): |
no test coverage detected