hub / github.com/celery/celery / assert_generator

Method assert_generator

t/integration/test_canvas.py:2033–2041
(file_name)

Source from the content-addressed store, hash-verified

2031      raise pytest.skip('Requires redis result backend.')
2032
2033  def assert_generator(file_name):
2034      for i in range(size):
2035          sleep(1)
2036          if i == size - 1:
2037              with open(file_name) as file_handle:
2038                  # ensures chord header generators tasks are processed incrementally #3021
2039                  assert file_handle.readline() == '0\n', "Chord header was unrolled too early"
2040
2041          yield write_to_file_and_return_int.s(file_name, i)
2042
2043  with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_file:
2044      file_name = tmp_file.name
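
The check at lines 2036–2039 only passes if the consumer of the generator processes each yielded item before asking for the next one. The sketch below illustrates that idea in plain Python, without Celery: a list stands in for the temporary file, and `make_header`, `consume_incrementally`, and `consume_eagerly` are hypothetical names introduced only for this illustration. It is not how Celery consumes chord headers, just a minimal model of what the assertion distinguishes.

```python
def make_header(size, log):
    """Yield `size` work items, checking laziness just before the last one."""
    for i in range(size):
        if i == size - 1:
            # If the consumer had unrolled the generator up front, nothing
            # would have been processed yet and `log` would still be empty.
            assert log and log[0] == 0, "Header was unrolled too early"
        yield i


def consume_incrementally(items, log):
    """Hypothetical consumer: handles each item as soon as it is yielded."""
    for item in items:
        log.append(item)


def consume_eagerly(items, log):
    """Hypothetical consumer: materializes the whole generator first."""
    for item in list(items):
        log.append(item)


# Incremental consumption: item 0 is logged before the final check runs.
lazy_log = []
consume_incrementally(make_header(3, lazy_log), lazy_log)

# Eager consumption: the final check runs while the log is still empty.
eager_log = []
try:
    consume_eagerly(make_header(3, eager_log), eager_log)
    eager_failed = False
except AssertionError:
    eager_failed = True
```

In this model the incremental consumer completes normally, while the eager one trips the assertion inside the generator, which is the failure mode the message "Chord header was unrolled too early" describes.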

Callers

nothing calls this directly

Calls 3

open · Function · 0.85
readline · Method · 0.80
s · Method · 0.45

Tested by

no test coverage detected