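Run a benchmark with specified parameters. Each concurrent worker maintains a persistent connection to the arbiter and makes sequential requests. This simulates how real HTTP workers use the dirty client (one connection per worker thread). Args: action (action to call on the benchmark app), args (positional arguments for the action), kwargs (keyword arguments for the action), total_requests (total number of requests to make), concurrency (number of concurrent clients), timeout (timeout per request in seconds).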
(
self,
action: str,
args: tuple = (),
kwargs: dict = None,
total_requests: int = 1000,
concurrency: int = 10,
timeout: float = 30.0,
)
| 282 | client.execute(BENCHMARK_APP, "health") |
| 283 | |
| 284 | def run_benchmark( |
| 285 | self, |
| 286 | action: str, |
| 287 | args: tuple = (), |
| 288 | kwargs: dict = None, |
| 289 | total_requests: int = 1000, |
| 290 | concurrency: int = 10, |
| 291 | timeout: float = 30.0, |
| 292 | ) -> tuple[list[float], list[str]]: |
| 293 | """ |
| 294 | Run a benchmark with specified parameters. |
| 295 | |
| 296 | Each concurrent worker maintains a persistent connection to the arbiter |
| 297 | and makes sequential requests. This simulates how real HTTP workers |
| 298 | use the dirty client (one connection per worker thread). |
| 299 | |
| 300 | Args: |
| 301 | action: Action to call on the benchmark app |
| 302 | args: Positional arguments for the action |
| 303 | kwargs: Keyword arguments for the action |
| 304 | total_requests: Total number of requests to make |
| 305 | concurrency: Number of concurrent clients |
| 306 | timeout: Timeout per request in seconds |
| 307 | |
| 308 | Returns: |
| 309 | Tuple of (latencies in ms, error messages) |
| 310 | """ |
| 311 | kwargs = kwargs or {} |
| 312 | latencies = [] |
| 313 | errors = [] |
| 314 | lock = threading.Lock() |
| 315 | |
| 316 | # Calculate requests per worker |
| 317 | requests_per_worker = total_requests // concurrency |
| 318 | remainder = total_requests % concurrency |
| 319 | |
| 320 | def worker_task(num_requests: int) -> None: |
| 321 | """Worker that makes sequential requests on a persistent connection.""" |
| 322 | worker_latencies = [] |
| 323 | worker_errors = [] |
| 324 | |
| 325 | try: |
| 326 | client = DirtyClient(self.socket_path, timeout=timeout) |
| 327 | client.connect() |
| 328 | |
| 329 | for _ in range(num_requests): |
| 330 | try: |
| 331 | start = time.perf_counter() |
| 332 | client.execute(BENCHMARK_APP, action, *args, **kwargs) |
| 333 | elapsed = (time.perf_counter() - start) * 1000 |
| 334 | worker_latencies.append(elapsed) |
| 335 | except Exception as e: |
| 336 | worker_errors.append(str(e)) |
| 337 | # Reconnect on error |
| 338 | try: |
| 339 | client.close() |
| 340 | client = DirtyClient(self.socket_path, timeout=timeout) |
| 341 | client.connect() |
no outgoing calls
no test coverage detected