Parse RESP3 FT.AGGREGATE response into an AggregateResult object.
(self, res, **kwargs)
| 482 | ) |
| 483 | |
def _parse_aggregate_resp3(self, res, **kwargs):
    """Build an ``AggregateResult`` from a RESP3 FT.AGGREGATE reply.

    Keyword options read from ``kwargs``:

    * ``query`` -- the object the command was issued with; when it is a
      ``Cursor`` it is reused and its ``cid`` is updated in place.
    * ``has_cursor`` -- truthy when the reply is expected to carry a
      cursor id alongside the data (defaults to ``False``).

    Returns an ``AggregateResult`` whose rows are flat
    ``[key, value, key, value, ...]`` lists, plus the cursor (or ``None``),
    the reported total and any warnings.
    """
    has_cursor = kwargs.get("has_cursor", False)
    query = kwargs.get("query")

    # With a cursor the RESP3 reply is [data_dict, cursor_id]; otherwise
    # the reply itself is the data mapping.
    payload, cid = res, 0
    if has_cursor and isinstance(res, list):
        payload = res[0]
        if len(res) > 1:
            cid = res[1]

    payload = {} if payload is None else payload
    # Map keys arrive as bytes on RESP3 connections with
    # decode_responses=False, so the structural keys are normalised to
    # str before any lookup (same approach as ``Result.from_resp3``).
    payload = {str_if_bytes(name): value for name, value in payload.items()}

    warnings = [str_if_bytes(item) for item in payload.get("warning", [])]
    total = payload.get("total_results", 0)

    rows = []
    for entry in payload.get("results", []):
        entry = {str_if_bytes(name): value for name, value in entry.items()}
        attrs = entry.get("extra_attributes", {})
        # Flatten the mapping into [key, value, key, value, ...] so rows
        # have the same layout as the RESP2 row format consumers expect.
        rows.append([part for pair in attrs.items() for part in pair])

    if not has_cursor:
        cursor = None
    elif isinstance(query, Cursor):
        # Reuse the caller's cursor object, pointing it at the new id.
        query.cid = cid
        cursor = query
    else:
        cursor = Cursor(cid)

    return AggregateResult(rows, cursor, None, total=total, warnings=warnings)
| 528 | |
| 529 | # ---- RESP3 HYBRID parsers ---- |
| 530 |