Ensure the shared health check loop and its background thread are running. Args: `timeout` — maximum number of seconds to wait for the loop to start. Raises: `RuntimeError` if the loop fails to start within the timeout.
(self, timeout: float = 5.0)
| 196 | future.add_done_callback(on_complete) |
| 197 | |
| 198 | def _ensure_health_check_loop(self, timeout: float = 5.0): |
| 199 | """ |
| 200 | Ensure the shared health check loop and thread are running. |
| 201 | |
| 202 | Args: |
| 203 | timeout: Maximum seconds to wait for the loop to start. |
| 204 | |
| 205 | Raises: |
| 206 | RuntimeError: If the loop fails to start within the timeout. |
| 207 | """ |
| 208 | # Fast path: if loop is already running, return immediately |
| 209 | if self._health_check_loop_ready.is_set(): |
| 210 | with self._lock: |
| 211 | if ( |
| 212 | self._health_check_loop is not None |
| 213 | and self._health_check_loop.is_running() |
| 214 | ): |
| 215 | return |
| 216 | |
| 217 | with self._lock: |
| 218 | # Double-check after acquiring the lock |
| 219 | if ( |
| 220 | self._health_check_loop is not None |
| 221 | and self._health_check_loop.is_running() |
| 222 | ): |
| 223 | return |
| 224 | |
| 225 | # Clear the event - we're about to start a new loop |
| 226 | self._health_check_loop_ready.clear() |
| 227 | |
| 228 | # Create a new event loop for health checks |
| 229 | self._health_check_loop = asyncio.new_event_loop() |
| 230 | self._event_loops.append(self._health_check_loop) |
| 231 | |
| 232 | # Start the loop in a background thread |
| 233 | self._health_check_thread = threading.Thread( |
| 234 | target=self._run_health_check_loop, |
| 235 | daemon=True, |
| 236 | ) |
| 237 | self._health_check_thread.start() |
| 238 | |
| 239 | # Wait for loop to be running INSIDE the lock with a timeout. |
| 240 | # This prevents other threads from trying to create another loop |
| 241 | # before this one is fully started, while avoiding permanent deadlock |
| 242 | # if the background thread fails to start the loop. |
| 243 | if not self._health_check_loop_ready.wait(timeout=timeout): |
| 244 | # Timeout expired - the loop failed to start |
| 245 | # Clean up the failed loop to allow retry |
| 246 | failed_loop = self._health_check_loop |
| 247 | self._health_check_loop = None |
| 248 | if failed_loop in self._event_loops: |
| 249 | self._event_loops.remove(failed_loop) |
| 250 | try: |
| 251 | failed_loop.close() |
| 252 | except Exception: |
| 253 | pass |
| 254 | raise RuntimeError( |
| 255 | f"Health check event loop failed to start within {timeout} seconds" |
no test coverage detected