MCPcopy
hub / github.com/celery/celery / _live_receivers

Method _live_receivers

celery/utils/dispatch/signal.py:303–340  ·  view source on GitHub ↗

Filter sequence of receivers to get resolved, live receivers. This checks for weak references and resolves them, then returning only live receivers.

(self, sender)

Source from the content-addressed store, hash-verified

301 self.receivers = new_receivers
302
303 def _live_receivers(self, sender):
304 """Filter sequence of receivers to get resolved, live receivers.
305
306 This checks for weak references and resolves them, then returning only
307 live receivers.
308 """
309 receivers = None
310 if self.use_caching and not self._dead_receivers:
311 receivers = self.sender_receivers_cache.get(sender)
312 # We could end up here with NO_RECEIVERS even if we do check this
313 # case in .send() prior to calling _Live_receivers() due to
314 # concurrent .send() call.
315 if receivers is NO_RECEIVERS:
316 return []
317 if receivers is None:
318 with self.lock:
319 self._clear_dead_receivers()
320 senderkey = _make_id(sender)
321 receivers = []
322 for (receiverkey, r_senderkey), receiver in self.receivers:
323 if r_senderkey == NONE_ID or r_senderkey == senderkey:
324 receivers.append(receiver)
325 if self.use_caching:
326 if not receivers:
327 self.sender_receivers_cache[sender] = NO_RECEIVERS
328 else:
329 # Note: we must cache the weakref versions.
330 self.sender_receivers_cache[sender] = receivers
331 non_weak_receivers = []
332 for receiver in receivers:
333 if isinstance(receiver, weakref.ReferenceType):
334 # Dereference the weak reference.
335 receiver = receiver()
336 if receiver is not None:
337 non_weak_receivers.append(receiver)
338 else:
339 non_weak_receivers.append(receiver)
340 return non_weak_receivers
341
342 def _remove_receiver(self, receiver=None):
343 """Remove dead receivers from connections."""

Callers 2

has_listenersMethod · 0.95
sendMethod · 0.95

Calls 3

_clear_dead_receiversMethod · 0.95
_make_idFunction · 0.85
getMethod · 0.45

Tested by

no test coverage detected