MCPcopy
hub / github.com/celery/celery / test_rdb

Method test_rdb

t/unit/contrib/test_rdb.py:33–73  ·  view source on GitHub ↗
(self, get_avail_port)

Source from the content-addressed store, hash-verified

31 @patch('celery.contrib.rdb.Rdb.get_avail_port')
32 @t.skip.if_pypy
33 def test_rdb(self, get_avail_port):
34 sock = Mock()
35 get_avail_port.return_value = (sock, 8000)
36 sock.accept.return_value = (Mock(), ['helu'])
37 out = WhateverIO()
38 with Rdb(out=out) as rdb:
39 get_avail_port.assert_called()
40 assert 'helu' in out.getvalue()
41
42 # set_quit
43 with patch('sys.settrace') as settrace:
44 rdb.set_quit()
45 settrace.assert_called_with(None)
46
47 # set_trace
48 with patch('celery.contrib.rdb.Pdb.set_trace') as pset:
49 with patch('celery.contrib.rdb._frame'):
50 rdb.set_trace()
51 rdb.set_trace(Mock())
52 pset.side_effect = SockErr
53 pset.side_effect.errno = errno.ENOENT
54 with pytest.raises(SockErr):
55 rdb.set_trace()
56
57 # _close_session
58 rdb._close_session()
59 rdb.active = True
60 rdb._handle = None
61 rdb._client = None
62 rdb._sock = None
63 rdb._close_session()
64
65 # do_continue
66 rdb.set_continue = Mock()
67 rdb.do_continue(Mock())
68 rdb.set_continue.assert_called_with()
69
70 # do_quit
71 rdb.set_quit = Mock()
72 rdb.do_quit(Mock())
73 rdb.set_quit.assert_called_with()
74
75 @patch('socket.socket')
76 @t.skip.if_pypy

Callers

nothing calls this directly

Calls 7

WhateverIOClass · 0.90
RdbClass · 0.90
set_quitMethod · 0.80
_close_sessionMethod · 0.80
do_continueMethod · 0.80
do_quitMethod · 0.80
raisesMethod · 0.45

Tested by

no test coverage detected