(
self,
cte: CTE,
asfrom: bool = False,
ashint: bool = False,
fromhints: Optional[_FromHintsType] = None,
visiting_cte: Optional[CTE] = None,
from_linter: Optional[FromLinter] = None,
cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
**kwargs: Any,
)
| 4232 | cte._compiler_dispatch(self, cte_opts=opt, **local_kw) |
| 4233 | |
| 4234 | def visit_cte( |
| 4235 | self, |
| 4236 | cte: CTE, |
| 4237 | asfrom: bool = False, |
| 4238 | ashint: bool = False, |
| 4239 | fromhints: Optional[_FromHintsType] = None, |
| 4240 | visiting_cte: Optional[CTE] = None, |
| 4241 | from_linter: Optional[FromLinter] = None, |
| 4242 | cte_opts: selectable._CTEOpts = selectable._CTEOpts(False), |
| 4243 | **kwargs: Any, |
| 4244 | ) -> Optional[str]: |
| 4245 | self_ctes = self._init_cte_state() |
| 4246 | assert self_ctes is self.ctes |
| 4247 | |
| 4248 | kwargs["visiting_cte"] = cte |
| 4249 | |
| 4250 | cte_name = cte.name |
| 4251 | |
| 4252 | if isinstance(cte_name, elements._truncated_label): |
| 4253 | cte_name = self._truncated_identifier("alias", cte_name) |
| 4254 | |
| 4255 | is_new_cte = True |
| 4256 | embedded_in_current_named_cte = False |
| 4257 | |
| 4258 | _reference_cte = cte._get_reference_cte() |
| 4259 | |
| 4260 | nesting = cte.nesting or cte_opts.nesting |
| 4261 | |
| 4262 | # check for CTE already encountered |
| 4263 | if _reference_cte in self.level_name_by_cte: |
| 4264 | cte_level, _, existing_cte_opts = self.level_name_by_cte[ |
| 4265 | _reference_cte |
| 4266 | ] |
| 4267 | assert _ == cte_name |
| 4268 | |
| 4269 | cte_level_name = (cte_level, cte_name) |
| 4270 | existing_cte = self.ctes_by_level_name[cte_level_name] |
| 4271 | |
| 4272 | # check if we are receiving it here with a specific |
| 4273 | # "nest_here" location; if so, move it to this location |
| 4274 | |
| 4275 | if cte_opts.nesting: |
| 4276 | if existing_cte_opts.nesting: |
| 4277 | raise exc.CompileError( |
| 4278 | "CTE is stated as 'nest_here' in " |
| 4279 | "more than one location" |
| 4280 | ) |
| 4281 | |
| 4282 | old_level_name = (cte_level, cte_name) |
| 4283 | cte_level = len(self.stack) if nesting else 1 |
| 4284 | cte_level_name = new_level_name = (cte_level, cte_name) |
| 4285 | |
| 4286 | del self.ctes_by_level_name[old_level_name] |
| 4287 | self.ctes_by_level_name[new_level_name] = existing_cte |
| 4288 | self.level_name_by_cte[_reference_cte] = new_level_name + ( |
| 4289 | cte_opts, |
| 4290 | ) |
| 4291 |
nothing calls this directly
no test coverage detected