import ast
import os
import re
import shlex
import sys

from ..subprocess_code_interpreter import SubprocessCodeInterpreter


class Python(SubprocessCodeInterpreter):
    """Interpreter front-end for an interactive Python subprocess.

    The subprocess is launched with the same executable that runs this
    module. Code sent to it is first rewritten by ``preprocess_python`` so
    that it prints ``##active_line<N>##`` markers and a final
    ``##end_of_execution##`` marker, which the ``detect_*`` methods below
    recognise in the output stream.
    """

    file_extension = 'py'
    proper_name = 'Python'

    def __init__(self):
        super().__init__()
        executable = sys.executable
        # The command is built as a single string, so the path must be
        # shell-quoted on POSIX; shlex.quote's quoting rules do not apply to
        # the Windows shell, hence the os.name check.
        if os.name != 'nt':  # not Windows
            executable = shlex.quote(executable)
        # -i: interactive mode, -q: no startup banner, -u: unbuffered I/O.
        self.start_cmd = executable + ' -i -q -u'

    def preprocess_code(self, code):
        """Return ``code`` instrumented by ``preprocess_python``."""
        return preprocess_python(code)

    def line_postprocessor(self, line):
        """Filter one line of subprocess output.

        Returns ``None`` for lines that start (after optional whitespace)
        with the interactive prompts ``>>>`` or ``...``; every other line is
        returned unchanged.
        NOTE(review): presumably the base class drops ``None`` lines --
        confirm against SubprocessCodeInterpreter.
        """
        if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line):
            return None
        return line

    def detect_active_line(self, line):
        """Return the line number from a ``##active_line<N>##`` marker.

        Returns ``None`` when ``line`` carries no such marker.
        """
        if '##active_line' in line:
            return int(line.split('##active_line')[1].split('##')[0])
        return None

    def detect_end_of_execution(self, line):
        """Return True when ``line`` contains the end-of-execution marker."""
        return '##end_of_execution##' in line


def preprocess_python(code):
    """
    Add active line markers
    Wrap in a try except
    Add end of execution marker
    """

    # Add print commands that tell us what the active line is
    code = add_active_line_prints(code)

    # Wrap in a try except
    code = wrap_in_try_except(code)

    # Remove any whitespace lines, as this will break indented blocks
    # (are we sure about this? test this)
    code_lines = code.split('\n')
    code_lines = [c for c in code_lines if c.strip() != '']
    code = '\n'.join(code_lines)

    # Add end command (we'll be listening for this so we know when it ends)
    code += '\n\nprint("##end_of_execution##")'

    return code


def add_active_line_prints(code):
    """
    Add print statements indicating line numbers to a python string.

    Raises SyntaxError (from ``ast.parse``) when ``code`` is not valid
    Python.
    """
    tree = ast.parse(code)
    transformer = AddLinePrints()
    new_tree = transformer.visit(tree)
    return ast.unparse(new_tree)


class AddLinePrints(ast.NodeTransformer):
    """
    Transformer to insert print statements indicating the line number
    before every executable line in the AST.
    """

    def insert_print_statement(self, line_number):
        """Builds a ``print('##active_line<N>##')`` statement node."""
        return ast.Expr(
            value=ast.Call(
                func=ast.Name(id='print', ctx=ast.Load()),
                args=[ast.Constant(value=f'##active_line{line_number}##')],
                keywords=[],
            ))

    def process_body(self, body):
        """Processes a block of statements, adding print calls.

        Returns a new list in which every node that has a ``lineno`` is
        preceded by a marker print for that line. Nodes without a
        ``lineno`` (such as the marker prints built here) are copied over
        without a marker of their own.
        """
        new_body = []

        # In case it's not iterable:
        if not isinstance(body, list):
            body = [body]

        for sub_node in body:
            if hasattr(sub_node, 'lineno'):
                new_body.append(self.insert_print_statement(sub_node.lineno))
            new_body.append(sub_node)

        return new_body

    def visit(self, node):
        """Overridden visit to transform nodes.

        Children are transformed first (via the base-class visit); then
        every statement list owned by the node is instrumented.
        """
        new_node = super().visit(node)

        # Only statement lists can take extra statements. Lambda and IfExp
        # also expose `body` (and IfExp `orelse`), but there the field is a
        # single expression; replacing it with a list would leave a tree
        # that ast.unparse cannot render.
        body = getattr(new_node, 'body', None)
        if isinstance(body, list):
            new_node.body = self.process_body(body)

        # orelse block (for, while, if, try); skipped when empty
        or_else = getattr(new_node, 'orelse', None)
        if isinstance(or_else, list) and or_else:
            new_node.orelse = self.process_body(or_else)

        # `finally` blocks of try statements. Checking the attribute rather
        # than the node class also covers `try ... except* ...` nodes.
        # Handler bodies need no special case: each ExceptHandler is visited
        # as a node in its own right and is covered by the `body` rule
        # above, so processing it again here would duplicate the markers.
        final_body = getattr(new_node, 'finalbody', None)
        if final_body:
            new_node.finalbody = self.process_body(final_body)

        return new_node


def wrap_in_try_except(code):
    """Wrap ``code`` in one ``try``/``except Exception`` block.

    The handler calls ``traceback.print_exc()``, so an exception is printed
    and execution continues past the wrapped code. ``import traceback`` is
    prepended to the code and therefore ends up inside the ``try`` body.
    Returns the rewritten source produced by ``ast.unparse``.
    """
    # Add import traceback
    code = 'import traceback\n' + code

    # Parse the input code into an AST
    parsed_code = ast.parse(code)

    # Wrap the entire code's AST in a single try-except block
    try_except = ast.Try(
        body=parsed_code.body,
        handlers=[
            ast.ExceptHandler(
                type=ast.Name(id='Exception', ctx=ast.Load()),
                name=None,
                body=[
                    ast.Expr(
                        value=ast.Call(
                            func=ast.Attribute(
                                value=ast.Name(id='traceback', ctx=ast.Load()),
                                attr='print_exc',
                                ctx=ast.Load(),
                            ),
                            args=[],
                            keywords=[],
                        )),
                ],
            )
        ],
        orelse=[],
        finalbody=[],
    )

    # Assign the try-except block as the new body
    parsed_code.body = [try_except]

    # Convert the modified AST back to source code
    return ast.unparse(parsed_code)