(self, keys: Sequence[_KeyIndexType])
| 279 | ) |
| 280 | |
| 281 | def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self: |
| 282 | recs = list(self._metadata_for_keys(keys)) |
| 283 | |
| 284 | indexes = [rec[MD_INDEX] for rec in recs] |
| 285 | new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs] |
| 286 | |
| 287 | if self._translated_indexes: |
| 288 | indexes = [self._translated_indexes[idx] for idx in indexes] |
| 289 | tup = tuplegetter(*indexes) |
| 290 | new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)] |
| 291 | |
| 292 | keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} |
| 293 | # TODO: need unit test for: |
| 294 | # result = connection.execute("raw sql, no columns").scalars() |
| 295 | # without the "or ()" it's failing because MD_OBJECTS is None |
| 296 | keymap.update( |
| 297 | (e, new_rec) |
| 298 | for new_rec in new_recs |
| 299 | for e in new_rec[MD_OBJECTS] or () |
| 300 | ) |
| 301 | |
| 302 | return self._make_new_metadata( |
| 303 | unpickled=self._unpickled, |
| 304 | processors=self._processors, |
| 305 | keys=new_keys, |
| 306 | tuplefilter=tup, |
| 307 | translated_indexes=indexes, |
| 308 | keymap=keymap, # type: ignore[arg-type] |
| 309 | safe_for_cache=self._safe_for_cache, |
| 310 | keymap_by_result_column_idx=self._keymap_by_result_column_idx, |
| 311 | ) |
| 312 | |
| 313 | def _adapt_to_context(self, context: ExecutionContext) -> Self: |
| 314 | """When using a cached Compiled construct that has a _result_map, |
nothing calls this directly
no test coverage detected