File size: 4,726 Bytes
fcdf91d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import os
import sys
from ..subprocess_code_interpreter import SubprocessCodeInterpreter
import ast
import re
import shlex
class Python(SubprocessCodeInterpreter):
file_extension = "py"
proper_name = "Python"
def __init__(self):
super().__init__()
executable = sys.executable
if os.name != 'nt': # not Windows
executable = shlex.quote(executable)
self.start_cmd = executable + " -i -q -u"
def preprocess_code(self, code):
return preprocess_python(code)
def line_postprocessor(self, line):
if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line):
return None
return line
def detect_active_line(self, line):
if "## active_line " in line:
return int(line.split("## active_line ")[1].split(" ##")[0])
return None
def detect_end_of_execution(self, line):
return "## end_of_execution ##" in line
def preprocess_python(code):
"""
Add active line markers
Wrap in a try except
Add end of execution marker
"""
# Add print commands that tell us what the active line is
code = add_active_line_prints(code)
# Wrap in a try except
code = wrap_in_try_except(code)
# Remove any whitespace lines, as this will break indented blocks
# (are we sure about this? test this)
code_lines = code.split("\n")
code_lines = [c for c in code_lines if c.strip() != ""]
code = "\n".join(code_lines)
# Add end command (we'll be listening for this so we know when it ends)
code += '\n\nprint("## end_of_execution ##")'
return code
def add_active_line_prints(code):
"""
Add print statements indicating line numbers to a python string.
"""
tree = ast.parse(code)
transformer = AddLinePrints()
new_tree = transformer.visit(tree)
return ast.unparse(new_tree)
class AddLinePrints(ast.NodeTransformer):
"""
Transformer to insert print statements indicating the line number
before every executable line in the AST.
"""
def insert_print_statement(self, line_number):
"""Inserts a print statement for a given line number."""
return ast.Expr(
value=ast.Call(
func=ast.Name(id='print', ctx=ast.Load()),
args=[ast.Constant(value=f"## active_line {line_number} ##")],
keywords=[]
)
)
def process_body(self, body):
"""Processes a block of statements, adding print calls."""
new_body = []
# In case it's not iterable:
if not isinstance(body, list):
body = [body]
for sub_node in body:
if hasattr(sub_node, 'lineno'):
new_body.append(self.insert_print_statement(sub_node.lineno))
new_body.append(sub_node)
return new_body
def visit(self, node):
"""Overridden visit to transform nodes."""
new_node = super().visit(node)
# If node has a body, process it
if hasattr(new_node, 'body'):
new_node.body = self.process_body(new_node.body)
# If node has an orelse block (like in for, while, if), process it
if hasattr(new_node, 'orelse') and new_node.orelse:
new_node.orelse = self.process_body(new_node.orelse)
# Special case for Try nodes as they have multiple blocks
if isinstance(new_node, ast.Try):
for handler in new_node.handlers:
handler.body = self.process_body(handler.body)
if new_node.finalbody:
new_node.finalbody = self.process_body(new_node.finalbody)
return new_node
def wrap_in_try_except(code):
# Add import traceback
code = "import traceback\n" + code
# Parse the input code into an AST
parsed_code = ast.parse(code)
# Wrap the entire code's AST in a single try-except block
try_except = ast.Try(
body=parsed_code.body,
handlers=[
ast.ExceptHandler(
type=ast.Name(id="Exception", ctx=ast.Load()),
name=None,
body=[
ast.Expr(
value=ast.Call(
func=ast.Attribute(value=ast.Name(id="traceback", ctx=ast.Load()), attr="print_exc", ctx=ast.Load()),
args=[],
keywords=[]
)
),
]
)
],
orelse=[],
finalbody=[]
)
# Assign the try-except block as the new body
parsed_code.body = [try_except]
# Convert the modified AST back to source code
return ast.unparse(parsed_code)
|