Coordinate producer and consumer coroutines. If maxsize is 0 (the default) the queue size is unbounded. .. testcode:: import asyncio from tornado.ioloop import IOLoop from tornado.queues import Queue q = Queue(maxsize=2) async def consumer():
| 79 | |
| 80 | |
| 81 | class Queue(Generic[_T]): |
| 82 | """Coordinate producer and consumer coroutines. |
| 83 | |
| 84 | If ``maxsize`` is 0 (the default), the queue size is unbounded. |
| 85 | |
| 86 | .. testcode:: |
| 87 | |
| 88 | import asyncio |
| 89 | from tornado.ioloop import IOLoop |
| 90 | from tornado.queues import Queue |
| 91 | |
| 92 | q = Queue(maxsize=2) |
| 93 | |
| 94 | async def consumer(): |
| 95 | async for item in q: |
| 96 | try: |
| 97 | print('Doing work on %s' % item) |
| 98 | await asyncio.sleep(0.01) |
| 99 | finally: |
| 100 | q.task_done() |
| 101 | |
| 102 | async def producer(): |
| 103 | for item in range(5): |
| 104 | await q.put(item) |
| 105 | print('Put %s' % item) |
| 106 | |
| 107 | async def main(): |
| 108 | # Start consumer without waiting (since it never finishes). |
| 109 | IOLoop.current().spawn_callback(consumer) |
| 110 | await producer() # Wait for producer to put all tasks. |
| 111 | await q.join() # Wait for consumer to finish all tasks. |
| 112 | print('Done') |
| 113 | |
| 114 | asyncio.run(main()) |
| 115 | |
| 116 | .. testoutput:: |
| 117 | |
| 118 | Put 0 |
| 119 | Put 1 |
| 120 | Doing work on 0 |
| 121 | Put 2 |
| 122 | Doing work on 1 |
| 123 | Put 3 |
| 124 | Doing work on 2 |
| 125 | Put 4 |
| 126 | Doing work on 3 |
| 127 | Doing work on 4 |
| 128 | Done |
| 129 | |
| 130 | |
| 131 | In versions of Python without native coroutines (before 3.5), |
| 132 | ``consumer()`` could be written as:: |
| 133 | |
| 134 | @gen.coroutine |
| 135 | def consumer(): |
| 136 | while True: |
| 137 | item = yield q.get() |
| 138 | try: |