MCPcopy Index your code
hub / github.com/python/cpython / runcode

Method runcode

Lib/asyncio/__main__.py:33–84  ·  view source on GitHub ↗
(self, code)

Source from the content-addressed store, hash-verified

31 self.context = contextvars.copy_context()
32
33 def runcode(self, code):
34 global return_code
35 future = concurrent.futures.Future()
36
37 def callback():
38 global return_code
39 global repl_future
40 global keyboard_interrupted
41
42 repl_future = None
43 keyboard_interrupted = False
44
45 func = types.FunctionType(code, self.locals)
46 try:
47 coro = func()
48 except SystemExit as se:
49 return_code = se.code
50 self.loop.stop()
51 return
52 except KeyboardInterrupt as ex:
53 keyboard_interrupted = True
54 future.set_exception(ex)
55 return
56 except BaseException as ex:
57 future.set_exception(ex)
58 return
59
60 if not inspect.iscoroutine(coro):
61 future.set_result(coro)
62 return
63
64 try:
65 repl_future = self.loop.create_task(coro, context=self.context)
66 futures._chain_future(repl_future, future)
67 except BaseException as exc:
68 future.set_exception(exc)
69
70 self.loop.call_soon_threadsafe(callback, context=self.context)
71
72 try:
73 return future.result()
74 except SystemExit as se:
75 return_code = se.code
76 self.loop.stop()
77 return
78 except BaseException:
79 if keyboard_interrupted:
80 if not CAN_USE_PYREPL:
81 self.write("\nKeyboardInterrupt\n")
82 else:
83 self.showtraceback()
84 return self.STATEMENT_FAILED
85
86class REPLThread(threading.Thread):
87

Callers

nothing calls this directly

Calls 5

resultMethod · 0.95
call_soon_threadsafeMethod · 0.45
stopMethod · 0.45
writeMethod · 0.45
showtracebackMethod · 0.45

Tested by

no test coverage detected