(
self,
state: InstanceState[Any],
attribute_names: Optional[Iterable[str]],
)
| 3376 | self._expire_state(state, attribute_names) |
| 3377 | |
| 3378 | def _expire_state( |
| 3379 | self, |
| 3380 | state: InstanceState[Any], |
| 3381 | attribute_names: Optional[Iterable[str]], |
| 3382 | ) -> None: |
| 3383 | self._validate_persistent(state) |
| 3384 | if attribute_names: |
| 3385 | state._expire_attributes(state.dict, attribute_names) |
| 3386 | else: |
| 3387 | # pre-fetch the full cascade since the expire is going to |
| 3388 | # remove associations |
| 3389 | cascaded = list( |
| 3390 | state.manager.mapper.cascade_iterator("refresh-expire", state) |
| 3391 | ) |
| 3392 | self._conditional_expire(state) |
| 3393 | for o, m, st_, dct_ in cascaded: |
| 3394 | self._conditional_expire(st_) |
| 3395 | |
| 3396 | def _conditional_expire( |
| 3397 | self, state: InstanceState[Any], autoflush: Optional[bool] = None |
no test coverage detected