MCPcopy
hub / github.com/celery/celery / get_request

Method get_request

t/unit/worker/test_request.py:187–209  ·  view source on GitHub ↗
(self,
                    sig,
                    Request=Request,
                    exclude_headers=None,
                    headers=None,
                    **kwargs)

Source from the content-addressed store, hash-verified

185class test_Request(RequestCase):
186
def get_request(self,
                sig,
                Request=Request,
                exclude_headers=None,
                headers=None,
                **kwargs):
    """Build a ``Request`` for *sig* wired up with mock callbacks.

    A task message is created from *sig* via
    ``self.task_message_from_sig`` and handed to *Request* together
    with ``Mock`` objects for ``on_ack``, ``on_reject`` and ``eventer``.

    Args:
        sig: Signature to build the message from; ``sig.type`` is
            passed to the request as ``task``.
        Request: Callable used to construct the request object.
        exclude_headers: Optional iterable of header names to strip
            from the headers before the request is built.  Names that
            are not present are ignored.
        headers: Optional headers mapping to pass instead of a copy of
            the message's own headers.
        **kwargs: Forwarded unchanged to *Request*.

    Returns:
        Whatever *Request* returns.
    """
    msg = self.task_message_from_sig(self.app, sig)
    if headers is None:
        headers = msg.headers.copy()
    elif exclude_headers:
        # Strip from a private copy so the caller's mapping is never
        # mutated, mirroring the copy taken of ``msg.headers`` above.
        headers = dict(headers)
    if exclude_headers:
        for header in exclude_headers:
            headers.pop(header, None)
    return Request(
        msg,
        on_ack=Mock(name='on_ack'),
        on_reject=Mock(name='on_reject'),
        eventer=Mock(name='eventer'),
        app=self.app,
        connection_errors=(socket.error,),
        task=sig.type,
        headers=headers,
        **kwargs
    )
210
211 def test_shadow(self):
212 assert self.get_request(

Calls 3

RequestClass · 0.90
copyMethod · 0.80
popMethod · 0.45

Tested by

no test coverage detected