MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / ForUpdateArg

Class ForUpdateArg

lib/sqlalchemy/sql/selectable.py:3332–3399  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

3330
3331
3332class ForUpdateArg(ClauseElement):
# Attribute names paired with the InternalTraversal symbol used for each.
_traverse_internals: _TraverseInternalsType = [
    ("of", InternalTraversal.dp_clauseelement_list),
    ("nowait", InternalTraversal.dp_boolean),
    ("read", InternalTraversal.dp_boolean),
    ("skip_locked", InternalTraversal.dp_boolean),
    ("key_share", InternalTraversal.dp_boolean),
]

# Instance attribute declarations.  ``key_share`` is listed in
# ``_traverse_internals``, compared in ``__eq__`` and accepted by
# ``__init__``, so it is declared here alongside the other flags.
of: Optional[Sequence[ClauseElement]]
nowait: bool
read: bool
skip_locked: bool
key_share: bool
3345
@classmethod
def _from_argument(
    cls, with_for_update: ForUpdateParameter
) -> Optional[ForUpdateArg]:
    """Coerce a ``with_for_update`` argument into a
    :class:`.ForUpdateArg`, or ``None``.

    * an existing :class:`.ForUpdateArg` is returned unchanged;
    * a value matching ``None`` or ``False`` yields ``None``;
    * ``True`` yields a :class:`.ForUpdateArg` built with all defaults;
    * anything else is unpacked as keyword arguments to the
      :class:`.ForUpdateArg` constructor.

    """
    # The isinstance check runs first so that an existing instance never
    # reaches the ``in`` test below, which would call its __eq__.
    if isinstance(with_for_update, ForUpdateArg):
        return with_for_update

    if with_for_update in (None, False):
        return None

    if with_for_update is True:
        return ForUpdateArg()

    # cast() only informs the type checker; nothing is validated here.
    options = cast("Dict[str, Any]", with_for_update)
    return ForUpdateArg(**options)
3358
def __eq__(self, other: Any) -> bool:
    """Compare two :class:`.ForUpdateArg` objects.

    Objects are equal when ``other`` is a :class:`.ForUpdateArg`, the
    ``nowait``, ``read``, ``skip_locked`` and ``key_share`` flags all
    match, and both refer to the very same ``of`` object.

    """
    if not isinstance(other, ForUpdateArg):
        return False

    flags_match = (
        other.nowait == self.nowait
        and other.read == self.read
        and other.skip_locked == self.skip_locked
        and other.key_share == self.key_share
    )
    # ``of`` is compared by identity, not by value.
    return flags_match and other.of is self.of
3368
def __ne__(self, other: Any) -> bool:
    """Return the logical negation of :meth:`__eq__`."""
    is_equal = self.__eq__(other)
    return not is_equal
3371
def __hash__(self) -> int:
    """Hash on object identity.

    The hash is ``id(self)``, so it is independent of the attribute
    values that :meth:`__eq__` compares.

    """
    identity = id(self)
    return identity
3374
3375 def __init__(
3376 self,
3377 *,
3378 nowait: bool = False,
3379 read: bool = False,
3380 of: Optional[_ForUpdateOfArgument] = None,
3381 skip_locked: bool = False,
3382 key_share: bool = False,
3383 ):
3384 """Represents arguments specified to
3385 :meth:`_expression.Select.for_update`.
3386
3387 """
3388
3389 self.nowait = nowait

Callers 4

_from_argument · Method · 0.85
with_for_update · Method · 0.85
with_for_update · Method · 0.85

Calls

no outgoing calls

Tested by 1