Execute the authentication flow synchronously. By default, this defers to `.auth_flow()`. You should override this method when the authentication scheme does I/O and/or uses concurrency primitives.
(
self, request: Request
)
| 60 | yield request |
| 61 | |
| 62 | def sync_auth_flow( |
| 63 | self, request: Request |
| 64 | ) -> typing.Generator[Request, Response, None]: |
| 65 | """ |
| 66 | Execute the authentication flow synchronously. |
| 67 | |
| 68 | By default, this defers to `.auth_flow()`. You should override this method |
| 69 | when the authentication scheme does I/O and/or uses concurrency primitives. |
| 70 | """ |
| 71 | if self.requires_request_body: |
| 72 | request.read() |
| 73 | |
| 74 | flow = self.auth_flow(request) |
| 75 | request = next(flow) |
| 76 | |
| 77 | while True: |
| 78 | response = yield request |
| 79 | if self.requires_response_body: |
| 80 | response.read() |
| 81 | |
| 82 | try: |
| 83 | request = flow.send(response) |
| 84 | except StopIteration: |
| 85 | break |
| 86 | |
| 87 | async def async_auth_flow( |
| 88 | self, request: Request |
no test coverage detected