MCPcopy
hub / github.com/celery/celery / close_then_error

Method close_then_error

t/unit/worker/test_loops.py:104–112  ·  view source on GitHub ↗
(self, mock=None, mod=0, exc=None)

Source from the content-addressed store, hash-verified

102 mock.side_effect = first
103
104 def close_then_error(self, mock=None, mod=0, exc=None):
105 mock = Mock() if mock is None else mock
106
107 def first(*args, **kwargs):
108 if not mod or mock.call_count > mod:
109 self.close()
110 raise (socket.error() if exc is None else exc)
111 mock.side_effect = first
112 return mock
113
114 def close(self, *args, **kwargs):
115 self.blueprint.state = CLOSE

Calls

no outgoing calls

Tested by

no test coverage detected