( # noqa: C901
self, connection: Connection, commands: CommandStackT, raise_on_error
)
| 1866 | return self |
| 1867 | |
| 1868 | async def _execute_transaction( # noqa: C901 |
| 1869 | self, connection: Connection, commands: CommandStackT, raise_on_error |
| 1870 | ): |
| 1871 | pre: CommandT = (("MULTI",), {}) |
| 1872 | post: CommandT = (("EXEC",), {}) |
| 1873 | cmds = (pre, *commands, post) |
| 1874 | all_cmds = connection.pack_commands( |
| 1875 | args for args, options in cmds if EMPTY_RESPONSE not in options |
| 1876 | ) |
| 1877 | await connection.send_packed_command(all_cmds) |
| 1878 | errors = [] |
| 1879 | |
| 1880 | # parse off the response for MULTI |
| 1881 | # NOTE: we need to handle ResponseErrors here and continue |
| 1882 | # so that we read all the additional command messages from |
| 1883 | # the socket |
| 1884 | try: |
| 1885 | await self.parse_response(connection, "_") |
| 1886 | except ResponseError as err: |
| 1887 | errors.append((0, err)) |
| 1888 | |
| 1889 | # and all the other commands |
| 1890 | for i, command in enumerate(commands): |
| 1891 | if EMPTY_RESPONSE in command[1]: |
| 1892 | errors.append((i, command[1][EMPTY_RESPONSE])) |
| 1893 | else: |
| 1894 | try: |
| 1895 | await self.parse_response(connection, "_") |
| 1896 | except ResponseError as err: |
| 1897 | self.annotate_exception(err, i + 1, command[0]) |
| 1898 | errors.append((i, err)) |
| 1899 | |
| 1900 | # parse the EXEC. |
| 1901 | try: |
| 1902 | response = await self.parse_response(connection, "_") |
| 1903 | except ExecAbortError as err: |
| 1904 | if errors: |
| 1905 | raise errors[0][1] from err |
| 1906 | raise |
| 1907 | |
| 1908 | # EXEC clears any watched keys |
| 1909 | self.watching = False |
| 1910 | |
| 1911 | if response is None: |
| 1912 | raise WatchError("Watched variable changed.") from None |
| 1913 | |
| 1914 | # put any parse errors into the response |
| 1915 | for i, e in errors: |
| 1916 | response.insert(i, e) |
| 1917 | |
| 1918 | if len(response) != len(commands): |
| 1919 | if self.connection: |
| 1920 | await self.connection.disconnect() |
| 1921 | raise ResponseError( |
| 1922 | "Wrong number of response items from pipeline execution" |
| 1923 | ) from None |
| 1924 | |
| 1925 | # find any errors in the response and raise if necessary |
nothing calls this directly
no test coverage detected