# modules/code_executor/routes.py from flask import Blueprint, request, jsonify import sys import io import threading import queue import time import traceback import os import re from openai import OpenAI from config import API_KEY, BASE_URL, OPENAI_MODEL code_executor_bp = Blueprint('code_executor', __name__) # 创建OpenAI客户端 client = OpenAI( api_key=API_KEY, base_url=BASE_URL ) # 执行上下文存储 execution_contexts = {} class CustomStdin: def __init__(self, input_queue): self.input_queue = input_queue self.buffer = "" def readline(self): if not self.buffer: self.buffer = self.input_queue.get() + "\n" result = self.buffer self.buffer = "" return result class InteractiveExecution: """管理Python代码的交互式执行""" def __init__(self, code): self.code = code self.context_id = str(time.time()) self.is_complete = False self.is_waiting_for_input = False self.stdout_buffer = io.StringIO() self.last_read_position = 0 self.input_queue = queue.Queue() self.error = None self.thread = None self.should_terminate = False def run(self): """在单独的线程中启动执行""" self.thread = threading.Thread(target=self._execute) self.thread.daemon = True self.thread.start() # 给执行一点时间开始 time.sleep(0.1) return self.context_id def _execute(self): """执行代码,处理标准输入输出""" try: # 保存原始的stdin/stdout orig_stdin = sys.stdin orig_stdout = sys.stdout # 创建自定义stdin custom_stdin = CustomStdin(self.input_queue) # 重定向stdin和stdout sys.stdin = custom_stdin sys.stdout = self.stdout_buffer try: # 检查终止的函数 self._last_check_time = 0 def check_termination(): if self.should_terminate: raise KeyboardInterrupt("Execution terminated by user") # 设置一个模拟__main__模块的命名空间 shared_namespace = { "__builtins__": __builtins__, "_check_termination": check_termination, "time": time, "__name__": "__main__" } # 在这个命名空间中执行用户代码 try: exec(self.code, shared_namespace) except KeyboardInterrupt: print("\nExecution terminated by user") except Exception as e: self.error = { "error": str(e), "traceback": traceback.format_exc() } finally: # 恢复原始stdin/stdout sys.stdin = orig_stdin sys.stdout = orig_stdout # 标记执行完成 self.is_complete = True except Exception as e: self.error = { "error": str(e), "traceback": traceback.format_exc() } self.is_complete = True def terminate(self): """终止执行""" self.should_terminate = True # 如果在等待输入,放入一些内容以解除阻塞 if self.is_waiting_for_input: self.input_queue.put("\n") # 给执行一点时间终止 time.sleep(0.2) # 标记为完成 self.is_complete = True return True def provide_input(self, user_input): """为运行的代码提供输入""" self.input_queue.put(user_input) self.is_waiting_for_input = False return True def get_output(self): """获取stdout缓冲区的当前内容""" output = self.stdout_buffer.getvalue() return output def get_new_output(self): """只获取自上次读取以来的新输出""" current_value = self.stdout_buffer.getvalue() if self.last_read_position < len(current_value): new_output = current_value[self.last_read_position:] self.last_read_position = len(current_value) return new_output return "" @code_executor_bp.route('/generate', methods=['POST']) def generate_code(): """使用AI生成Python代码""" try: prompt = request.json.get('prompt') if not prompt: return jsonify({ "success": False, "error": "No prompt provided" }) # 构建带有适当上下文的提示 full_prompt = f"""You are a Python programming assistant. Generate Python code based on this requirement: {prompt} Provide only the Python code without any explanation or markdown formatting.""" # 调用OpenAI API response = client.chat.completions.create( model=OPENAI_MODEL, messages=[{"role": "user", "content": full_prompt}] ) # 提取和清理代码 code = response.choices[0].message.content.strip() # 删除Markdown代码块标记(如果存在) code = re.sub(r'```python\n', '', code) code = re.sub(r'```', '', code) return jsonify({ "success": True, "code": code }) except Exception as e: return jsonify({ "success": False, "error": str(e) }) @code_executor_bp.route('/execute', methods=['POST']) def execute_code(): """执行Python代码""" try: code = request.json.get('code') if not code: return jsonify({ "success": False, "error": "No code provided" }) # 创建并启动执行 execution = InteractiveExecution(code) context_id = execution.run() # 存储在全局上下文中 execution_contexts[context_id] = execution # 检查初始状态 if execution.error: # 执行立即失败 error_info = execution.error del execution_contexts[context_id] return jsonify({ "success": False, "error": error_info["error"], "traceback": error_info["traceback"] }) # 获取初始输出 output = execution.get_output() # 更新上次读取位置以标记此输出为已读 execution.last_read_position = len(output) # 检查是否命中input()调用或完成执行 if execution.is_complete: # 执行完成,不需要输入 del execution_contexts[context_id] return jsonify({ "success": True, "output": output, "needsInput": False }) else: # 假设我们正在等待输入 execution.is_waiting_for_input = True return jsonify({ "success": True, "output": output, "needsInput": True, "context_id": context_id }) except Exception as e: return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }) @code_executor_bp.route('/input', methods=['POST']) def provide_input(): """为正在执行的代码提供输入""" try: user_input = request.json.get('input', '') context_id = request.json.get('context_id') if not context_id or context_id not in execution_contexts: return jsonify({ "success": False, "error": "Invalid or expired execution context" }) execution = execution_contexts[context_id] # 提供输入 execution.provide_input(user_input) # 给一点时间处理 time.sleep(0.1) # 获取输入后的新输出 new_output = execution.get_new_output() # 检查执行状态 if execution.is_complete: # 执行已完成 if execution.error: # 有错误 error_info = execution.error del execution_contexts[context_id] return jsonify({ "success": False, "error": error_info["error"], "traceback": error_info["traceback"] }) else: # 干净执行 del execution_contexts[context_id] return jsonify({ "success": True, "output": new_output, "needsInput": False }) else: # 执行仍在运行,假设我们需要更多输入 execution.is_waiting_for_input = True return jsonify({ "success": True, "output": new_output, "needsInput": True }) except Exception as e: return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }) @code_executor_bp.route('/stop', methods=['POST']) def stop_execution(): """停止执行""" try: context_id = request.json.get('context_id') if not context_id or context_id not in execution_contexts: return jsonify({ "success": False, "error": "Invalid or expired execution context" }) execution = execution_contexts[context_id] # 终止执行 execution.terminate() # 获取最终输出 output = execution.get_output() # 清理 del execution_contexts[context_id] return jsonify({ "success": True, "output": output, "message": "Execution terminated" }) except Exception as e: return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() })