|
import os |
|
import sys |
|
from ..subprocess_code_interpreter import SubprocessCodeInterpreter |
|
import ast |
|
import re |
|
import shlex |
|
|
|
class Python(SubprocessCodeInterpreter):
    """Python flavour of the subprocess-backed code interpreter.

    Configures the command used to launch a Python child process and
    supplies the hooks for preparing code and interpreting output lines.
    How and when the hooks are invoked is decided by
    ``SubprocessCodeInterpreter``, which is defined outside this file.
    """

    file_extension = "py"
    proper_name = "Python"

    def __init__(self):
        """Build the start command from the currently running interpreter."""
        super().__init__()
        # Quote the interpreter path everywhere except Windows
        # (os.name == 'nt'), where it is used verbatim.
        executable = sys.executable if os.name == 'nt' else shlex.quote(sys.executable)
        # -i: interactive mode, -q: no startup banner, -u: unbuffered streams.
        self.start_cmd = f"{executable} -i -q -u"

    def preprocess_code(self, code):
        """Return ``code`` instrumented by ``preprocess_python``."""
        return preprocess_python(code)

    def line_postprocessor(self, line):
        """Return ``line`` unchanged, or ``None`` if it starts with a prompt.

        A line counts as a prompt line when, after optional leading
        whitespace, it begins with ``>>>`` or ``...``.
        """
        starts_with_prompt = re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line)
        return None if starts_with_prompt else line

    def detect_active_line(self, line):
        """Extract the line number from an active-line marker.

        Returns the integer found between ``## active_line `` and `` ##``,
        or ``None`` when the marker is not present in ``line``.
        """
        marker = "## active_line "
        if marker not in line:
            return None
        after_marker = line.split(marker)[1]
        number_text = after_marker.split(" ##")[0]
        return int(number_text)

    def detect_end_of_execution(self, line):
        """Return True when ``line`` contains the end-of-execution marker."""
        return "## end_of_execution ##" in line
|
|
|
|
|
def preprocess_python(code):
    """Instrument ``code`` before it is handed to the interpreter.

    Steps, in order:
      1. insert active-line marker prints (``add_active_line_prints``),
      2. wrap everything in a try/except (``wrap_in_try_except``),
      3. drop every blank or whitespace-only line,
      4. append a print of the end-of-execution marker.

    Returns the resulting source string.
    """
    instrumented = add_active_line_prints(code)
    guarded = wrap_in_try_except(instrumented)

    # NOTE(review): blank lines are presumably stripped because the
    # interactive interpreter treats an empty line as the end of a
    # compound statement -- confirm against how start_cmd is used.
    non_blank_lines = filter(lambda text: text.strip() != "", guarded.split("\n"))

    return "\n".join(non_blank_lines) + '\n\nprint("## end_of_execution ##")'
|
|
|
|
|
def add_active_line_prints(code):
    """Return ``code`` with an active-line marker print before each statement.

    The source is parsed to an AST, rewritten by ``AddLinePrints`` and
    turned back into source text with ``ast.unparse``. A ``SyntaxError``
    from ``ast.parse`` propagates to the caller.
    """
    instrumented_tree = AddLinePrints().visit(ast.parse(code))
    return ast.unparse(instrumented_tree)
|
|
|
|
|
class AddLinePrints(ast.NodeTransformer):
    """Insert a ``print("## active_line N ##")`` before every statement.

    Each statement block found in the tree (``body``, ``orelse`` and
    ``finalbody`` lists) is rewritten so that every statement is preceded
    by a print call reporting that statement's original line number.
    """

    # Names of the fields that can hold a list of statements.
    _BLOCK_FIELDS = ("body", "orelse", "finalbody")

    def insert_print_statement(self, line_number):
        """Build the statement node ``print("## active_line <line_number> ##")``."""
        return ast.Expr(
            value=ast.Call(
                func=ast.Name(id='print', ctx=ast.Load()),
                args=[ast.Constant(value=f"## active_line {line_number} ##")],
                keywords=[],
            )
        )

    def process_body(self, body):
        """Return a new statement list with marker prints interleaved.

        Args:
            body: A list of AST nodes; a single node is treated as a
                one-element list.

        Returns:
            A new list in which every node that carries a ``lineno`` is
            preceded by the matching marker print. Nodes without a
            ``lineno`` are copied through unchanged.
        """
        new_body = []

        if not isinstance(body, list):
            body = [body]

        for sub_node in body:
            if hasattr(sub_node, 'lineno'):
                new_body.append(self.insert_print_statement(sub_node.lineno))
            new_body.append(sub_node)

        return new_body

    def visit(self, node):
        """Visit ``node`` and instrument each statement block it owns.

        Children are transformed first (via ``NodeTransformer.visit``), then
        the node's own blocks are rewritten. Only list-valued fields are
        touched: ``ast.Lambda`` and ``ast.IfExp`` also expose ``body`` /
        ``orelse``, but those hold a single expression, and replacing them
        with a list breaks ``ast.unparse``.

        Except-handler bodies are covered when the ``ExceptHandler`` node
        itself is visited, so they are not processed again from the
        enclosing try statement (which would duplicate every marker).
        """
        new_node = super().visit(node)

        for field in self._BLOCK_FIELDS:
            block = getattr(new_node, field, None)
            if isinstance(block, list):
                setattr(new_node, field, self.process_body(block))

        return new_node
|
|
|
|
|
def wrap_in_try_except(code):
    """Wrap ``code`` in ``try: ... except Exception: traceback.print_exc()``.

    ``import traceback`` is prepended to the source so the handler can
    call it; the import therefore ends up as the first statement inside
    the ``try`` block. Returns the rewritten source produced by
    ``ast.unparse``. A ``SyntaxError`` from ``ast.parse`` propagates.
    """
    module = ast.parse("import traceback\n" + code)

    # traceback.print_exc()
    print_exc_call = ast.Call(
        func=ast.Attribute(
            value=ast.Name(id="traceback", ctx=ast.Load()),
            attr="print_exc",
            ctx=ast.Load(),
        ),
        args=[],
        keywords=[],
    )

    # except Exception: <print_exc_call>
    handler = ast.ExceptHandler(
        type=ast.Name(id="Exception", ctx=ast.Load()),
        name=None,
        body=[ast.Expr(value=print_exc_call)],
    )

    # The whole original module body becomes the body of a single try.
    module.body = [
        ast.Try(body=module.body, handlers=[handler], orelse=[], finalbody=[])
    ]

    return ast.unparse(module)
|
|