(acc: dict[object, object], delta: dict[object, object])
| 4 | |
| 5 | |
| 6 | def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: |
| 7 | for key, delta_value in delta.items(): |
| 8 | if key not in acc: |
| 9 | acc[key] = delta_value |
| 10 | continue |
| 11 | |
| 12 | acc_value = acc[key] |
| 13 | if acc_value is None: |
| 14 | acc[key] = delta_value |
| 15 | continue |
| 16 | |
| 17 | # the `index` property is used in arrays of objects so it should |
| 18 | # not be accumulated like other values e.g. |
| 19 | # [{'foo': 'bar', 'index': 0}] |
| 20 | # |
| 21 | # the same applies to `type` properties as they're used for |
| 22 | # discriminated unions |
| 23 | if key == "index" or key == "type": |
| 24 | acc[key] = delta_value |
| 25 | continue |
| 26 | |
| 27 | if isinstance(acc_value, str) and isinstance(delta_value, str): |
| 28 | acc_value += delta_value |
| 29 | elif isinstance(acc_value, (int, float)) and isinstance(delta_value, (int, float)): |
| 30 | acc_value += delta_value |
| 31 | elif is_dict(acc_value) and is_dict(delta_value): |
| 32 | acc_value = accumulate_delta(acc_value, delta_value) |
| 33 | elif is_list(acc_value) and is_list(delta_value): |
| 34 | # for lists of non-dictionary items we'll only ever get new entries |
| 35 | # in the array, existing entries will never be changed |
| 36 | if all(isinstance(x, (str, int, float)) for x in acc_value): |
| 37 | acc_value.extend(delta_value) |
| 38 | continue |
| 39 | |
| 40 | for delta_entry in delta_value: |
| 41 | if not is_dict(delta_entry): |
| 42 | raise TypeError(f"Unexpected list delta entry is not a dictionary: {delta_entry}") |
| 43 | |
| 44 | try: |
| 45 | index = delta_entry["index"] |
| 46 | except KeyError as exc: |
| 47 | raise RuntimeError(f"Expected list delta entry to have an `index` key; {delta_entry}") from exc |
| 48 | |
| 49 | if not isinstance(index, int): |
| 50 | raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}") |
| 51 | |
| 52 | try: |
| 53 | acc_entry = acc_value[index] |
| 54 | except IndexError: |
| 55 | acc_value.insert(index, delta_entry) |
| 56 | else: |
| 57 | if not is_dict(acc_entry): |
| 58 | raise TypeError("not handled yet") |
| 59 | |
| 60 | acc_value[index] = accumulate_delta(acc_entry, delta_entry) |
| 61 | |
| 62 | acc[key] = acc_value |
| 63 |
no test coverage detected