hub / github.com/celery/celery / test_active_safe

Method test_active_safe

t/unit/worker/test_control.py:321–333
(self)

Source from the content-addressed store, hash-verified

319             worker_state.active_requests.discard(r)
320
321     def test_active_safe(self):
322         kwargsrepr = '<anything>'
323         r = Request(
324             self.TaskMessage(self.mytask.name, id='do re mi',
325                              kwargsrepr=kwargsrepr),
326             app=self.app,
327         )
328         worker_state.active_requests.add(r)
329         try:
330             active_resp = self.panel.handle('dump_active', {'safe': True})
331             assert active_resp[0]['kwargs'] == kwargsrepr
332         finally:
333             worker_state.active_requests.discard(r)
334
335     def test_pool_grow(self):
336
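
The test above registers a request in shared worker state, queries it, and always unregisters it in a finally block. A minimal, self-contained sketch of that pattern follows. FakeRequest, active_requests, dump_active, and check_safe_dump are hypothetical stand-ins, not Celery's API, and the behavior of the "safe" flag (reporting the precomputed kwargsrepr string instead of the raw kwargs) is an assumption inferred from the assertion in the test.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity-based hashing, so instances can live in a set
class FakeRequest:
    """Hypothetical stand-in for a task request (not Celery's Request)."""
    id: str
    kwargs: dict = field(default_factory=dict)
    kwargsrepr: str = ''

    def info(self, safe=False):
        # Assumption: the "safe" view reports the precomputed repr string
        # instead of the raw keyword arguments.
        return {'id': self.id,
                'kwargs': self.kwargsrepr if safe else self.kwargs}


# Hypothetical module-level registry, playing the role of shared worker state.
active_requests = set()


def dump_active(safe=False):
    """Return the info dict of every registered request."""
    return [r.info(safe=safe) for r in active_requests]


def check_safe_dump():
    r = FakeRequest(id='do re mi', kwargs={'password': 'hunter2'},
                    kwargsrepr='<anything>')
    active_requests.add(r)
    try:
        resp = dump_active(safe=True)
        assert resp[0]['kwargs'] == '<anything>'
        return resp
    finally:
        # Runs even if the assertion fails, so later checks see a clean set.
        active_requests.discard(r)
```

Because the registry is module-level state, the finally block matters: without it, a failed assertion would leave the request registered and could affect any later check that reads the same set.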

Callers

nothing calls this directly

Calls 4

Class Request · 0.90
Method add · 0.45
Method handle · 0.45
Method discard · 0.45

Tested by

no test coverage detected