(self, get_avail_port)
| 31 | @patch('celery.contrib.rdb.Rdb.get_avail_port') |
| 32 | @t.skip.if_pypy |
| 33 | def test_rdb(self, get_avail_port): |
| 34 | sock = Mock() |
| 35 | get_avail_port.return_value = (sock, 8000) |
| 36 | sock.accept.return_value = (Mock(), ['helu']) |
| 37 | out = WhateverIO() |
| 38 | with Rdb(out=out) as rdb: |
| 39 | get_avail_port.assert_called() |
| 40 | assert 'helu' in out.getvalue() |
| 41 | |
| 42 | # set_quit |
| 43 | with patch('sys.settrace') as settrace: |
| 44 | rdb.set_quit() |
| 45 | settrace.assert_called_with(None) |
| 46 | |
| 47 | # set_trace |
| 48 | with patch('celery.contrib.rdb.Pdb.set_trace') as pset: |
| 49 | with patch('celery.contrib.rdb._frame'): |
| 50 | rdb.set_trace() |
| 51 | rdb.set_trace(Mock()) |
| 52 | pset.side_effect = SockErr |
| 53 | pset.side_effect.errno = errno.ENOENT |
| 54 | with pytest.raises(SockErr): |
| 55 | rdb.set_trace() |
| 56 | |
| 57 | # _close_session |
| 58 | rdb._close_session() |
| 59 | rdb.active = True |
| 60 | rdb._handle = None |
| 61 | rdb._client = None |
| 62 | rdb._sock = None |
| 63 | rdb._close_session() |
| 64 | |
| 65 | # do_continue |
| 66 | rdb.set_continue = Mock() |
| 67 | rdb.do_continue(Mock()) |
| 68 | rdb.set_continue.assert_called_with() |
| 69 | |
| 70 | # do_quit |
| 71 | rdb.set_quit = Mock() |
| 72 | rdb.do_quit(Mock()) |
| 73 | rdb.set_quit.assert_called_with() |
| 74 | |
| 75 | @patch('socket.socket') |
| 76 | @t.skip.if_pypy |
nothing calls this directly
no test coverage detected