Execute the authentication flow asynchronously. By default, this defers to `.auth_flow()`. You should override this method when the authentication scheme does I/O and/or uses concurrency primitives.
(
self, request: Request
)
| 85 | break |
| 86 | |
async def async_auth_flow(
    self, request: Request
) -> typing.AsyncGenerator[Request, Response]:
    """
    Drive the authentication flow from asynchronous code.

    The default implementation delegates to the synchronous `.auth_flow()`
    generator: each request it produces is yielded to the caller, and each
    response the caller sends back is forwarded into it. Bodies are read
    with `aread()` beforehand when `requires_request_body` /
    `requires_response_body` are set.

    Override this method when the authentication scheme does I/O and/or
    uses concurrency primitives.
    """
    # Read the request body up front if the scheme asked for it.
    if self.requires_request_body:
        await request.aread()

    delegate = self.auth_flow(request)
    outgoing = next(delegate)

    while True:
        received = yield outgoing
        if self.requires_response_body:
            await received.aread()

        # Keep the try body to the single `send()` call so that only the
        # delegate generator finishing ends the flow.
        try:
            outgoing = delegate.send(received)
        except StopIteration:
            return
| 111 | |
| 112 | |
| 113 | class FunctionAuth(Auth): |
no test coverage detected