MCPcopy
hub / github.com/celery/celery / test_sched

Method test_sched

t/unit/concurrency/test_gevent.py:33–54  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

31 self.GreenletExit = self.patching('gevent.greenlet.GreenletExit')
32
33 def test_sched(self):
34 self.greenlet.Greenlet = object
35 x = Timer()
36 self.greenlet.Greenlet = Mock()
37 x._Greenlet.spawn_later = Mock()
38 x._GreenletExit = KeyError
39 entry = Mock()
40 g = x._enter(1, 0, entry)
41 assert x.queue
42
43 x._entry_exit(g)
44 g.kill.assert_called_with()
45 assert not x._queue
46
47 x._queue.add(g)
48 x.clear()
49 x._queue.add(g)
50 g.kill.side_effect = KeyError()
51 x.clear()
52
53 g = x._Greenlet()
54 g.cancel()
55
56
57class test_TaskPool:

Callers

nothing calls this directly

Calls 6

_enterMethod · 0.95
_entry_exitMethod · 0.95
clearMethod · 0.95
TimerClass · 0.90
addMethod · 0.45
cancelMethod · 0.45

Tested by

no test coverage detected