(self)
| 2287 | class CoroAsyncIOCompatTest(unittest.TestCase): |
| 2288 | |
| 2289 | def test_asyncio_1(self): |
| 2290 | # asyncio cannot be imported when Python is compiled without thread |
| 2291 | # support |
| 2292 | asyncio = import_helper.import_module('asyncio') |
| 2293 | |
| 2294 | class MyException(Exception): |
| 2295 | pass |
| 2296 | |
| 2297 | buffer = [] |
| 2298 | |
| 2299 | class CM: |
| 2300 | async def __aenter__(self): |
| 2301 | buffer.append(1) |
| 2302 | await asyncio.sleep(0.01) |
| 2303 | buffer.append(2) |
| 2304 | return self |
| 2305 | |
| 2306 | async def __aexit__(self, exc_type, exc_val, exc_tb): |
| 2307 | await asyncio.sleep(0.01) |
| 2308 | buffer.append(exc_type.__name__) |
| 2309 | |
| 2310 | async def f(): |
| 2311 | async with CM(): |
| 2312 | await asyncio.sleep(0.01) |
| 2313 | raise MyException |
| 2314 | buffer.append('unreachable') |
| 2315 | |
| 2316 | loop = asyncio.new_event_loop() |
| 2317 | asyncio.set_event_loop(loop) |
| 2318 | try: |
| 2319 | loop.run_until_complete(f()) |
| 2320 | except MyException: |
| 2321 | pass |
| 2322 | finally: |
| 2323 | loop.close() |
| 2324 | asyncio.events._set_event_loop_policy(None) |
| 2325 | |
| 2326 | self.assertEqual(buffer, [1, 2, 'MyException']) |
| 2327 | |
| 2328 | |
| 2329 | class OriginTrackingTest(unittest.TestCase): |
nothing calls this directly
no test coverage detected