MCPcopy Index your code
hub / github.com/python/cpython / _shutdown_get

Method _shutdown_get

Lib/test/test_queue.py:474–506  ·  view source on GitHub ↗
(self, immediate)

Source from the content-addressed store, hash-verified

472 return self._join(q, results, True)
473
474 def _shutdown_get(self, immediate):
475 q = self.type2test(2)
476 results = []
477 go = threading.Event()
478 q.put("Y")
479 q.put("D")
480 # queue full
481
482 if immediate:
483 thrds = (
484 (self._get_shutdown, (q, go, results)),
485 (self._get_shutdown, (q, go, results)),
486 )
487 else:
488 thrds = (
489 # on shutdown(immediate=False)
490 # one of these threads should raise Shutdown
491 (self._get, (q, go, results)),
492 (self._get, (q, go, results)),
493 (self._get, (q, go, results)),
494 )
495 threads = []
496 for func, params in thrds:
497 threads.append(threading.Thread(target=func, args=params))
498 threads[-1].start()
499 q.shutdown(immediate)
500 go.set()
501 for t in threads:
502 t.join()
503 if immediate:
504 self.assertListEqual(results, [True, True])
505 else:
506 self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))
507
508 def test_shutdown_get(self):
509 return self._shutdown_get(False)

Callers 2

test_shutdown_getMethod · 0.95

Calls 9

setMethod · 0.95
type2testMethod · 0.80
EventMethod · 0.80
assertListEqualMethod · 0.80
putMethod · 0.45
appendMethod · 0.45
startMethod · 0.45
shutdownMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected