hub / github.com/celery/celery / test_connection

Method test_connection

t/unit/worker/test_worker.py:142–165
(self)

Source from the content-addressed store, hash-verified

        c.start()

    def test_connection(self):
        c = self.NoopConsumer()

        c.blueprint.start(c)
        assert isinstance(c.connection, Connection)

        c.blueprint.state = RUN
        c.event_dispatcher = None
        c.blueprint.restart(c)
        assert c.connection

        c.blueprint.state = RUN
        c.shutdown()
        assert c.connection is None
        assert c.task_consumer is None

        c.blueprint.start(c)
        assert isinstance(c.connection, Connection)
        c.blueprint.restart(c)

        c.stop()
        c.shutdown()
        assert c.connection is None
        assert c.task_consumer is None

    def test_close_connection(self):
        c = self.NoopConsumer()
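
The test above walks a consumer through start, restart, shutdown, start again, then stop and shutdown, checking after each phase whether `connection` and `task_consumer` are set or cleared. The sketch below mirrors that sequence with a toy object. `ToyConsumer` and `exercise_lifecycle` are hypothetical names invented for illustration; they are not Celery's `Consumer`, its blueprint, or the `NoopConsumer` test helper, and they only model the attribute changes the assertions look at.

```python
class ToyConsumer:
    """Hypothetical stand-in that models only the attributes the test checks."""

    def __init__(self):
        self.connection = None
        self.task_consumer = None
        self.running = False

    def start(self):
        # Assumption: starting creates both resources.
        self.connection = object()
        self.task_consumer = object()
        self.running = True

    def stop(self):
        # Assumption: stopping halts work but keeps resources allocated.
        self.running = False

    def restart(self):
        self.stop()
        self.start()

    def shutdown(self):
        # Assumption: shutdown releases everything that start() created.
        self.stop()
        self.connection = None
        self.task_consumer = None


def exercise_lifecycle(consumer):
    """Run the same phase order as the test and record each check."""
    checks = []

    consumer.start()
    checks.append(consumer.connection is not None)

    consumer.restart()
    checks.append(consumer.connection is not None)

    consumer.shutdown()
    checks.append(consumer.connection is None and consumer.task_consumer is None)

    consumer.start()
    checks.append(consumer.connection is not None)
    consumer.restart()

    consumer.stop()
    consumer.shutdown()
    checks.append(consumer.connection is None and consumer.task_consumer is None)

    return checks
```

Calling `exercise_lifecycle(ToyConsumer())` returns one boolean per phase, in the same order as the assertions in the test.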

Callers

nothing calls this directly

Calls 5

NoopConsumer · Method · 0.95
start · Method · 0.45
restart · Method · 0.45
shutdown · Method · 0.45
stop · Method · 0.45

Tested by

no test coverage detected