Clean up all Redis resources.
(self)
| 203 | self.csc_client = None |
| 204 | |
| 205 | def teardown(self) -> None: |
| 206 | """Clean up all Redis resources.""" |
| 207 | if self.pubsub: |
| 208 | try: |
| 209 | self.pubsub.unsubscribe() |
| 210 | self.pubsub.close() |
| 211 | except Exception: |
| 212 | pass |
| 213 | |
| 214 | if self.pubsub_publisher: |
| 215 | try: |
| 216 | self.pubsub_publisher.close() |
| 217 | except Exception: |
| 218 | pass |
| 219 | |
| 220 | if self.client: |
| 221 | try: |
| 222 | self.client.delete(self.stream_name) |
| 223 | except Exception: |
| 224 | pass |
| 225 | try: |
| 226 | self.client.close() |
| 227 | except Exception: |
| 228 | pass |
| 229 | |
| 230 | if self.csc_client: |
| 231 | try: |
| 232 | self.csc_client.close() |
| 233 | except Exception: |
| 234 | pass |
| 235 | |
| 236 | def _run_operation(self) -> float: |
| 237 | """ |
no test coverage detected