hub / github.com/redis/redis-py / AsyncComprehensiveLoadGenerator

Class AsyncComprehensiveLoadGenerator

benchmarks/otel_benchmark.py:402–699

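Async comprehensive load generator that exercises all Redis operations concurrently. Each iteration performs the following operations, concurrently where possible:
- Command operations (SET/GET), which trigger COMMAND metrics
- PubSub operations (PUBLISH), which trigger PUBSUB metrics
- Streaming operations (XADD/XREAD), which trigger STREAMING metrics
- Connection pool operations, which trigger CONNECTION metrics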
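The excerpt on this page does not show how an iteration dispatches its operations, so the following is only a minimal sketch of the "concurrently where possible" idea, assuming `asyncio.gather` is used to run the command, PubSub, and stream operations together. `FakeAsyncClient`, `command_ops`, `run_one_iteration`, and the `bench:*` names are hypothetical stand-ins, not part of redis-py or the benchmark; the fake client keeps everything in memory so the sketch runs without a Redis server.

```python
import asyncio
import time
from typing import Dict, List, Optional


class FakeAsyncClient:
    """Hypothetical in-memory stand-in for an async Redis client."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}
        self.published: List[str] = []
        self.stream: List[Dict[str, str]] = []

    async def set(self, key: str, value: str) -> bool:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self.store[key] = value
        return True

    async def get(self, key: str) -> Optional[str]:
        await asyncio.sleep(0)
        return self.store.get(key)

    async def publish(self, channel: str, message: str) -> int:
        await asyncio.sleep(0)
        self.published.append(message)
        return 0  # no subscribers in this fake

    async def xadd(self, name: str, fields: Dict[str, str]) -> str:
        await asyncio.sleep(0)
        self.stream.append(fields)
        return f"{len(self.stream)}-0"


async def command_ops(client: FakeAsyncClient, key: str, value: str) -> Optional[str]:
    # SET must finish before GET, so these two stay sequential.
    await client.set(key, value)
    return await client.get(key)


async def run_one_iteration(client: FakeAsyncClient, key: str, value: str) -> float:
    """Run the independent operation groups concurrently; return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        command_ops(client, key, value),
        client.publish("bench:channel", "msg"),
        client.xadd("bench:stream", {"data": value}),
    )
    return time.perf_counter() - start


client = FakeAsyncClient()
latency = asyncio.run(run_one_iteration(client, "bench:0", "x" * 8))
```

Only operations with no ordering dependency are gathered; the SET/GET pair is kept in one coroutine because the read depends on the write.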

Source from the content-addressed store, hash-verified

class AsyncComprehensiveLoadGenerator:
    """
    Async comprehensive load generator that exercises all Redis operations concurrently.

    Each iteration performs operations concurrently where possible:
    - Command operations (SET/GET) - triggers COMMAND metrics
    - PubSub operations (PUBLISH) - triggers PUBSUB metrics
    - Streaming operations (XADD/XREAD) - triggers STREAMING metrics
    - Connection pool operations - triggers CONNECTION metrics

    This ensures consistent test conditions across all OTel configurations
    while maximizing throughput through concurrent execution.
    """

    def __init__(self, config: LoadGeneratorConfig, redis_module: Any = None):
        """
        Initialize the async comprehensive load generator.

        Args:
            config: Load generator configuration
            redis_module: Optional redis module to use (for baseline testing with
                a different redis-py version). If None, imports redis normally.
        """
        self.config = config
        self.redis_module = redis_module
        self.latencies: List[float] = []
        self.errors: int = 0
        self.first_error: Optional[str] = None
        self._value = "x" * config.value_size_bytes
        self._key_counter = 0
        self._message_counter = 0

        # Resources (initialized in setup)
        self.client = None
        self.pubsub_publisher = None
        self.pubsub = None
        self.stream_name = f"{config.key_prefix}:stream"
        self.pubsub_channel = f"{config.key_prefix}:channel"
        self.consumer_group = "benchmark_group"
        self.consumer_name = "benchmark_consumer"

        # Resource tracking
        self.cpu_samples: List[float] = []
        self.memory_samples: List[float] = []
        self._process = psutil.Process() if PSUTIL_AVAILABLE else None

    def _get_redis_module(self) -> Any:
        """Get the redis module to use."""
        if self.redis_module is not None:
            return self.redis_module
        import redis
        return redis

    def _get_key(self) -> str:
        """Generate a key for the current operation."""
        key = f"{self.config.key_prefix}:{self._key_counter % 1000}"
        self._key_counter += 1
        return key
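The two helpers at the end of the listing can be illustrated in isolation. The sketch below is a standalone approximation, not the benchmark's code: `KeyCycler` and `resolve_module` are hypothetical names, and the standard-library `json` module stands in for `redis` as the default import so the example runs without redis-py installed. It shows that a counter taken modulo 1000 bounds the key space to 1000 distinct keys, and that an injected module takes precedence over the default import.

```python
import importlib
from types import SimpleNamespace
from typing import Any, Optional


class KeyCycler:
    """Hypothetical helper mirroring the modulo-based key generation in _get_key."""

    def __init__(self, key_prefix: str, key_space: int = 1000) -> None:
        self.key_prefix = key_prefix
        self.key_space = key_space
        self._counter = 0

    def next_key(self) -> str:
        # The counter grows without bound, but the suffix wraps at key_space,
        # so later writes overwrite earlier keys instead of creating new ones.
        key = f"{self.key_prefix}:{self._counter % self.key_space}"
        self._counter += 1
        return key


def resolve_module(override: Optional[Any] = None, default_name: str = "json") -> Any:
    """Return the injected module if given, else import the default lazily.

    Assumption: "json" is used here only as a stand-in for "redis".
    """
    if override is not None:
        return override
    return importlib.import_module(default_name)


cycler = KeyCycler("bench")
keys = [cycler.next_key() for _ in range(2500)]

# A stand-in object playing the role of an alternate library version.
baseline = SimpleNamespace(__version__="baseline")
chosen = resolve_module(baseline)
fallback = resolve_module()
```

Bounding the key space keeps the dataset size constant over a long run, which helps keep memory use and test conditions comparable between configurations.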

Callers 2

run_iteration_async · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected