Verify that the results of a query are valid. Verify that the query returned at least one result and that every returned result represents a valid result from the database.
(self, results: list[PGresult])
| 466 | return pgq |
| 467 | |
def _check_results(self, results: list[PGresult]) -> None:
    """
    Make sure a query produced usable results.

    Raise `InternalError` if `results` is empty. Otherwise inspect the
    status of every result: any result whose status is not one of
    `TUPLES_OK`, `COMMAND_OK`, `EMPTY_QUERY` or `COPY_BOTH` is passed to
    `_raise_for_result()`.
    """
    if not results:
        raise e.InternalError("got no result from the query")

    # Statuses that are accepted without further action.
    accepted = (TUPLES_OK, COMMAND_OK, EMPTY_QUERY, COPY_BOTH)
    for result in results:
        if result.status not in accepted:
            self._raise_for_result(result)
| 486 | |
| 487 | def _raise_for_result(self, result: PGresult) -> NoReturn: |
| 488 | """ |
no test coverage detected