| 90 | |
| 91 | |
| 92 | class ContractsManager: |
| 93 | contracts: ClassVar[dict[str, type[Contract]]] = {} |
| 94 | |
| 95 | def __init__(self, contracts: Iterable[type[Contract]]): |
| 96 | for contract in contracts: |
| 97 | self.contracts[contract.name] = contract |
| 98 | |
| 99 | def tested_methods_from_spidercls(self, spidercls: type[Spider]) -> list[str]: |
| 100 | is_method = re.compile(r"^\s*@", re.MULTILINE).search |
| 101 | methods = [] |
| 102 | for key, value in getmembers(spidercls): |
| 103 | if callable(value) and value.__doc__ and is_method(value.__doc__): |
| 104 | methods.append(key) |
| 105 | |
| 106 | return methods |
| 107 | |
| 108 | def extract_contracts(self, method: Callable[..., Any]) -> list[Contract]: |
| 109 | contracts: list[Contract] = [] |
| 110 | assert method.__doc__ is not None |
| 111 | for line_ in method.__doc__.split("\n"): |
| 112 | line = line_.strip() |
| 113 | |
| 114 | if line.startswith("@"): |
| 115 | m = re.match(r"@(\w+)\s*(.*)", line) |
| 116 | if m is None: |
| 117 | continue |
| 118 | name, args = m.groups() |
| 119 | args = re.split(r"\s+", args) |
| 120 | |
| 121 | contracts.append(self.contracts[name](method, *args)) |
| 122 | |
| 123 | return contracts |
| 124 | |
| 125 | def from_spider(self, spider: Spider, results: TestResult) -> list[Request | None]: |
| 126 | requests: list[Request | None] = [] |
| 127 | for method in self.tested_methods_from_spidercls(type(spider)): |
| 128 | bound_method = getattr(spider, method) |
| 129 | try: |
| 130 | requests.append(self.from_method(bound_method, results)) |
| 131 | except Exception: |
| 132 | case = _create_testcase(bound_method, "contract") |
| 133 | results.addError(case, sys.exc_info()) |
| 134 | |
| 135 | return requests |
| 136 | |
| 137 | def from_method( |
| 138 | self, method: Callable[..., Any], results: TestResult |
| 139 | ) -> Request | None: |
| 140 | contracts = self.extract_contracts(method) |
| 141 | if contracts: |
| 142 | request_cls = Request |
| 143 | for contract in contracts: |
| 144 | if contract.request_cls is not None: |
| 145 | request_cls = contract.request_cls |
| 146 | |
| 147 | # calculate request args |
| 148 | args, kwargs = get_spec(request_cls.__init__) |
| 149 |
no outgoing calls