MCPcopy Index your code
hub / github.com/python/cpython / test_acquire

Method test_acquire

Lib/test/test_asyncio/test_locks.py:86–135  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

84 self.assertFalse(lock.locked())
85
86 async def test_acquire(self):
87 lock = asyncio.Lock()
88 result = []
89
90 self.assertTrue(await lock.acquire())
91
92 async def c1(result):
93 if await lock.acquire():
94 result.append(1)
95 return True
96
97 async def c2(result):
98 if await lock.acquire():
99 result.append(2)
100 return True
101
102 async def c3(result):
103 if await lock.acquire():
104 result.append(3)
105 return True
106
107 t1 = asyncio.create_task(c1(result))
108 t2 = asyncio.create_task(c2(result))
109
110 await asyncio.sleep(0)
111 self.assertEqual([], result)
112
113 lock.release()
114 await asyncio.sleep(0)
115 self.assertEqual([1], result)
116
117 await asyncio.sleep(0)
118 self.assertEqual([1], result)
119
120 t3 = asyncio.create_task(c3(result))
121
122 lock.release()
123 await asyncio.sleep(0)
124 self.assertEqual([1, 2], result)
125
126 lock.release()
127 await asyncio.sleep(0)
128 self.assertEqual([1, 2, 3], result)
129
130 self.assertTrue(t1.done())
131 self.assertTrue(t1.result())
132 self.assertTrue(t2.done())
133 self.assertTrue(t2.result())
134 self.assertTrue(t3.done())
135 self.assertTrue(t3.result())
136
137 async def test_acquire_cancel(self):
138 lock = asyncio.Lock()

Callers

nothing calls this directly

Calls 9

acquire · Method · 0.95
release · Method · 0.95
Lock · Method · 0.80
assertTrue · Method · 0.80
create_task · Method · 0.45
sleep · Method · 0.45
assertEqual · Method · 0.45
done · Method · 0.45
result · Method · 0.45

Tested by

no test coverage detected