Set up all Redis resources.
(self)
| 459 | return key |
| 460 | |
| 461 | async def setup(self) -> None: |
| 462 | """Set up all Redis resources.""" |
| 463 | redis_mod = self._get_redis_module() |
| 464 | Redis = redis_mod.asyncio.Redis |
| 465 | |
| 466 | # Main client for commands |
| 467 | self.client = Redis( |
| 468 | host=self.config.redis_host, |
| 469 | port=self.config.redis_port, |
| 470 | decode_responses=True |
| 471 | ) |
| 472 | |
| 473 | # PubSub publisher (separate connection) |
| 474 | self.pubsub_publisher = Redis( |
| 475 | host=self.config.redis_host, |
| 476 | port=self.config.redis_port, |
| 477 | decode_responses=True |
| 478 | ) |
| 479 | |
| 480 | # PubSub subscriber |
| 481 | subscriber = Redis( |
| 482 | host=self.config.redis_host, |
| 483 | port=self.config.redis_port, |
| 484 | decode_responses=True |
| 485 | ) |
| 486 | self.pubsub = subscriber.pubsub() |
| 487 | await self.pubsub.subscribe(self.pubsub_channel) |
| 488 | # Consume subscription confirmation |
| 489 | await self.pubsub.get_message(timeout=1.0) |
| 490 | |
| 491 | # Create stream and consumer group |
| 492 | try: |
| 493 | await self.client.xgroup_create( |
| 494 | self.stream_name, |
| 495 | self.consumer_group, |
| 496 | id="0", |
| 497 | mkstream=True |
| 498 | ) |
| 499 | except Exception: |
| 500 | # Group may already exist |
| 501 | pass |
| 502 | |
| 503 | async def teardown(self) -> None: |
| 504 | """Clean up all Redis resources.""" |
no test coverage detected