hub / github.com/celery/celery / Connection

Class Connection

t/unit/worker/test_worker.py:311–316

Source from the content-addressed store, hash-verified

def test_loop_ignores_socket_timeout(self):

    class Connection(self.app.connection_for_read().__class__):
        obj = None

        def drain_events(self, **kwargs):
            self.obj.connection = None
            raise socket.timeout(10)

    c = self.NoopConsumer()
    c.connection = Connection(self.app.conf.broker_url)
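The excerpt above stubs `drain_events` so that it clears the owner's connection and then raises `socket.timeout`. A minimal, dependency-free sketch of the same pattern follows. `FakeConnection`, `FakeConsumer`, and `drain_until_disconnected` are hypothetical names made up for illustration; they are not part of Celery, and the loop is only an assumption about the kind of behaviour such a test exercises, not Celery's actual consumer loop.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in for a broker connection (not Celery's class)."""

    obj = None  # the consumer that owns this connection; set before use

    def drain_events(self, **kwargs):
        # Clear the owner's connection so the loop stops after this call,
        # then simulate a read that timed out.
        self.obj.connection = None
        raise socket.timeout(10)


class FakeConsumer:
    """Hypothetical consumer that only holds a connection attribute."""

    def __init__(self):
        self.connection = None


def drain_until_disconnected(consumer):
    """Illustrative loop: treat socket.timeout as 'no events yet' and go on."""
    timeouts = 0
    while consumer.connection is not None:
        try:
            consumer.connection.drain_events(timeout=1)
        except socket.timeout:
            # A timeout is not an error here; count it and re-check the loop
            # condition, which is now false because the stub cleared it.
            timeouts += 1
    return timeouts


consumer = FakeConsumer()
consumer.connection = FakeConnection()
FakeConnection.obj = consumer
timeouts_seen = drain_until_disconnected(consumer)
```

Setting `obj.connection = None` inside the stub is what keeps the loop from spinning forever: the timeout is swallowed, and the next check of the loop condition ends the run.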

Callers 5

test_loop · Method · 0.70
test_start__loop · Method · 0.70

Calls 1

connection_for_read · Method · 0.45

Tested by 5

test_loop · Method · 0.56
test_start__loop · Method · 0.56
