hub / github.com/celery/celery / test_send_event

Method test_send_event

t/unit/tasks/test_tasks.py:1151–1159
(self)

Source from the content-addressed store, hash-verified

1149          assert presult.successful()
1150
1151      def test_send_event(self):
1152          mytask = self.mytask._get_current_object()
1153          mytask.app.events = Mock(name='events')
1154          mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
1155          mytask.request.id = 'fb'
1156          mytask.send_event('task-foo', id=3122)
1157          mytask.app.events.default_dispatcher().send.assert_called_with(
1158              'task-foo', uuid='fb', id=3122,
1159              retry=True, retry_policy=self.app.conf.task_publish_retry_policy)
1160
1161      @pytest.mark.usefixtures('depends_on_current_app')
1162      def test_on_replace(self):
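
The mocking pattern in test_send_event can be tried outside the Celery test suite. The following is a minimal sketch using only the standard library, not Celery's implementation. The `send_event` function and the `make_app` helper are hypothetical stand-ins, the retry policy value is made up, and a configured `MagicMock` takes the place of the suite's `ContextMock` helper. It assumes the method under test opens the default dispatcher as a context manager and forwards the request id as `uuid`, which is what the assertion in the test implies.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock


def send_event(app, request, type_, retry=True, retry_policy=None, **fields):
    """Hypothetical stand-in for the method under test (not Celery's code)."""
    if retry_policy is None:
        # Fall back to the app-wide publish retry policy.
        retry_policy = app.conf.task_publish_retry_policy
    with app.events.default_dispatcher() as dispatcher:
        return dispatcher.send(type_, uuid=request.id, retry=retry,
                               retry_policy=retry_policy, **fields)


def make_app():
    """Hypothetical helper: build a mock app whose dispatcher is a context manager."""
    app = Mock(name='app')
    app.conf.task_publish_retry_policy = {'max_retries': 3}  # made-up value

    factory = MagicMock(name='default_dispatcher')
    # Make `with default_dispatcher() as d` yield the same object that a
    # plain `default_dispatcher()` call returns, so the test can reach `send`.
    dispatcher = factory.return_value
    dispatcher.__enter__.return_value = dispatcher

    app.events = Mock(name='events')
    app.events.attach_mock(factory, 'default_dispatcher')
    return app


app = make_app()
request = SimpleNamespace(id='fb')
send_event(app, request, 'task-foo', id=3122)

# Same shape as the assertion in test_send_event.
app.events.default_dispatcher().send.assert_called_with(
    'task-foo', uuid='fb', id=3122,
    retry=True, retry_policy=app.conf.task_publish_retry_policy)
```

Because calling a mock always returns the same `return_value`, the extra `default_dispatcher()` call inside the assertion reaches the dispatcher object that `send_event` used inside its `with` block.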

Callers

nothing calls this directly

Calls 4

ContextMock · Function · 0.90
default_dispatcher · Method · 0.80
_get_current_object · Method · 0.45
send_event · Method · 0.45

Tested by

no test coverage detected