(aconn, aconn_cls, dsn, recwarn)
| 222 | @pytest.mark.slow |
| 223 | @skip_free_threaded("warnings are context-local in the free-threaded build >= 3.14") |
| 224 | async def test_generator_and_handler(aconn, aconn_cls, dsn, recwarn): |
| 225 | # NOTE: we don't support generator+handlers anymore. So, if in the future |
| 226 | # this behaviour will change, we will not consider it a regression. However |
| 227 | # we will want to keep the warning check. |
| 228 | |
| 229 | recwarn.clear() |
| 230 | |
| 231 | await aconn.set_autocommit(True) |
| 232 | await aconn.execute("listen foo") |
| 233 | |
| 234 | n1 = None |
| 235 | n2 = None |
| 236 | |
| 237 | def set_n2(n): |
| 238 | nonlocal n2 |
| 239 | n2 = n |
| 240 | |
| 241 | aconn.add_notify_handler(set_n2) |
| 242 | |
| 243 | async def listener(): |
| 244 | nonlocal n1 |
| 245 | async for n1 in aconn.notifies(timeout=1, stop_after=1): |
| 246 | pass |
| 247 | |
| 248 | worker = spawn(listener) |
| 249 | try: |
| 250 | # Make sure the listener is listening |
| 251 | if not aconn.lock.locked(): |
| 252 | await asleep(0.01) |
| 253 | |
| 254 | async with await aconn_cls.connect(dsn, autocommit=True) as nconn: |
| 255 | await nconn.execute("notify foo, '1'") |
| 256 | |
| 257 | finally: |
| 258 | await gather(worker) |
| 259 | |
| 260 | assert n1 |
| 261 | assert n2 |
| 262 | |
| 263 | msg = str(recwarn.pop(RuntimeWarning).message) |
| 264 | assert "notifies()" in msg |
| 265 | |
| 266 | |
| 267 | @pytest.mark.parametrize("query_between", [True, False]) |
nothing calls this directly
no test coverage detected