MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _setup_pairs

Method _setup_pairs

lib/sqlalchemy/orm/relationships.py:3149–3199  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

3147 return [(x._deannotate(), y._deannotate()) for x, y in collection]
3148
def _setup_pairs(self) -> None:
    """Compute the column-pair collections for this join condition.

    Every ``(binary, left, right)`` combination reported by
    ``visit_binary_product`` for ``primaryjoin`` and, when it is not
    ``None``, ``secondaryjoin`` is inspected and sorted into up to two
    kinds of pairs:

    * local/remote pairs: exactly one side carries the ``"remote"``
      annotation and the other side is accepted by
      ``self.can_be_synced_fn``; stored as ``(local, remote)``.
    * synchronize pairs: the binary's operator is ``operators.eq``,
      ``self.can_be_synced_fn(left, right)`` is true and one side
      carries the ``"foreign"`` annotation; stored as
      ``(other, foreign)``.  The right-hand side is checked first.

    The results are passed through ``self._deannotate_pairs`` and
    assigned to ``local_remote_pairs``, ``synchronize_pairs`` and
    ``secondary_synchronize_pairs``.
    """
    # Local/remote pairs are shared by both join conditions and kept in
    # an ordered set, so duplicates collapse while first-seen order is
    # retained.
    local_remote: util.OrderedSet[
        Tuple[ColumnElement[Any], ColumnElement[Any]]
    ] = util.OrderedSet([])
    primary_sync: _MutableColumnPairs = []
    secondary_sync: _MutableColumnPairs = []

    def collect_pairs(
        joincond: ColumnElement[bool],
        collection: _MutableColumnPairs,
    ) -> None:
        # Parameter names of the visitor are kept as-is; how
        # visit_binary_product passes them is not visible from here.
        def visit_binary(
            binary: BinaryExpression[Any],
            left: ColumnElement[Any],
            right: ColumnElement[Any],
        ) -> None:
            right_is_remote = "remote" in right._annotations
            left_is_remote = "remote" in left._annotations

            # Only a comparison with exactly one remote side can
            # contribute a local/remote pair.
            if right_is_remote != left_is_remote:
                if right_is_remote:
                    local_col, remote_col = left, right
                else:
                    local_col, remote_col = right, left
                if self.can_be_synced_fn(local_col):
                    local_remote.add((local_col, remote_col))

            # Synchronize pairs come only from equality comparisons
            # whose two sides are both syncable.
            if binary.operator is not operators.eq:
                return
            if not self.can_be_synced_fn(left, right):
                return

            if "foreign" in right._annotations:
                collection.append((left, right))
            elif "foreign" in left._annotations:
                collection.append((right, left))

        visit_binary_product(visit_binary, joincond)

    join_conditions = (self.primaryjoin, self.secondaryjoin)
    sync_targets = (primary_sync, secondary_sync)
    for condition, target in zip(join_conditions, sync_targets):
        # A missing join condition (``None``) is simply skipped.
        if condition is not None:
            collect_pairs(condition, target)

    self.local_remote_pairs = self._deannotate_pairs(local_remote)
    self.synchronize_pairs = self._deannotate_pairs(primary_sync)
    self.secondary_synchronize_pairs = self._deannotate_pairs(
        secondary_sync
    )
3200
3201 _track_overlapping_sync_targets: weakref.WeakKeyDictionary[
3202 ColumnElement[Any],

Callers 1

__init__Method · 0.95

Calls 2

_deannotate_pairsMethod · 0.95
goFunction · 0.70

Tested by

no test coverage detected