Async comprehensive load generator that exercises all Redis operations concurrently. Each iteration performs operations concurrently where possible: - Command operations (SET/GET) - triggers COMMAND metrics - PubSub operations (PUBLISH) - triggers PUBSUB metrics - Streaming ope
| 400 | |
| 401 | |
| 402 | class AsyncComprehensiveLoadGenerator: |
| 403 | """ |
| 404 | Async comprehensive load generator that exercises all Redis operations concurrently. |
| 405 | |
| 406 | Each iteration performs operations concurrently where possible: |
| 407 | - Command operations (SET/GET) - triggers COMMAND metrics |
| 408 | - PubSub operations (PUBLISH) - triggers PUBSUB metrics |
| 409 | - Streaming operations (XADD/XREAD) - triggers STREAMING metrics |
| 410 | - Connection pool operations - triggers CONNECTION metrics |
| 411 | |
| 412 | This ensures consistent test conditions across all OTel configurations |
| 413 | while maximizing throughput through concurrent execution. |
| 414 | """ |
| 415 | |
| 416 | def __init__(self, config: LoadGeneratorConfig, redis_module: Any = None): |
| 417 | """ |
| 418 | Initialize the async comprehensive load generator. |
| 419 | |
| 420 | Args: |
| 421 | config: Load generator configuration |
| 422 | redis_module: Optional redis module to use (for baseline testing with |
| 423 | a different redis-py version). If None, imports redis normally. |
| 424 | """ |
| 425 | self.config = config |
| 426 | self.redis_module = redis_module |
| 427 | self.latencies: List[float] = [] |
| 428 | self.errors: int = 0 |
| 429 | self.first_error: Optional[str] = None |
| 430 | self._value = "x" * config.value_size_bytes |
| 431 | self._key_counter = 0 |
| 432 | self._message_counter = 0 |
| 433 | |
| 434 | # Resources (initialized in setup) |
| 435 | self.client = None |
| 436 | self.pubsub_publisher = None |
| 437 | self.pubsub = None |
| 438 | self.stream_name = f"{config.key_prefix}:stream" |
| 439 | self.pubsub_channel = f"{config.key_prefix}:channel" |
| 440 | self.consumer_group = "benchmark_group" |
| 441 | self.consumer_name = "benchmark_consumer" |
| 442 | |
| 443 | # Resource tracking |
| 444 | self.cpu_samples: List[float] = [] |
| 445 | self.memory_samples: List[float] = [] |
| 446 | self._process = psutil.Process() if PSUTIL_AVAILABLE else None |
| 447 | |
| 448 | def _get_redis_module(self) -> Any: |
| 449 | """Get the redis module to use.""" |
| 450 | if self.redis_module is not None: |
| 451 | return self.redis_module |
| 452 | import redis |
| 453 | return redis |
| 454 | |
| 455 | def _get_key(self) -> str: |
| 456 | """Generate a key for the current operation.""" |
| 457 | key = f"{self.config.key_prefix}:{self._key_counter % 1000}" |
| 458 | self._key_counter += 1 |
| 459 | return key |
no outgoing calls
no test coverage detected