MCPcopy
hub / github.com/celery/celery / flush

Method flush

celery/concurrency/asynpool.py:999–1083  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

997 active_writes.discard(fd)
998
999 def flush(self):
1000 if self._state == TERMINATE:
1001 return
1002 # cancel all tasks that haven't been accepted so that NACK is sent
1003 # if synack is enabled, otherwise discard them from the cache
1004 # since they will be redelivered by the broker.
1005 for job in tuple(self._cache.values()):
1006 if not job._accepted:
1007 if self.synack:
1008 job._cancel()
1009 else:
1010 job.discard()
1011
1012 # clear the outgoing buffer as the tasks will be redelivered by
1013 # the broker anyway.
1014 if self.outbound_buffer:
1015 self.outbound_buffer.clear()
1016
1017 self.maintain_pool()
1018
1019 try:
1020 # ...but we must continue writing the payloads we already started
1021 # to keep message boundaries.
1022 # The messages may be NACK'ed later if synack is enabled.
1023 if self._state == RUN:
1024 # flush outgoing buffers
1025 intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
1026
1027 # TODO: Rewrite this as a dictionary comprehension once we drop support for Python 3.7
1028 # This dict comprehension requires the walrus operator which is only available in 3.8.
1029 owned_by = {}
1030 for job in self._cache.values():
1031 writer = _get_job_writer(job)
1032 if writer is not None:
1033 owned_by[writer] = job
1034
1035 while self._active_writers:
1036 writers = list(self._active_writers)
1037 for gen in writers:
1038 if (gen.__name__ == '_write_job' and
1039 gen_not_started(gen)):
1040 # hasn't started writing the job so can
1041 # discard the task, but we must also remove
1042 # it from the Pool._cache.
1043 try:
1044 job = owned_by[gen]
1045 except KeyError:
1046 pass
1047 else:
1048 # removes from Pool._cache
1049 job.discard()
1050 self._active_writers.discard(gen)
1051 else:
1052 try:
1053 job = owned_by[gen]
1054 except KeyError:
1055 # Generator not in owned_by — not a _write_job
1056 # (e.g. a _write_ack coroutine added by send_ack()).

Calls 6

_flush_writerMethod · 0.95
_get_job_writerFunction · 0.85
gen_not_startedFunction · 0.85
maintain_poolMethod · 0.80
discardMethod · 0.45
clearMethod · 0.45