hub / github.com/celery/celery / test_trace_task_ret

Method test_trace_task_ret

t/unit/worker/test_request.py:1098–1110
(self)

Source from the content-addressed store, hash-verified

1096          assert isinstance(runtime, numbers.Real)
1097
1098      def test_trace_task_ret(self):
1099          self.mytask.__trace__ = build_tracer(
1100              self.mytask.name, self.mytask, self.app.loader, 'test',
1101              app=self.app,
1102          )
1103          tid = uuid()
1104          message = self.TaskMessage(self.mytask.name, tid, args=[4])
1105          _, R, _ = trace_task_ret(
1106              self.mytask.name, tid, message.headers,
1107              message.body, message.content_type,
1108              message.content_encoding, app=self.app,
1109          )
1110          assert R == repr(4 ** 4)
1111
1112      def test_trace_task_ret__no_trace(self):
1113          try:
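
The assertion on line 1110 compares R with a string, which suggests that the middle element returned by trace_task_ret is the repr of the task's return value rather than the value itself. The sketch below illustrates that shape without Celery. Both `mytask` and `fake_trace_task_ret` are hypothetical stand-ins, and the assumption that the fixture task computes `i ** i` is inferred from the expected value `4 ** 4`; neither is shown in the listing above.

```python
import time


def mytask(i):
    # Hypothetical stand-in for the fixture task; ASSUMED to return i ** i.
    return i ** i


def fake_trace_task_ret(task, args):
    """Hypothetical stand-in for the traced call (not Celery's API).

    Runs the task and returns a 3-tuple: a failure flag, the repr of the
    return value, and the elapsed runtime in seconds.
    """
    start = time.monotonic()
    retval = task(*args)
    runtime = time.monotonic() - start
    return 0, repr(retval), runtime


# Mirrors the unpacking in the test: only the middle element is checked.
_, R, _ = fake_trace_task_ret(mytask, [4])
assert R == repr(4 ** 4)
```

Comparing against `repr(4 ** 4)` instead of a hard-coded string keeps the expectation tied to the task's arithmetic rather than to a particular formatting of the result.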

Callers

nothing calls this directly

Calls 2

build_tracer · Function · 0.90
trace_task_ret · Function · 0.90

Tested by

no test coverage detected