MCPcopy Index your code
hub / github.com/python/cpython / basic_queue_test

Method basic_queue_test

Lib/test/test_queue.py:99–153  ·  view source on GitHub ↗
(self, q)

Source from the content-addressed store, hash-verified

97 self.cumlock = threading.Lock()
98
def basic_queue_test(self, q):
    """Run the basic put/get/full/empty checks against the queue *q*.

    *q* must be empty on entry; RuntimeError is raised otherwise.  The
    checks expect *q* to report itself full once QUEUE_SIZE items have
    been put, and the name of its class must be one of ``Queue``,
    ``LifoQueue`` or ``PriorityQueue`` so that the expected retrieval
    order can be looked up.
    """
    if q.qsize():
        raise RuntimeError("Call this function with an empty queue")
    self.assertTrue(q.empty())
    self.assertFalse(q.full())

    # Check that items come back out in the order this kind of queue
    # is supposed to hand them out.
    for value in (111, 333, 222):
        q.put(value)
    expected_by_class = {
        "Queue": [111, 333, 222],
        "LifoQueue": [222, 333, 111],
        "PriorityQueue": [111, 222, 333],
    }
    retrieved = [q.get() for _ in range(3)]
    self.assertEqual(retrieved, expected_by_class[q.__class__.__name__],
                     "Didn't seem to queue the correct data!")

    # Put QUEUE_SIZE - 1 items; the queue is expected to still have room.
    for item in range(QUEUE_SIZE - 1):
        q.put(item)
        self.assertTrue(q.qsize(), "Queue should not be empty")
    self.assertTrue(not qfull(q), "Queue should not be full")

    # One more item is expected to make the queue report itself full.
    filler = 3 * 2 * QUEUE_SIZE
    q.put(2 * QUEUE_SIZE)
    self.assertTrue(qfull(q), "Queue should be full")
    self.assertFalse(q.empty())
    self.assertTrue(q.full())

    # A non-blocking put on the full queue must raise Full ...
    try:
        q.put(filler, block=0)
    except self.queue.Full:
        pass
    else:
        self.fail("Didn't appear to block with a full queue")

    # ... and so must a put with a short timeout.
    try:
        q.put(filler, timeout=0.01)
    except self.queue.Full:
        pass
    else:
        self.fail("Didn't appear to time-out with a full queue")

    # Blocking put, paired with a get, without and with a timeout.
    self.do_blocking_test(q.put, (filler,), q.get, ())
    self.do_blocking_test(q.put, (filler, True, 10), q.get, ())

    # Drain the queue again.
    for _ in range(QUEUE_SIZE):
        q.get()
    self.assertTrue(not q.qsize(), "Queue should be empty")

    # A non-blocking get on the drained queue must raise Empty ...
    try:
        q.get(block=0)
    except self.queue.Empty:
        pass
    else:
        self.fail("Didn't appear to block with an empty queue")

    # ... and so must a get with a short timeout.
    try:
        q.get(timeout=0.01)
    except self.queue.Empty:
        pass
    else:
        self.fail("Didn't appear to time-out with an empty queue")

    # Blocking get, paired with a put, without and with a timeout.
    self.do_blocking_test(q.get, (), q.put, ('empty',))
    self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
154
155
156 def worker(self, q):

Callers 1

test_basicMethod · 0.95

Calls 11

qfullFunction · 0.85
assertTrueMethod · 0.80
assertFalseMethod · 0.80
do_blocking_testMethod · 0.80
qsizeMethod · 0.45
emptyMethod · 0.45
fullMethod · 0.45
putMethod · 0.45
getMethod · 0.45
assertEqualMethod · 0.45
failMethod · 0.45

Tested by

no test coverage detected