(conn_cls, conn, dsn)
| 181 | |
@pytest.mark.timing
def test_stop_after_batch(conn_cls, conn, dsn):
    """Check that ``stop_after`` does not cut a batch of notifications short.

    ``conn`` listens on channel ``foo`` while a second connection, opened in
    a helper task, sends two notifications from within one transaction.
    Although ``notifies()`` is called with ``stop_after=1``, the test expects
    both notifications back, in the order they were sent.
    """
    conn.set_autocommit(True)
    conn.execute("listen foo")

    def send_pair():
        # Both NOTIFY statements share one transaction; presumably that makes
        # them reach the listener together -- confirm against PostgreSQL docs.
        with conn_cls.connect(dsn, autocommit=True) as sender, sender.transaction():
            sender.execute("notify foo, '1'")
            sender.execute("notify foo, '2'")

    sender_task = spawn(send_pair)
    try:
        received = list(conn.notifies(timeout=1.0, stop_after=1))
        assert len(received) == 2
        first, second = received
        assert first.payload == "1"
        assert second.payload == "2"
    finally:
        # Always wait for the helper task, even when an assertion fails.
        gather(sender_task)
| 201 | |
| 202 | |
| 203 | @pytest.mark.slow |
nothing calls this directly
no test coverage detected