hub / github.com/python/cpython / _shutdown_put

Method _shutdown_put

Lib/test/test_queue.py:514–535
(self, immediate)

Source from the content-addressed store, hash-verified

        return self._shutdown_get(True)

    def _shutdown_put(self, immediate):
        q = self.type2test(2)
        results = []
        go = threading.Event()
        q.put("Y")
        q.put("D")
        # queue is full

        thrds = (
            (self._put_shutdown, (q, "E", go, results)),
            (self._put_shutdown, (q, "W", go, results)),
        )
        threads = []
        for func, params in thrds:
            threads.append(threading.Thread(target=func, args=params))
            threads[-1].start()
        q.shutdown()
        go.set()
        for t in threads:
            t.join()

        self.assertEqual(results, [True]*len(thrds))

    def test_shutdown_put(self):
        return self._shutdown_put(False)
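The listing relies on a `_put_shutdown` helper that is not shown here. The sketch below is a standalone approximation of the same scenario, not the test suite's own code: `put_after_shutdown` and `run_demo` are hypothetical names, and `queue.Queue` stands in for `self.type2test`. It assumes Python 3.13 or later, where `Queue.shutdown()` and `queue.ShutDown` are available. Each worker waits on the event, attempts a `put()` on the already shut-down queue, and records `True` if `queue.ShutDown` is raised.

```python
import queue
import threading


def put_after_shutdown(q, item, go, results):
    # Hypothetical stand-in for the _put_shutdown helper (not shown in the listing).
    go.wait()  # block until the main thread has shut the queue down
    try:
        q.put(item)
    except queue.ShutDown:
        results.append(True)   # put() was rejected because the queue is shut down
    else:
        results.append(False)  # put() unexpectedly succeeded


def run_demo():
    q = queue.Queue(2)  # assumption: queue.Queue in place of self.type2test
    q.put("Y")
    q.put("D")          # the queue is now full

    go = threading.Event()
    results = []
    threads = [
        threading.Thread(target=put_after_shutdown, args=(q, item, go, results))
        for item in ("E", "W")
    ]
    for t in threads:
        t.start()

    q.shutdown()  # as in the listing, no `immediate` argument is passed
    go.set()      # release the workers only after the shutdown
    for t in threads:
        t.join()
    return results
```

On Python 3.13+, `run_demo()` should return `[True, True]`: the event is set only after `q.shutdown()`, so both `put()` calls see a queue that is already shut down. As in the listing, the `immediate` flag is not forwarded to `shutdown()`.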

Callers 2

test_shutdown_put · Method · 0.95

Calls 9

set · Method · 0.95
type2test · Method · 0.80
Event · Method · 0.80
put · Method · 0.45
append · Method · 0.45
start · Method · 0.45
shutdown · Method · 0.45
join · Method · 0.45
assertEqual · Method · 0.45

Tested by

no test coverage detected