hub / github.com/celery/celery / test_sched

Method test_sched

t/unit/concurrency/test_eventlet.py:63–80
(self)

Source from the content-addressed store, hash-verified

61          self.GreenletExit = patching('greenlet.GreenletExit')
62
63      def test_sched(self):
64          x = Timer()
65          x.GreenletExit = KeyError
66          entry = Mock()
67          g = x._enter(1, 0, entry)
68          assert x.queue
69
70          x._entry_exit(g, entry)
71          g.wait.side_effect = KeyError()
72          x._entry_exit(g, entry)
73          entry.cancel.assert_called_with()
74          assert not x._queue
75
76          x._queue.add(g)
77          x.clear()
78          x._queue.add(g)
79          g.cancel.side_effect = KeyError()
80          x.clear()
81
82      def test_cancel(self):
83          x = Timer()
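
The test above relies on two `unittest.mock` features: setting `side_effect` to an exception instance so that the mocked call raises, and `assert_called_with()` to check that a fallback path ran. The following is a minimal sketch of that pattern, not the Celery implementation. `SketchTimer` and `run_sketch` are hypothetical names introduced here for illustration, and the method bodies are assumptions about what such a timer might plausibly do.

```python
from unittest.mock import Mock


class SketchTimer:
    """Hypothetical stand-in for illustration; not celery's eventlet Timer."""

    # The test above swaps in KeyError for GreenletExit in the same way.
    GreenletExit = KeyError

    def __init__(self):
        self._queue = set()

    def _entry_exit(self, g, entry):
        # Assumed behaviour: wait on the greenlet, cancel the entry if the
        # greenlet was killed, and always drop the greenlet from the queue.
        try:
            try:
                g.wait()
            except self.GreenletExit:
                entry.cancel()
        finally:
            self._queue.discard(g)

    def clear(self):
        # Assumed behaviour: cancel everything still queued and ignore
        # errors raised by greenlets that are already gone.
        while self._queue:
            try:
                self._queue.pop().cancel()
            except KeyError:
                pass


def run_sketch():
    timer = SketchTimer()
    g, entry = Mock(), Mock()
    timer._queue.add(g)

    # An exception instance as side_effect makes g.wait() raise it.
    g.wait.side_effect = KeyError()
    timer._entry_exit(g, entry)
    entry.cancel.assert_called_with()
    assert not timer._queue

    # clear() should swallow the error raised by g.cancel().
    timer._queue.add(g)
    g.cancel.side_effect = KeyError()
    timer.clear()
    return timer, g, entry
```

Calling `run_sketch()` walks through the same two failure paths as the test: a wait that raises, and a cancel that raises during `clear()`.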

Callers

nothing calls this directly

Calls 5

_enter · Method · 0.95
_entry_exit · Method · 0.95
clear · Method · 0.95
Timer · Class · 0.90
add · Method · 0.45

Tested by

no test coverage detected