(
self,
crawler: Crawler,
update_vars: Callable[[dict[str, Any]], None] | None = None,
code: str | None = None,
*,
loop: asyncio.AbstractEventLoop | None = None,
)
| 100 | relevant_classes: tuple[type, ...] = (Crawler, Spider, Request, Response, Settings) |
| 101 | |
def __init__(
    self,
    crawler: Crawler,
    update_vars: Callable[[dict[str, Any]], None] | None = None,
    code: str | None = None,
    *,
    loop: asyncio.AbstractEventLoop | None = None,
):
    """Set up the shell state for *crawler*.

    :param crawler: crawler whose settings supply ``TWISTED_REACTOR_ENABLED``
        and ``DEFAULT_ITEM_CLASS``.
    :param update_vars: optional callback taking a ``dict[str, Any]``; a
        no-op is stored when none is given.
    :param code: optional code string, stored as-is on ``self.code``.
    :param loop: the crawler's asyncio event loop. Must be provided when
        ``TWISTED_REACTOR_ENABLED`` is false.
    :raises RuntimeError: if the reactor is disabled and no loop was passed.
    """
    reactor_enabled = crawler.settings.getbool("TWISTED_REACTOR_ENABLED")
    self._use_reactor = reactor_enabled
    # Without the reactor the shell has to know which loop the crawler uses.
    if not (reactor_enabled or loop):  # pragma: no cover
        raise RuntimeError(
            "Shell needs the crawler loop reference when TWISTED_REACTOR_ENABLED=False."
        )
    self._loop = loop
    self.crawler: Crawler = crawler
    self.update_vars: Callable[[dict[str, Any]], None] = (
        update_vars if update_vars else (lambda x: None)
    )
    item_class_path = crawler.settings["DEFAULT_ITEM_CLASS"]
    self.item_class: type = load_object(item_class_path)
    self.spider: Spider | None = None

    # Work out whether we are running outside the thread/loop that drives
    # the crawler.
    self._inthread: bool
    if reactor_enabled:
        # Reactor mode: we are "in a thread" unless this is the Twisted
        # I/O thread.
        self._inthread = not threadable.isInIOThread()
    else:
        try:
            # The current thread may have its own running loop; only the
            # crawler's loop counts as "not in a thread".
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread at all.
            self._inthread = True
        else:
            self._inthread = running_loop is not self._loop

    self.code: str | None = code
    self.vars: dict[str, Any] = {}
| 133 | |
| 134 | @property |
| 135 | def inthread(self) -> bool: # pragma: no cover |
nothing calls this directly
no test coverage detected