hub / github.com/celery/celery / timeout_then_error

Method timeout_then_error

t/unit/worker/test_loops.py:97–102
(self, mock)

Source from the content-addressed store, hash-verified

 95          _consumer.strategies = self.obj.strategies
 96
 97      def timeout_then_error(self, mock):
 98
 99          def first(*args, **kwargs):
100              mock.side_effect = socket.error()
101              raise socket.timeout()
102          mock.side_effect = first
103
104      def close_then_error(self, mock=None, mod=0, exc=None):
105          mock = Mock() if mock is None else mock
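
The helper above swaps the mock's own side effect from inside its first call: the first call raises socket.timeout, and every later call raises socket.error. The sketch below reproduces that behaviour outside the test class so it can be run on its own. The module-level function form and the names `drain` and `outcomes` are assumptions made for illustration and do not appear in the Celery source.

```python
import socket
from unittest.mock import Mock


def timeout_then_error(mock):
    """Standalone copy of the helper: timeout on the first call, error afterwards."""

    def first(*args, **kwargs):
        # Re-arm the mock so the *next* call raises socket.error ...
        mock.side_effect = socket.error()
        # ... while the current call raises socket.timeout.
        raise socket.timeout()

    mock.side_effect = first


# Hypothetical stand-in for whatever callable the test patches.
drain = Mock()
timeout_then_error(drain)

outcomes = []
for _ in range(2):
    try:
        drain()
    except socket.timeout:
        # Must be caught before socket.error: socket.timeout is a subclass of it.
        outcomes.append("timeout")
    except socket.error:
        outcomes.append("error")

print(outcomes)
```

Because the replacement side effect is an exception instance rather than a callable, it stays in place, so a third or fourth call would raise socket.error again.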

Callers 2

test_timeout_ignored · Method · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected