Tachi67's picture
add code interpreter from open-interpreter
fcdf91d
raw
history blame
4.73 kB
import os
import sys
from ..subprocess_code_interpreter import SubprocessCodeInterpreter
import ast
import re
import shlex
class Python(SubprocessCodeInterpreter):
    """Python flavour of the subprocess-based code interpreter.

    Configures the command used to launch an interactive Python process and
    supplies the hooks that prepare code before it is sent and interpret the
    lines that come back (prompt filtering, active-line markers and the
    end-of-execution marker).
    """

    file_extension = "py"
    proper_name = "Python"

    def __init__(self):
        """Initialise the base class and build ``self.start_cmd``."""
        super().__init__()
        # The interpreter path is shell-quoted everywhere except on Windows
        # (os.name == 'nt'), where it is used verbatim.
        running_on_windows = os.name == 'nt'
        python_path = (
            sys.executable if running_on_windows else shlex.quote(sys.executable)
        )
        # -i: interactive mode, -q: no startup banner, -u: unbuffered streams.
        self.start_cmd = python_path + " -i -q -u"

    def preprocess_code(self, code):
        """Return ``code`` instrumented by ``preprocess_python``."""
        return preprocess_python(code)

    def line_postprocessor(self, line):
        """Filter out interactive-prompt lines.

        Returns ``None`` when ``line`` starts (after optional whitespace)
        with ``>>>`` or ``...``; otherwise returns ``line`` unchanged.
        """
        starts_with_prompt = re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line) is not None
        return None if starts_with_prompt else line

    def detect_active_line(self, line):
        """Extract the line number from an active-line marker.

        Returns the integer found between ``"## active_line "`` and the
        following ``" ##"``, or ``None`` when the marker is absent.
        """
        marker = "## active_line "
        if marker not in line:
            return None
        after_marker = line.split(marker)[1]
        return int(after_marker.split(" ##")[0])

    def detect_end_of_execution(self, line):
        """Return True when ``line`` contains the end-of-execution marker."""
        end_marker = "## end_of_execution ##"
        return end_marker in line
def preprocess_python(code):
    """
    Prepare a Python snippet for execution in the interactive subprocess.

    Steps, in order:
      1. insert active-line marker prints (``add_active_line_prints``),
      2. wrap everything in a try/except (``wrap_in_try_except``),
      3. drop every line that is empty or whitespace-only,
      4. append a print of the end-of-execution marker.

    Returns the transformed source as a string.
    """
    instrumented = add_active_line_prints(code)
    guarded = wrap_in_try_except(instrumented)

    # Blank lines are stripped because they would break indented blocks.
    # NOTE: the original author flagged this step as not yet verified.
    non_blank_lines = [ln for ln in guarded.split("\n") if ln.strip()]

    # The end marker is what the caller listens for to know the run finished.
    return "\n".join(non_blank_lines) + '\n\nprint("## end_of_execution ##")'
def add_active_line_prints(code):
    """
    Return ``code`` with line-number marker prints inserted.

    The source is parsed to an AST, run through ``AddLinePrints`` and
    unparsed back to a string. Raises ``SyntaxError`` if ``code`` does not
    parse.
    """
    instrumented_tree = AddLinePrints().visit(ast.parse(code))
    return ast.unparse(instrumented_tree)
class AddLinePrints(ast.NodeTransformer):
    """
    Transformer that inserts a ``print("## active_line N ##")`` statement
    before every statement of every statement block in the AST.

    Only list-valued ``body`` / ``orelse`` / ``finalbody`` fields are
    instrumented. ``ast.Lambda`` and ``ast.IfExp`` also expose ``body`` (and
    ``orelse``), but there they hold a single expression; replacing them
    with a statement list corrupts the tree and makes ``ast.unparse`` fail.
    """

    # Names of the fields that hold a list of statements on the nodes that
    # define them (Module, FunctionDef, For, If, Try, ExceptHandler, ...).
    _STATEMENT_BLOCK_FIELDS = ("body", "orelse", "finalbody")

    def insert_print_statement(self, line_number):
        """Build the ``print`` statement node announcing ``line_number``."""
        return ast.Expr(
            value=ast.Call(
                func=ast.Name(id='print', ctx=ast.Load()),
                args=[ast.Constant(value=f"## active_line {line_number} ##")],
                keywords=[],
            )
        )

    def process_body(self, body):
        """
        Return a new list with a marker print before each node of ``body``.

        A single node is accepted and treated as a one-element block. Nodes
        without a ``lineno`` attribute (such as the marker prints built
        here) are copied through without a marker.
        """
        if not isinstance(body, list):
            body = [body]

        new_body = []
        for sub_node in body:
            if hasattr(sub_node, 'lineno'):
                new_body.append(self.insert_print_statement(sub_node.lineno))
            new_body.append(sub_node)
        return new_body

    def visit(self, node):
        """
        Visit ``node`` (children first), then instrument its statement blocks.

        ``except`` handlers are ``ExceptHandler`` nodes with their own
        ``body``, so they are instrumented when they are visited themselves;
        the enclosing ``Try`` must not process them again or every handler
        statement would get two identical markers.
        """
        new_node = super().visit(node)

        for field in self._STATEMENT_BLOCK_FIELDS:
            block = getattr(new_node, field, None)
            # Skip expression-valued fields (Lambda.body, IfExp.body/orelse)
            # and empty blocks.
            if isinstance(block, list) and block:
                setattr(new_node, field, self.process_body(block))

        return new_node
def wrap_in_try_except(code):
    """
    Wrap ``code`` in a single ``try`` / ``except Exception`` block.

    ``import traceback`` is prepended to the code (so it ends up inside the
    ``try`` body) and the handler calls ``traceback.print_exc()``. Returns
    the resulting source; raises ``SyntaxError`` if ``code`` does not parse.
    """
    module = ast.parse("import traceback\n" + code)

    # traceback.print_exc()
    print_exc_call = ast.Call(
        func=ast.Attribute(
            value=ast.Name(id="traceback", ctx=ast.Load()),
            attr="print_exc",
            ctx=ast.Load(),
        ),
        args=[],
        keywords=[],
    )

    # except Exception: traceback.print_exc()
    handler = ast.ExceptHandler(
        type=ast.Name(id="Exception", ctx=ast.Load()),
        name=None,
        body=[ast.Expr(value=print_exc_call)],
    )

    # The whole original module body becomes the body of the one try block.
    module.body = [
        ast.Try(body=module.body, handlers=[handler], orelse=[], finalbody=[])
    ]
    return ast.unparse(module)