MCPcopy
hub / github.com/celery/celery / test_on_event_task_received

Function test_on_event_task_received

t/unit/events/test_dumper.py:38–54  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

36
37
38def test_on_event_task_received():
39 buf = io.StringIO()
40 d = dumper.Dumper(out=buf)
41 event = {
42 'timestamp': datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc).timestamp(),
43 'type': 'task-received',
44 'hostname': 'worker1',
45 'uuid': 'abc',
46 'name': 'mytask',
47 'args': '(1,)',
48 'kwargs': '{}',
49 }
50 d.on_event(event.copy())
51 output = buf.getvalue()
52 assert 'worker1 [2024-01-01 12:00:00+00:00]' in output
53 assert 'task received' in output
54 assert 'mytask(abc) args=(1,) kwargs={}' in output
55
56
57def test_on_event_non_task():

Callers

nothing calls this directly

Calls 2

on_eventMethod · 0.95
copyMethod · 0.80

Tested by

no test coverage detected