(self)
| 3835 | ) |
| 3836 | |
async def on_slots_changed(self) -> None:
    """Schedule shard-subscription reconciliation after a slots-cache refresh.

    Observer hook invoked by NodesManager. Reconciliation runs as its own
    task so that the caller's code path (typically the MovedError handling
    in _execute_command) is not blocked on the network I/O performed by
    reinitialize_shard_subscriptions. Nothing is scheduled when there are
    no shard subscriptions to reconcile.
    """
    if self.shard_channels:
        reconcile = asyncio.create_task(self.reinitialize_shard_subscriptions())
        # Track the task while it is in flight.
        self._reconcile_tasks.add(reconcile)
        # Done-callbacks, registered in this order:
        #   1. drop the finished task from the tracking collection;
        #   2. consume the task's exception (if any) so Python does not emit
        #      a "Task exception was never retrieved" warning.
        #      reinitialize_shard_subscriptions surfaces SlotNotCoveredError
        #      while a slot is still transiently uncovered; it is routed
        #      through the same logger channel as the sync
        #      ClusterPubSubSlotsCacheListener for consistent observability.
        for on_done in (
            self._reconcile_tasks.discard,
            self._log_reconcile_task_exception,
        ):
            reconcile.add_done_callback(on_done)
| 3855 | |
| 3856 | @staticmethod |
| 3857 | def _log_reconcile_task_exception(task: "asyncio.Task") -> None: |
no test coverage detected