class ProducerConsumerTest(AsyncTestCase):
    """Exercise a bounded queue with one producer and one consumer coroutine."""

    @gen_test
    def test_producer_consumer(self):
        """Send ten integers through a queue of maxsize 3 and check that the
        consumer records all of them in the order they were produced.
        """
        item_queue = queues.Queue(maxsize=3)  # type: queues.Queue[int]
        received = []

        @gen.coroutine
        def producer():
            # put() is yielded because the queue is bounded (maxsize=3) and
            # ten items are pushed through it.
            for number in range(10):
                yield item_queue.put(number)

        # There is intentionally no yield between get() and task_done(), so
        # get() itself has to wait for the next tick. If it did not,
        # task_done() would run immediately and unblock join() before
        # put() resumes, and only the first four items would be processed.
        @gen.coroutine
        def consumer():
            while True:
                value = yield item_queue.get()
                received.append(value)
                item_queue.task_done()

        # The consumer loops forever, so it is started but never waited on;
        # only the producer and the queue's join() are yielded.
        consumer()
        yield producer()
        yield item_queue.join()
        self.assertEqual(list(range(10)), received)
| 428 | |
| 429 | |
| 430 | if __name__ == "__main__": |