(self, state: InstanceState[Any], obj: object)
| 4424 | self._after_attach(state, obj) |
| 4425 | |
| 4426 | def _before_attach(self, state: InstanceState[Any], obj: object) -> bool: |
| 4427 | self._autobegin_t() |
| 4428 | |
| 4429 | if state.session_id == self.hash_key: |
| 4430 | return False |
| 4431 | |
| 4432 | if state.session_id and state.session_id in _sessions: |
| 4433 | raise sa_exc.InvalidRequestError( |
| 4434 | "Object '%s' is already attached to session '%s' " |
| 4435 | "(this is '%s')" |
| 4436 | % (state_str(state), state.session_id, self.hash_key) |
| 4437 | ) |
| 4438 | |
| 4439 | self.dispatch.before_attach(self, state) |
| 4440 | |
| 4441 | return True |
| 4442 | |
| 4443 | def _after_attach(self, state: InstanceState[Any], obj: object) -> None: |
| 4444 | state.session_id = self.hash_key |
no test coverage detected