MCPcopy
hub / github.com/celery/celery / test_destination_for

Method test_destination_for

t/unit/backends/test_rpc.py:221–238  ·  t/unit/backends/test_rpc.py::test_RPCBackend.test_destination_for
(self)

Source from the content-addressed store, hash-verified

219 self.add.s(5, 6)) | self.add.s()).delay()
220
221 def test_destination_for(self):
222 req = Mock(name=&class="cm">#x27;request')
223 req.reply_to = &class="cm">#x27;reply_to'
224 req.correlation_id = &class="cm">#x27;corid'
225 assert self.b.destination_for(&class="cm">#x27;task_idclass="st">', req) == ('reply_toclass="st">', 'corid')
226 task = Mock()
227 _task_stack.push(task)
228 try:
229 task.request.reply_to = &class="cm">#x27;reply_to'
230 task.request.correlation_id = &class="cm">#x27;corid'
231 assert self.b.destination_for(&class="cm">#x27;task_id', None) == (
232 &class="cm">#x27;reply_toclass="st">', 'corid',
233 )
234 finally:
235 _task_stack.pop()
236
237 with pytest.raises(RuntimeError):
238 self.b.destination_for(&class="cm">#x27;task_id', None)
239
240 def test_binding(self):
241 queue = self.b.binding

Callers

nothing calls this directly

Calls 4

destination_forMethod · 0.80
pushMethod · 0.80
popMethod · 0.45
raisesMethod · 0.45

Tested by

no test coverage detected