(
pre_auth_session_id: str,
user_input_code: Union[str, None],
device_id: Union[str, None],
link_code: Union[str, None],
tenant_id: str,
user_context: Dict[str, Any],
session: Optional[SessionContainer] = None,
should_try_linking_with_session_user: Optional[bool] = None,
)
| 68 | original_consume_code_post = original_implementation.consume_code |
| 69 | |
| 70 | async def consume_code_post( |
| 71 | pre_auth_session_id: str, |
| 72 | user_input_code: Union[str, None], |
| 73 | device_id: Union[str, None], |
| 74 | link_code: Union[str, None], |
| 75 | tenant_id: str, |
| 76 | user_context: Dict[str, Any], |
| 77 | session: Optional[SessionContainer] = None, |
| 78 | should_try_linking_with_session_user: Optional[bool] = None, |
| 79 | ): |
| 80 | # First we call the original implementation of consume_code_post. |
| 81 | response = await original_consume_code_post( |
| 82 | pre_auth_session_id, |
| 83 | user_input_code, |
| 84 | device_id, |
| 85 | link_code, |
| 86 | session, |
| 87 | should_try_linking_with_session_user, |
| 88 | tenant_id, |
| 89 | user_context, |
| 90 | ) |
| 91 | |
| 92 | # Post sign up response, we check if it was successful |
| 93 | if isinstance(response, ConsumeCodeOkResult): |
| 94 | payload = { |
| 95 | "uid": response.user.id, |
| 96 | "email": response.user.emails[0], |
| 97 | } |
| 98 | if is_ee(): |
| 99 | await create_accounts(payload) |
| 100 | else: |
| 101 | raise Exception( |
| 102 | "passwordless account creation is not available in OSS." |
| 103 | ) |
| 104 | |
| 105 | return response |
| 106 | |
| 107 | original_implementation.consume_code = consume_code_post # type: ignore |
| 108 | return original_implementation |
nothing calls this directly
no test coverage detected