MCPcopy
hub / github.com/psycopg/psycopg / AEvent

Class AEvent

tests/acompat.py:91–102  ·  view source on GitHub ↗

Subclass of asyncio.Event adding a wait with timeout like threading.Event. wait_timeout() is converted to wait() by async_to_sync.

Source from the content-addressed store, hash-verified

89
90
91class AEvent(asyncio.Event):
92 """
93 Subclass of asyncio.Event adding a wait with timeout like threading.Event.
94
95 wait_timeout() is converted to wait() by async_to_sync.
96 """
97
98 async def wait_timeout(self, timeout):
99 try:
100 return await asyncio.wait_for(self.wait(), timeout)
101 except asyncio.TimeoutError:
102 return False
103
104
105class Queue(queue.Queue): # type: ignore[type-arg]

Callers 13

test_notify_query_notifyFunction · 0.70
test_copy_concurrencyFunction · 0.70
test_wait_rFunction · 0.70
test_wait_r_no_linuxFunction · 0.70
test_wait_r_nowaitFunction · 0.70
test_refill_on_checkFunction · 0.50
test_queue_sizeFunction · 0.50
test_closed_queueFunction · 0.50

Calls

no outgoing calls

Tested by 13

test_notify_query_notifyFunction · 0.56
test_copy_concurrencyFunction · 0.56
test_wait_rFunction · 0.56
test_wait_r_no_linuxFunction · 0.56
test_wait_r_nowaitFunction · 0.56
test_refill_on_checkFunction · 0.40
test_queue_sizeFunction · 0.40
test_closed_queueFunction · 0.40