(conn_cls, conn, dsn)
| 154 | @pytest.mark.slow |
| 155 | @pytest.mark.timing |
| 156 | def test_stop_after(conn_cls, conn, dsn): |
| 157 | conn.set_autocommit(True) |
| 158 | conn.execute("listen foo") |
| 159 | |
| 160 | def notifier(): |
| 161 | with conn_cls.connect(dsn, autocommit=True) as nconn: |
| 162 | nconn.execute("notify foo, '1'") |
| 163 | sleep(0.1) |
| 164 | nconn.execute("notify foo, '2'") |
| 165 | sleep(0.1) |
| 166 | nconn.execute("notify foo, '3'") |
| 167 | |
| 168 | worker = spawn(notifier) |
| 169 | try: |
| 170 | ns = list(conn.notifies(timeout=1.0, stop_after=2)) |
| 171 | assert len(ns) == 2 |
| 172 | assert ns[0].payload == "1" |
| 173 | assert ns[1].payload == "2" |
| 174 | finally: |
| 175 | gather(worker) |
| 176 | |
| 177 | ns = list(conn.notifies(timeout=0.0)) |
| 178 | assert len(ns) == 1 |
| 179 | assert ns[0].payload == "3" |
| 180 | |
| 181 | |
| 182 | @pytest.mark.timing |
nothing calls this directly
no test coverage detected