(pgconn: PGconn)
| 292 | |
| 293 | |
def notifies(pgconn: PGconn) -> PQGen[list[pq.PGnotify]]:
    """
    Generator collecting the notifications currently pending on `pgconn`.

    Yield `WAIT_R` once (presumably asking the caller to wait until the
    connection is readable -- confirm against the waiting functions), then
    call `pgconn.consume_input()` and drain `pgconn.notifies()` until it
    returns a falsy value.

    Every notification retrieved is also passed to `pgconn.notify_handler`,
    if that attribute is truthy at the time the notification is processed.

    Return the list of the notifications retrieved, in arrival order; the
    list is empty if none was pending.
    """
    yield WAIT_R
    pgconn.consume_input()

    received = []
    while True:
        notification = pgconn.notifies()
        if not notification:
            # Nothing left in the queue: stop draining.
            break

        received.append(notification)
        # The handler is looked up for each notification, not cached.
        if pgconn.notify_handler:
            pgconn.notify_handler(notification)

    return received
| 305 | |
| 306 | |
| 307 | def copy_from(pgconn: PGconn) -> PQGen[memoryview | PGresult]: |
no test coverage detected