(self, counter: int)
| 239 | should_exit = await self.on_tick(counter) |
| 240 | |
async def on_tick(self, counter: int) -> bool:
    """Perform per-tick housekeeping and report whether the server should stop.

    On every tenth tick (``counter % 10 == 0``) the default response headers
    are rebuilt with a fresh ``date`` value, and the configured
    ``callback_notify`` coroutine is awaited if more than ``timeout_notify``
    seconds have elapsed since the last notification.

    Args:
        counter: Tick counter supplied by the caller.

    Returns:
        ``True`` when ``should_exit`` is set or the request limit has been
        reached, ``False`` otherwise.
    """
    # Refresh the default headers on every tenth tick (once per second,
    # according to the original comment; the tick interval is set by the caller).
    if counter % 10 == 0:
        now = time.time()
        http_date = formatdate(now, usegmt=True).encode()

        date_entries = [(b"date", http_date)] if self.config.date_header else []
        self.server_state.default_headers = date_entries + self.config.encoded_headers

        # Invoke `callback_notify` at most once every `timeout_notify` seconds.
        # This lives inside the tenth-tick branch because it reuses `now`.
        if (
            self.config.callback_notify is not None
            and now - self.last_notified > self.config.timeout_notify
        ):  # pragma: full coverage
            # Record the timestamp before awaiting the callback.
            self.last_notified = now
            await self.config.callback_notify()

    # An explicit exit request takes priority over the request limit.
    if self.should_exit:
        return True

    limit = self.limit_max_requests
    limit_reached = limit is not None and self.server_state.total_requests >= limit
    if not limit_reached:
        return False

    logger.info("Maximum request limit of %d exceeded. Terminating process.", limit)
    return True
| 270 | |
| 271 | async def shutdown(self, sockets: list[socket.socket] | None = None) -> None: |
| 272 | logger.info("Shutting down") |
no test coverage detected