jianuo's picture
first
09321b6
raw
history blame
No virus
4.8 kB
import ast
import os
import re
import shlex
import sys
from ..subprocess_code_interpreter import SubprocessCodeInterpreter
class Python(SubprocessCodeInterpreter):
file_extension = 'py'
proper_name = 'Python'
def __init__(self):
super().__init__()
executable = sys.executable
if os.name != 'nt': # not Windows
executable = shlex.quote(executable)
self.start_cmd = executable + ' -i -q -u'
def preprocess_code(self, code):
return preprocess_python(code)
def line_postprocessor(self, line):
if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line):
return None
return line
def detect_active_line(self, line):
if '##active_line' in line:
return int(line.split('##active_line')[1].split('##')[0])
return None
def detect_end_of_execution(self, line):
return '##end_of_execution##' in line
def preprocess_python(code):
"""
Add active line markers
Wrap in a try except
Add end of execution marker
"""
# Add print commands that tell us what the active line is
code = add_active_line_prints(code)
# Wrap in a try except
code = wrap_in_try_except(code)
# Remove any whitespace lines, as this will break indented blocks
# (are we sure about this? test this)
code_lines = code.split('\n')
code_lines = [c for c in code_lines if c.strip() != '']
code = '\n'.join(code_lines)
# Add end command (we'll be listening for this so we know when it ends)
code += '\n\nprint("##end_of_execution##")'
return code
def add_active_line_prints(code):
"""
Add print statements indicating line numbers to a python string.
"""
tree = ast.parse(code)
transformer = AddLinePrints()
new_tree = transformer.visit(tree)
return ast.unparse(new_tree)
class AddLinePrints(ast.NodeTransformer):
"""
Transformer to insert print statements indicating the line number
before every executable line in the AST.
"""
def insert_print_statement(self, line_number):
"""Inserts a print statement for a given line number."""
return ast.Expr(
value=ast.Call(
func=ast.Name(id='print', ctx=ast.Load()),
args=[ast.Constant(value=f'##active_line{line_number}##')],
keywords=[],
))
def process_body(self, body):
"""Processes a block of statements, adding print calls."""
new_body = []
# In case it's not iterable:
if not isinstance(body, list):
body = [body]
for sub_node in body:
if hasattr(sub_node, 'lineno'):
new_body.append(self.insert_print_statement(sub_node.lineno))
new_body.append(sub_node)
return new_body
def visit(self, node):
"""Overridden visit to transform nodes."""
new_node = super().visit(node)
# If node has a body, process it
if hasattr(new_node, 'body'):
new_node.body = self.process_body(new_node.body)
# If node has an orelse block (like in for, while, if), process it
if hasattr(new_node, 'orelse') and new_node.orelse:
new_node.orelse = self.process_body(new_node.orelse)
# Special case for Try nodes as they have multiple blocks
if isinstance(new_node, ast.Try):
for handler in new_node.handlers:
handler.body = self.process_body(handler.body)
if new_node.finalbody:
new_node.finalbody = self.process_body(new_node.finalbody)
return new_node
def wrap_in_try_except(code):
# Add import traceback
code = 'import traceback\n' + code
# Parse the input code into an AST
parsed_code = ast.parse(code)
# Wrap the entire code's AST in a single try-except block
try_except = ast.Try(
body=parsed_code.body,
handlers=[
ast.ExceptHandler(
type=ast.Name(id='Exception', ctx=ast.Load()),
name=None,
body=[
ast.Expr(
value=ast.Call(
func=ast.Attribute(
value=ast.Name(id='traceback', ctx=ast.Load()),
attr='print_exc',
ctx=ast.Load(),
),
args=[],
keywords=[],
)),
],
)
],
orelse=[],
finalbody=[],
)
# Assign the try-except block as the new body
parsed_code.body = [try_except]
# Convert the modified AST back to source code
return ast.unparse(parsed_code)