| 175 | |
| 176 | @contextlib.contextmanager |
| 177 | def _expect_warnings( |
| 178 | exc_cls, |
| 179 | messages, |
| 180 | regex=True, |
| 181 | search_msg=False, |
| 182 | assert_=True, |
| 183 | raise_on_any_unexpected=False, |
| 184 | squelch_other_warnings=False, |
| 185 | ): |
| 186 | global _FILTERS, _SEEN, _EXC_CLS |
| 187 | |
| 188 | if regex or search_msg: |
| 189 | filters = [re.compile(msg, re.I | re.S) for msg in messages] |
| 190 | else: |
| 191 | filters = list(messages) |
| 192 | |
| 193 | if _FILTERS is not None: |
| 194 | # nested call; update _FILTERS and _SEEN, return. outer |
| 195 | # block will assert our messages |
| 196 | assert _SEEN is not None |
| 197 | assert _EXC_CLS is not None |
| 198 | _FILTERS.extend(filters) |
| 199 | _SEEN.update(filters) |
| 200 | _EXC_CLS += (exc_cls,) |
| 201 | yield |
| 202 | else: |
| 203 | seen = _SEEN = set(filters) |
| 204 | _FILTERS = filters |
| 205 | _EXC_CLS = (exc_cls,) |
| 206 | |
| 207 | if raise_on_any_unexpected: |
| 208 | |
| 209 | def real_warn(msg, *arg, **kw): |
| 210 | if isinstance(msg, sa_exc.SATestSuiteWarning): |
| 211 | warnings.warn(msg, *arg, **kw) |
| 212 | else: |
| 213 | raise AssertionError("Got unexpected warning: %r" % msg) |
| 214 | |
| 215 | else: |
| 216 | real_warn = warnings.warn |
| 217 | |
| 218 | def our_warn(msg, *arg, **kw): |
| 219 | if isinstance(msg, _EXC_CLS): |
| 220 | exception = type(msg) |
| 221 | msg = str(msg) |
| 222 | elif arg: |
| 223 | exception = arg[0] |
| 224 | else: |
| 225 | exception = None |
| 226 | |
| 227 | if not exception or not issubclass(exception, _EXC_CLS): |
| 228 | if not squelch_other_warnings: |
| 229 | return real_warn(msg, *arg, **kw) |
| 230 | else: |
| 231 | return |
| 232 | |
| 233 | if not filters and not raise_on_any_unexpected: |
| 234 | return |