MCPcopy
hub / github.com/tornadoweb/tornado / test_put_timeout

Method test_put_timeout

tornado/test/queues_test.py:227–240  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

225
226 @gen_test
227 def test_put_timeout(self):
228 q = queues.Queue(1) # type: queues.Queue[int]
229 q.put_nowait(0) # Now it's full.
230 put_timeout = q.put(1, timeout=timedelta(seconds=0.01))
231 put = q.put(2)
232 with self.assertRaises(TimeoutError):
233 yield put_timeout
234
235 self.assertEqual(0, q.get_nowait())
236 # 1 was never put in the queue.
237 self.assertEqual(2, (yield q.get()))
238
239 # Final get() unblocked this putter.
240 yield put
241
242 @gen_test
243 def test_put_timeout_preempted(self):

Callers

nothing calls this directly

Calls 4

put_nowaitMethod · 0.95
putMethod · 0.95
get_nowaitMethod · 0.95
getMethod · 0.95

Tested by

no test coverage detected