(
self,
dict_: _InstanceDict,
attr: Optional[_AttributeImpl],
previous: Any,
collection: bool = False,
is_userland: bool = False,
)
| 893 | ) |
| 894 | |
| 895 | def _modified_event( |
| 896 | self, |
| 897 | dict_: _InstanceDict, |
| 898 | attr: Optional[_AttributeImpl], |
| 899 | previous: Any, |
| 900 | collection: bool = False, |
| 901 | is_userland: bool = False, |
| 902 | ) -> None: |
| 903 | if attr: |
| 904 | if not attr.send_modified_events: |
| 905 | return |
| 906 | if is_userland and attr.key not in dict_: |
| 907 | raise sa_exc.InvalidRequestError( |
| 908 | "Can't flag attribute '%s' modified; it's not present in " |
| 909 | "the object state" % attr.key |
| 910 | ) |
| 911 | if attr.key not in self.committed_state or is_userland: |
| 912 | if collection: |
| 913 | if TYPE_CHECKING: |
| 914 | assert is_collection_impl(attr) |
| 915 | if previous is NEVER_SET: |
| 916 | if attr.key in dict_: |
| 917 | previous = dict_[attr.key] |
| 918 | |
| 919 | if previous not in (None, NO_VALUE, NEVER_SET): |
| 920 | previous = attr.copy(previous) |
| 921 | self.committed_state[attr.key] = previous |
| 922 | |
| 923 | lkv = self._last_known_values |
| 924 | if lkv is not None and attr.key in lkv: |
| 925 | lkv[attr.key] = NO_VALUE |
| 926 | |
| 927 | # assert self._strong_obj is None or self.modified |
| 928 | |
| 929 | if (self.session_id and self._strong_obj is None) or not self.modified: |
| 930 | self.modified = True |
| 931 | instance_dict = self._instance_dict() |
| 932 | if instance_dict: |
| 933 | has_modified = bool(instance_dict._modified) |
| 934 | instance_dict._modified.add(self) |
| 935 | else: |
| 936 | has_modified = False |
| 937 | |
| 938 | # only create _strong_obj link if attached |
| 939 | # to a session |
| 940 | |
| 941 | inst = self.obj() |
| 942 | if self.session_id: |
| 943 | self._strong_obj = inst |
| 944 | |
| 945 | # if identity map already had modified objects, |
| 946 | # assume autobegin already occurred, else check |
| 947 | # for autobegin |
| 948 | if not has_modified: |
| 949 | # inline of autobegin, to ensure session transaction |
| 950 | # snapshot is established |
| 951 | try: |
| 952 | session = _sessions[self.session_id] |
no test coverage detected