(self)
| 131 | self._drain_complete_event = self._event() |
| 132 | |
def run(self):
    """Drain result-consumer events until the stop flag is set.

    Marks ``self._started`` on entry, then repeatedly calls
    ``self.result_consumer.drain_events(timeout=1)`` and signals drain
    completion after each successful drain.

    Error handling inside the loop:

    * ``socket.timeout`` is ignored and the loop simply continues.
    * Any other ``OSError`` is logged as a warning (with traceback) and
      followed by a one-second pause before the next attempt.

    Any other ``Exception`` is stored on ``self._exc`` and re-raised.
    On every exit path drain completion is signalled one more time and
    ``self._shutdown`` is set; a ``RuntimeError`` raised while setting it
    is logged instead of propagated.
    """
    self._started.set()

    try:
        while not self._stopped.is_set():
            try:
                self.result_consumer.drain_events(timeout=1)
                self._send_drain_complete_event()
            except socket.timeout:
                # Nothing arrived within the timeout; poll again.
                continue
            except OSError:
                # Recoverable connection errors (e.g. broker restart)
                # are handled inside drain_events via reconnection.
                # Anything that still leaks through is logged and
                # retried after a short back-off rather than spinning hot.
                logging.warning(
                    'Drainer: connection error during drain_events, '
                    'will retry on next loop iteration.',
                    exc_info=True,
                )
                time.sleep(1)
    except Exception as failure:
        # Keep the error on the instance before letting it propagate.
        self._exc = failure
        raise
    finally:
        self._send_drain_complete_event()
        try:
            self._shutdown.set()
        except RuntimeError as set_error:
            logging.error(f"Failed to set shutdown event: {set_error}")
| 164 | def start(self): |
| 165 | self._ensure_not_shut_down() |
nothing calls this directly
no test coverage detected