| 997 | active_writes.discard(fd) |
| 998 | |
| 999 | def flush(self): |
| 1000 | if self._state == TERMINATE: |
| 1001 | return |
| 1002 | # cancel all tasks that haven't been accepted so that NACK is sent |
| 1003 | # if synack is enabled, otherwise discard them from the cache |
| 1004 | # since they will be redelivered by the broker. |
| 1005 | for job in tuple(self._cache.values()): |
| 1006 | if not job._accepted: |
| 1007 | if self.synack: |
| 1008 | job._cancel() |
| 1009 | else: |
| 1010 | job.discard() |
| 1011 | |
| 1012 | # clear the outgoing buffer as the tasks will be redelivered by |
| 1013 | # the broker anyway. |
| 1014 | if self.outbound_buffer: |
| 1015 | self.outbound_buffer.clear() |
| 1016 | |
| 1017 | self.maintain_pool() |
| 1018 | |
| 1019 | try: |
| 1020 | # ...but we must continue writing the payloads we already started |
| 1021 | # to keep message boundaries. |
| 1022 | # The messages may be NACK'ed later if synack is enabled. |
| 1023 | if self._state == RUN: |
| 1024 | # flush outgoing buffers |
| 1025 | intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True) |
| 1026 | |
| 1027 | # TODO: Rewrite this as a dictionary comprehension once we drop support for Python 3.7 |
| 1028 | # This dict comprehension requires the walrus operator which is only available in 3.8. |
| 1029 | owned_by = {} |
| 1030 | for job in self._cache.values(): |
| 1031 | writer = _get_job_writer(job) |
| 1032 | if writer is not None: |
| 1033 | owned_by[writer] = job |
| 1034 | |
| 1035 | while self._active_writers: |
| 1036 | writers = list(self._active_writers) |
| 1037 | for gen in writers: |
| 1038 | if (gen.__name__ == '_write_job' and |
| 1039 | gen_not_started(gen)): |
| 1040 | # hasn't started writing the job so can |
| 1041 | # discard the task, but we must also remove |
| 1042 | # it from the Pool._cache. |
| 1043 | try: |
| 1044 | job = owned_by[gen] |
| 1045 | except KeyError: |
| 1046 | pass |
| 1047 | else: |
| 1048 | # removes from Pool._cache |
| 1049 | job.discard() |
| 1050 | self._active_writers.discard(gen) |
| 1051 | else: |
| 1052 | try: |
| 1053 | job = owned_by[gen] |
| 1054 | except KeyError: |
| 1055 | # Generator not in owned_by — not a _write_job |
| 1056 | # (e.g. a _write_ack coroutine added by send_ack()). |