Raise :exc:`RuntimeError` if the event loop of the installed :class:`~twisted.internet.asyncioreactor.AsyncioSelectorReactor` does not match the specified import path or if no reactor is installed.
(loop_path: str)
| 185 | |
| 186 | |
| 187 | def verify_installed_asyncio_event_loop(loop_path: str) -> None: |
| 188 | """Raise :exc:`RuntimeError` if the event loop of the installed |
| 189 | :class:`~twisted.internet.asyncioreactor.AsyncioSelectorReactor` |
| 190 | does not match the specified import path or if no reactor is installed.""" |
| 191 | if not is_reactor_installed(): |
| 192 | raise RuntimeError( |
| 193 | "verify_installed_asyncio_event_loop() called without an installed reactor." |
| 194 | ) |
| 195 | |
| 196 | from twisted.internet import reactor  # imported only after the installed-reactor check above |
| 197 | |
| 198 | loop_class = load_object(loop_path) |
| 199 | if isinstance(reactor._asyncioEventloop, loop_class):  # isinstance: subclasses of loop_class also pass |
| 200 | return |
| 201 | installed = (  # dotted "module.qualname" of the installed loop's class, used in the error message |
| 202 | f"{reactor._asyncioEventloop.__class__.__module__}" |
| 203 | f".{reactor._asyncioEventloop.__class__.__qualname__}" |
| 204 | ) |
| 205 | raise RuntimeError( |
| 206 | "Scrapy found an asyncio Twisted reactor already " |
| 207 | f"installed, and its event loop class ({installed}) does " |
| 208 | "not match the one specified in the ASYNCIO_EVENT_LOOP " |
| 209 | f"setting ({global_object_name(loop_class)})" |
| 210 | ) |
| 211 | |
| 212 | |
| 213 | def is_reactor_installed() -> bool: |
no test coverage detected