hub / github.com/tornadoweb/tornado / test_producer_consumer

Method test_producer_consumer

tornado/test/queues_test.py:405–427
(self)

Source from the content-addressed store, hash-verified

class ProducerConsumerTest(AsyncTestCase):
    @gen_test
    def test_producer_consumer(self):
        q = queues.Queue(maxsize=3)  # type: queues.Queue[int]
        history = []

        # We don't yield between get() and task_done(), so get() must wait for
        # the next tick. Otherwise we'd immediately call task_done and unblock
        # join() before q.put() resumes, and we'd only process the first four
        # items.
        @gen.coroutine
        def consumer():
            while True:
                history.append((yield q.get()))
                q.task_done()

        @gen.coroutine
        def producer():
            for item in range(10):
                yield q.put(item)

        consumer()
        yield producer()
        yield q.join()
        self.assertEqual(list(range(10)), history)
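The same producer/consumer pattern can be sketched outside the Tornado test harness. The version below is only an illustration: it swaps in `asyncio.Queue` from the standard library for `tornado.queues.Queue` and uses native `async`/`await` coroutines instead of `@gen.coroutine`. The function name `run_producer_consumer` and its parameters `n_items` and `maxsize` are hypothetical and do not appear in the Tornado source.

```python
import asyncio


async def run_producer_consumer(n_items=10, maxsize=3):
    """Push n_items through a bounded queue and return them in consumption order.

    Hypothetical helper for illustration; not part of Tornado.
    """
    q = asyncio.Queue(maxsize=maxsize)
    history = []

    async def consumer():
        # Loops forever; it is cancelled once the queue has been drained.
        while True:
            item = await q.get()
            history.append(item)
            q.task_done()

    async def producer():
        for item in range(n_items):
            # Suspends whenever the queue already holds `maxsize` items.
            await q.put(item)

    consumer_task = asyncio.create_task(consumer())
    await producer()
    # join() completes only after task_done() has been called for every item.
    await q.join()

    consumer_task.cancel()
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
    return history
```

Calling `asyncio.run(run_producer_consumer())` drives the sketch to completion. As in the test above, the queue capacity is smaller than the number of items, so the producer has to wait on the consumer several times before `join()` can return.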

Callers

nothing calls this directly

Calls 1

join  Method · 0.95

Tested by

no test coverage detected