hub / github.com/tornadoweb/tornado / test_task_done

Method test_task_done

tornado/test/queues_test.py:318–337
(self)


    @gen_test
    def test_task_done(self):
        q = self.queue_class()  # type: queues.Queue
        for i in range(100):
            q.put_nowait(i)

        self.accumulator = 0

        @gen.coroutine
        def worker():
            while True:
                item = yield q.get()
                self.accumulator += item
                q.task_done()
                yield gen.sleep(random() * 0.01)

        # Two coroutines share work.
        worker()
        worker()
        yield q.join()
        self.assertEqual(sum(range(100)), self.accumulator)
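
The test depends on the pairing of task_done() and join(): join() resolves only once every item that was put has been matched by a task_done() call. A minimal standalone sketch of the same pattern follows. It is not part of the Tornado test suite, and it swaps in asyncio.Queue from the standard library for tornado.queues.Queue so that it runs without Tornado installed. The name run_workers, its parameters, and the shorter sleep are hypothetical choices made for illustration.

```python
import asyncio
import random


async def run_workers(n_items=100, n_workers=2):
    """Drain a queue with several workers and return the sum of the items.

    Hypothetical helper: mirrors the shape of test_task_done, but uses
    asyncio.Queue rather than tornado.queues.Queue.
    """
    q = asyncio.Queue()
    for i in range(n_items):
        q.put_nowait(i)

    total = 0

    async def worker():
        nonlocal total
        while True:
            item = await q.get()
            total += item
            # Mark the item as processed. join() unblocks only after every
            # item that was put has a matching task_done() call.
            q.task_done()
            # Yield control so the other worker gets a share of the items.
            await asyncio.sleep(random.random() * 0.001)

    tasks = [asyncio.ensure_future(worker()) for _ in range(n_workers)]
    await q.join()

    # The workers loop forever, so cancel them once the queue is drained.
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return total


result = asyncio.run(run_workers())
```

Unlike the test above, this sketch cancels its workers explicitly after join() returns, because nothing else tears down the event loop for them.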

Callers

nothing calls this directly

Calls 3

worker · Function · 0.85
put_nowait · Method · 0.80
join · Method · 0.80

Tested by

no test coverage detected