@pytest.mark.parametrize("sleep_on", ["server", "client"])
@pytest.mark.parametrize("listen_by", ["callback", "generator"])
def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by):
    """Check that notifications interleaved with pauses all arrive, in order.

    A notifier worker sends three notifications on the ``counter`` channel
    (payloads ``"0"``, ``"1"``, ``"2"``) from its own connection. A listener
    worker, on a second connection, alternates between pausing and picking
    up notifications. The test passes when the listener collected exactly
    ``[0, 1, 2]``.

    Parameters (supplied by pytest):
        conn_cls: connection class exposing ``connect()``.
        dsn: connection string passed to ``conn_cls.connect()``.
        sleep_on: ``"server"`` to pause with ``pg_sleep`` on the listening
            connection, ``"client"`` to pause with a local ``sleep``.
        listen_by: ``"callback"`` to receive through a notify handler,
            anything else to receive through ``conn.notifies()``.
    """
    listening = Event()
    notifies: list[int] = []

    def nap(conn):
        # Pause for 0.2s, either inside the server or in this worker.
        if sleep_on != "server":
            assert sleep_on == "client"
            sleep(0.2)
            return
        conn.execute("select pg_sleep(0.2)")

    def notifier():
        with conn_cls.connect(dsn, autocommit=True) as conn:
            # Small head start for the listener before the first notification.
            sleep(0.1)
            for counter in range(3):
                conn.execute("select pg_notify('counter', %s)", (str(counter),))
                sleep(0.2)

    def callback_listener():
        def on_notify(notification):
            notifies.append(int(notification.payload))

        with conn_cls.connect(dsn, autocommit=True) as conn:
            conn.add_notify_handler(on_notify)

            conn.execute("listen counter")
            listening.set()

            for _ in range(3):
                nap(conn)
                # NOTE(review): the empty query presumably gives the
                # connection a chance to dispatch pending notifications to
                # the handler -- confirm against the driver documentation.
                conn.execute("")

    def generator_listener():
        def collect(conn):
            for notification in conn.notifies(timeout=0.2):
                notifies.append(int(notification.payload))

        with conn_cls.connect(dsn, autocommit=True) as conn:
            conn.execute("listen counter")
            listening.set()

            collect(conn)
            nap(conn)
            collect(conn)

    listener = callback_listener if listen_by == "callback" else generator_listener

    # Start notifying only after the listener has issued LISTEN.
    listener_task = spawn(listener)
    listening.wait()
    notifier_task = spawn(notifier)
    gather(listener_task, notifier_task)

    assert notifies == list(range(3))