MCPcopy
hub / github.com/celery/celery / test_get_avail_port

Method test_get_avail_port

t/unit/contrib/test_rdb.py:77–108  ·  view source on GitHub ↗
(self, sock)

Source from the content-addressed store, hash-verified

@patch('socket.socket')
@t.skip.if_pypy
def test_get_avail_port(self, sock):
    """Build ``Rdb`` under several simulated ``socket.bind`` outcomes.

    ``socket.socket`` is patched, so ``sock.return_value`` stands in for
    every socket object created while the test runs.  The scenarios are:

    1. ``bind`` succeeds (default mock behaviour).
    2. ``bind`` succeeds while ``current_process().name`` is
       ``'PoolWorker-10'``.
    3. ``bind`` raises ``SockErr`` with ``errno.ENOENT``; the error is
       expected to propagate as ``SockErr``.
    4. ``bind`` always raises ``SockErr`` with ``errno.EADDRINUSE``; some
       exception is expected.
    5. ``bind`` raises ``EADDRINUSE`` for its first 51 calls and then
       succeeds; ``Rdb`` is expected to come up without raising.
    """
    out = WhateverIO()
    fake_socket = sock.return_value
    fake_socket.accept.return_value = (Mock(), ['helu'])

    # Scenario 1: nothing fails.
    with Rdb(out=out):
        pass

    # Scenario 2: same, but the reported process name ends in a number.
    # NOTE(review): presumably Rdb derives something from that suffix --
    # confirm against celery.contrib.rdb.
    with patch('celery.contrib.rdb.current_process') as fake_process:
        fake_process.return_value.name = 'PoolWorker-10'
        with Rdb(out=out):
            pass

    # Scenario 3: the same exception instance is raised by every bind();
    # its errno is mutated between scenarios.
    err = SockErr()
    fake_socket.bind.side_effect = err
    err.errno = errno.ENOENT
    with pytest.raises(SockErr), Rdb(out=out):
        pass

    # Scenario 4: the address is reported as in use on every attempt.
    err.errno = errno.EADDRINUSE
    with pytest.raises(Exception), Rdb(out=out):
        pass

    # Scenario 5: fail for calls 0..50 (51 failures), succeed afterwards.
    bind_calls = [0]

    def flaky_bind(*args, **kwargs):
        seen = bind_calls[0]
        bind_calls[0] = seen + 1
        if seen > 50:
            return True
        raise err

    fake_socket.bind.side_effect = flaky_bind
    with Rdb(out=out):
        pass

Callers

nothing calls this directly

Calls 4

WhateverIOClass · 0.90
RdbClass · 0.90
SockErrClass · 0.85
raisesMethod · 0.45

Tested by

no test coverage detected