samlax12's picture
Upload 23 files
1625bb7 verified
raw
history blame contribute delete
11.1 kB
# modules/code_executor/routes.py
from flask import Blueprint, request, jsonify
import sys
import io
import threading
import queue
import time
import traceback
import os
import re
from openai import OpenAI
from config import API_KEY, BASE_URL, OPENAI_MODEL
code_executor_bp = Blueprint('code_executor', __name__)
# 创建OpenAI客户端
client = OpenAI(
api_key=API_KEY,
base_url=BASE_URL
)
# 执行上下文存储
execution_contexts = {}
class CustomStdin:
def __init__(self, input_queue):
self.input_queue = input_queue
self.buffer = ""
def readline(self):
if not self.buffer:
self.buffer = self.input_queue.get() + "\n"
result = self.buffer
self.buffer = ""
return result
class InteractiveExecution:
"""管理Python代码的交互式执行"""
def __init__(self, code):
self.code = code
self.context_id = str(time.time())
self.is_complete = False
self.is_waiting_for_input = False
self.stdout_buffer = io.StringIO()
self.last_read_position = 0
self.input_queue = queue.Queue()
self.error = None
self.thread = None
self.should_terminate = False
def run(self):
"""在单独的线程中启动执行"""
self.thread = threading.Thread(target=self._execute)
self.thread.daemon = True
self.thread.start()
# 给执行一点时间开始
time.sleep(0.1)
return self.context_id
def _execute(self):
"""执行代码,处理标准输入输出"""
try:
# 保存原始的stdin/stdout
orig_stdin = sys.stdin
orig_stdout = sys.stdout
# 创建自定义stdin
custom_stdin = CustomStdin(self.input_queue)
# 重定向stdin和stdout
sys.stdin = custom_stdin
sys.stdout = self.stdout_buffer
try:
# 检查终止的函数
self._last_check_time = 0
def check_termination():
if self.should_terminate:
raise KeyboardInterrupt("Execution terminated by user")
# 设置一个模拟__main__模块的命名空间
shared_namespace = {
"__builtins__": __builtins__,
"_check_termination": check_termination,
"time": time,
"__name__": "__main__"
}
# 在这个命名空间中执行用户代码
try:
exec(self.code, shared_namespace)
except KeyboardInterrupt:
print("\nExecution terminated by user")
except Exception as e:
self.error = {
"error": str(e),
"traceback": traceback.format_exc()
}
finally:
# 恢复原始stdin/stdout
sys.stdin = orig_stdin
sys.stdout = orig_stdout
# 标记执行完成
self.is_complete = True
except Exception as e:
self.error = {
"error": str(e),
"traceback": traceback.format_exc()
}
self.is_complete = True
def terminate(self):
"""终止执行"""
self.should_terminate = True
# 如果在等待输入,放入一些内容以解除阻塞
if self.is_waiting_for_input:
self.input_queue.put("\n")
# 给执行一点时间终止
time.sleep(0.2)
# 标记为完成
self.is_complete = True
return True
def provide_input(self, user_input):
"""为运行的代码提供输入"""
self.input_queue.put(user_input)
self.is_waiting_for_input = False
return True
def get_output(self):
"""获取stdout缓冲区的当前内容"""
output = self.stdout_buffer.getvalue()
return output
def get_new_output(self):
"""只获取自上次读取以来的新输出"""
current_value = self.stdout_buffer.getvalue()
if self.last_read_position < len(current_value):
new_output = current_value[self.last_read_position:]
self.last_read_position = len(current_value)
return new_output
return ""
@code_executor_bp.route('/generate', methods=['POST'])
def generate_code():
"""使用AI生成Python代码"""
try:
prompt = request.json.get('prompt')
if not prompt:
return jsonify({
"success": False,
"error": "No prompt provided"
})
# 构建带有适当上下文的提示
full_prompt = f"""You are a Python programming assistant. Generate Python code based on this requirement:
{prompt}
Provide only the Python code without any explanation or markdown formatting."""
# 调用OpenAI API
response = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[{"role": "user", "content": full_prompt}]
)
# 提取和清理代码
code = response.choices[0].message.content.strip()
# 删除Markdown代码块标记(如果存在)
code = re.sub(r'```python\n', '', code)
code = re.sub(r'```', '', code)
return jsonify({
"success": True,
"code": code
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
})
@code_executor_bp.route('/execute', methods=['POST'])
def execute_code():
"""执行Python代码"""
try:
code = request.json.get('code')
if not code:
return jsonify({
"success": False,
"error": "No code provided"
})
# 创建并启动执行
execution = InteractiveExecution(code)
context_id = execution.run()
# 存储在全局上下文中
execution_contexts[context_id] = execution
# 检查初始状态
if execution.error:
# 执行立即失败
error_info = execution.error
del execution_contexts[context_id]
return jsonify({
"success": False,
"error": error_info["error"],
"traceback": error_info["traceback"]
})
# 获取初始输出
output = execution.get_output()
# 更新上次读取位置以标记此输出为已读
execution.last_read_position = len(output)
# 检查是否命中input()调用或完成执行
if execution.is_complete:
# 执行完成,不需要输入
del execution_contexts[context_id]
return jsonify({
"success": True,
"output": output,
"needsInput": False
})
else:
# 假设我们正在等待输入
execution.is_waiting_for_input = True
return jsonify({
"success": True,
"output": output,
"needsInput": True,
"context_id": context_id
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
})
@code_executor_bp.route('/input', methods=['POST'])
def provide_input():
"""为正在执行的代码提供输入"""
try:
user_input = request.json.get('input', '')
context_id = request.json.get('context_id')
if not context_id or context_id not in execution_contexts:
return jsonify({
"success": False,
"error": "Invalid or expired execution context"
})
execution = execution_contexts[context_id]
# 提供输入
execution.provide_input(user_input)
# 给一点时间处理
time.sleep(0.1)
# 获取输入后的新输出
new_output = execution.get_new_output()
# 检查执行状态
if execution.is_complete:
# 执行已完成
if execution.error:
# 有错误
error_info = execution.error
del execution_contexts[context_id]
return jsonify({
"success": False,
"error": error_info["error"],
"traceback": error_info["traceback"]
})
else:
# 干净执行
del execution_contexts[context_id]
return jsonify({
"success": True,
"output": new_output,
"needsInput": False
})
else:
# 执行仍在运行,假设我们需要更多输入
execution.is_waiting_for_input = True
return jsonify({
"success": True,
"output": new_output,
"needsInput": True
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
})
@code_executor_bp.route('/stop', methods=['POST'])
def stop_execution():
"""停止执行"""
try:
context_id = request.json.get('context_id')
if not context_id or context_id not in execution_contexts:
return jsonify({
"success": False,
"error": "Invalid or expired execution context"
})
execution = execution_contexts[context_id]
# 终止执行
execution.terminate()
# 获取最终输出
output = execution.get_output()
# 清理
del execution_contexts[context_id]
return jsonify({
"success": True,
"output": output,
"message": "Execution terminated"
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
})