Spaces:
Running
Running
| """ | |
| Code agent backend - handles code execution with E2B | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from typing import List, Dict, Optional | |
| from e2b_code_interpreter import Sandbox | |
| from .tools import execute_code, upload_files, download_files | |
| from .image import resize_image_for_vlm | |
| logger = logging.getLogger(__name__) | |
| TOOLS = [execute_code, upload_files, download_files] | |
| MAX_TURNS = 40 | |
def parse_execution_result(execution, max_output_length=4000):
    """Render an E2B execution object into a text summary for LLM feedback.

    Collects (in order): a plot marker if any result carries image data,
    non-Figure text results, stdout, stderr, and the error traceback.
    Each section longer than max_output_length is truncated in the middle.
    """
    def clip(text):
        # Keep the head and tail; note how much was dropped in between.
        if len(text) <= max_output_length:
            return text
        keep = max_output_length // 2
        notice = f"\n\n[... truncated {len(text) - max_output_length} of {len(text)} chars ...]\n\n"
        return text[:keep] + notice + text[-keep:]

    sections = []
    # Flag that at least one result carries a rendered image.
    if any(r.png or r.jpeg or r.svg for r in execution.results):
        sections.append("[Plot/Image generated]")
    # Text results, minus matplotlib's "<Figure ...>" repr noise.
    texts = [r.text for r in execution.results if r.text and not r.text.startswith('<Figure')]
    if texts:
        sections.append(clip("\n".join(texts)))
    for stream in (execution.logs.stdout, execution.logs.stderr):
        if stream:
            sections.append(clip("\n".join(stream)))
    if execution.error is not None:
        sections.append(clip(execution.error.traceback))
    return "\n".join(part for part in sections if part)
def format_code_cell(code: str, execution_result: str = None, error: bool = False, images: list = None):
    """Build the UI payload dict for a single executed code cell."""
    cell = {"type": "code", "code": code}
    cell["output"] = execution_result
    cell["error"] = error
    cell["images"] = images if images else []
    return cell
def format_thinking_cell(content: str):
    """Wrap assistant reasoning text in a UI 'thinking' payload."""
    return dict(type="thinking", content=content)
def upload_files_to_sandbox(sbx: Sandbox, paths: List[str], files_root: str) -> str:
    """
    Upload multiple files to the sandbox.

    Each file is written to /home/user/<basename> inside the sandbox.

    Args:
        sbx: E2B sandbox instance
        paths: List of relative file paths
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was uploaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for rel_path in paths:
        # Normalize the path (remove ./ prefix if present).
        # NOTE: lstrip('./') would strip ANY leading '.' or '/' characters
        # (mangling '.env' and silently neutralizing '../x'), so remove the
        # exact two-character prefix instead.
        if rel_path.startswith('./'):
            rel_path = rel_path[2:]
        local_path = os.path.join(files_root, rel_path)
        # Security check: ensure path doesn't escape files_root.
        # A bare startswith() would accept sibling dirs like '/root_evil',
        # so require equality or a separator boundary.
        real_local = os.path.realpath(local_path)
        if real_local != real_root and not real_local.startswith(real_root + os.sep):
            results.append(f"Error: {rel_path} - path outside workspace")
            continue
        if not os.path.exists(local_path):
            results.append(f"Error: {rel_path} - file not found")
            continue
        if not os.path.isfile(local_path):
            results.append(f"Error: {rel_path} - not a file")
            continue
        try:
            # Get just the filename for the sandbox path
            filename = os.path.basename(rel_path)
            sandbox_path = f"/home/user/{filename}"
            with open(local_path, "rb") as f:
                sbx.files.write(sandbox_path, f)
            results.append(f"Uploaded: {rel_path} -> {sandbox_path}")
        except Exception as e:
            results.append(f"Error uploading {rel_path}: {str(e)}")
    return "\n".join(results)
def download_files_from_sandbox(sbx: Sandbox, files: List[Dict], files_root: str) -> str:
    """
    Download multiple files from the sandbox to the local workspace.

    Args:
        sbx: E2B sandbox instance
        files: List of dicts with 'sandbox_path' and 'local_path' keys
        files_root: Root directory to resolve relative paths

    Returns:
        String describing what was downloaded or errors encountered
    """
    results = []
    real_root = os.path.realpath(files_root)
    for file_spec in files:
        sandbox_path = file_spec.get('sandbox_path', '')
        local_rel_path = file_spec.get('local_path', '')
        if not sandbox_path or not local_rel_path:
            results.append("Error: Missing sandbox_path or local_path")
            continue
        # Normalize the local path (remove ./ prefix if present).
        # NOTE: lstrip('./') would strip ANY leading '.' or '/' characters,
        # so remove the exact two-character prefix instead.
        if local_rel_path.startswith('./'):
            local_rel_path = local_rel_path[2:]
        local_path = os.path.join(files_root, local_rel_path)
        # Security check: ensure path doesn't escape files_root.
        # The target's parent dirs may not exist yet, so walk up to the
        # nearest existing ancestor and resolve that. The 'test_path' guard
        # also prevents an infinite loop if the dirname chain bottoms out.
        test_path = os.path.dirname(local_path)
        while test_path and not os.path.exists(test_path):
            test_path = os.path.dirname(test_path)
        real_local = os.path.realpath(test_path)
        # A bare startswith() would accept sibling dirs like '/root_evil',
        # so require equality or a separator boundary.
        if real_local != real_root and not real_local.startswith(real_root + os.sep):
            results.append(f"Error: {local_rel_path} - path outside workspace")
            continue
        try:
            # Read file content from sandbox (use bytes for binary files)
            content = sbx.files.read(sandbox_path, format='bytes')
            # Create parent directories if needed
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            # Write to local file
            with open(local_path, 'wb') as f:
                f.write(content)
            results.append(f"Downloaded: {sandbox_path} -> {local_rel_path}")
        except Exception as e:
            results.append(f"Error downloading {sandbox_path}: {str(e)}")
    return "\n".join(results)
def stream_code_execution(client, model: str, messages: List[Dict], sbx: Sandbox, files_root: str = None, extra_params: Optional[Dict] = None, abort_event=None, multimodal: bool = False, tab_id: str = "0", figure_store: Optional[Dict[str, dict]] = None):
    """
    Stream code execution results.

    Drives the agent loop: call the LLM, run any requested tools
    (execute_code / upload_files / download_files) against the E2B sandbox,
    append tool results to the conversation, and repeat until the model
    replies without tool calls or MAX_TURNS is exhausted. If the model never
    emits a <result> tag, it is nudged for one at the end.

    Yields:
        dict: UI updates with type 'thinking', 'code_start', 'code',
              'upload', 'download', 'result_preview', 'result',
              'generating', 'error', 'aborted', and a final 'done'.

    Args:
        client: OpenAI-compatible client
        model: Model name to use
        messages: Conversation messages (mutated in place with tool turns)
        sbx: E2B sandbox instance
        files_root: Root directory for file uploads/downloads (optional)
        extra_params: Extra parameters for API calls (optional)
        abort_event: Event-like flag checked between turns/tool calls (optional)
        multimodal: If True, attach generated figures to tool responses as images
        tab_id: Identifier mixed into generated figure names
        figure_store: Shared dict accumulating figure data across calls (optional)
    """
    from .agents import call_llm
    turns = 0
    done = False
    figure_counter = 0  # Track figure numbers
    figure_prefix = f"figure_T{tab_id}_"
    # Use shared global store if provided, otherwise create local one
    if figure_store is None:
        figure_store = {}
    figure_data = figure_store  # Alias for clarity in this function
    has_result = False
    debug_call_number = 0
    while not done and turns < MAX_TURNS:
        # Check abort before each turn
        if abort_event and abort_event.is_set():
            yield {"type": "aborted"}
            return
        turns += 1
        # LLM call with retries and debug events
        response = None
        for event in call_llm(client, model, messages, tools=TOOLS, extra_params=extra_params, abort_event=abort_event, call_number=debug_call_number):
            if "_response" in event:
                response = event["_response"]
                debug_call_number = event["_call_number"]
            else:
                yield event
                if event.get("type") in ("error", "aborted"):
                    return
        if response is None:
            return
        # Get response
        assistant_message = response.choices[0].message
        content = assistant_message.content or ""
        tool_calls = assistant_message.tool_calls or []
        # Check for result tags
        result_match = re.search(r'<result>(.*?)</result>', content, re.DOTALL | re.IGNORECASE)
        result_content = None
        thinking_content = content
        if result_match:
            logger.debug(f"Result found: {content[:200]}...")
            result_content = result_match.group(1).strip()
            # Remove result tags from thinking display
            thinking_content = re.sub(r'<result>.*?</result>', '', content, flags=re.DOTALL | re.IGNORECASE).strip()
        # Send thinking if there's content (excluding result tags)
        if thinking_content.strip():
            yield format_thinking_cell(thinking_content)
        # Send result as a special highlighted message in the CODE notebook
        if result_content:
            yield {"type": "result_preview", "content": result_content, "figures": figure_data}
        # Handle tool calls
        if tool_calls:
            for tool_call in tool_calls:
                # Check abort between tool calls
                if abort_event and abort_event.is_set():
                    yield {"type": "aborted"}
                    return
                if tool_call.function.name == "execute_code":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        code = args["code"]
                    except json.JSONDecodeError as e:
                        error_msg = f"JSON parse error: {e}. Raw arguments: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        # Treat as tool error so LLM can recover
                        output = f"Error parsing code arguments: {e}"
                        messages.append({
                            "role": "assistant",
                            "content": content,
                            "tool_calls": [{
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.function.name,
                                    "arguments": tool_call.function.arguments,
                                }
                            }]
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": f"Failed to parse code arguments: {e}"}
                        continue
                    except KeyError as e:
                        error_msg = f"Missing required key {e} in arguments: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error: Missing required 'code' parameter"
                        messages.append({
                            "role": "assistant",
                            "content": content,
                            "tool_calls": [{
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.function.name,
                                    "arguments": tool_call.function.arguments,
                                }
                            }]
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Send code cell (before execution)
                    yield {"type": "code_start", "code": code}
                    # Execute code.
                    # BUGFIX: 'images' must be bound before the try — it is read in
                    # the multimodal branch below, which previously raised NameError
                    # when run_code failed with a non-timeout error.
                    images = []
                    try:
                        execution = sbx.run_code(code)
                        output = parse_execution_result(execution)
                        has_error = execution.error is not None
                        # Extract images and assign figure names
                        figure_names = []
                        for result in execution.results:
                            if not (result.png or result.jpeg or result.svg):
                                continue
                            figure_counter += 1
                            figure_name = f"{figure_prefix}{figure_counter}"
                            figure_names.append(figure_name)
                            if result.png:
                                images.append({"type": "png", "data": result.png, "name": figure_name})
                                figure_data[figure_name] = {"type": "png", "data": result.png}
                            elif result.jpeg:
                                images.append({"type": "jpeg", "data": result.jpeg, "name": figure_name})
                                figure_data[figure_name] = {"type": "jpeg", "data": result.jpeg}
                            elif result.svg:
                                images.append({"type": "svg", "data": result.svg, "name": figure_name})
                                figure_data[figure_name] = {"type": "svg", "data": result.svg}
                        # Add figure info to output for LLM
                        if figure_names:
                            figure_info = f"\n[Generated figures: {', '.join(figure_names)}]"
                            output = (output + figure_info) if output else figure_info.strip()
                        # Send execution result
                        yield format_code_cell(code, output, has_error, images)
                    except Exception as e:
                        error_str = str(e)
                        # Check if this is a sandbox timeout error - if so, re-raise to trigger cleanup
                        if "502" in error_str or "sandbox was not found" in error_str.lower() or "timeout" in error_str.lower():
                            raise  # Re-raise to be caught by main.py handler
                        yield format_code_cell(code, f"Execution error: {str(e)}", True)
                        output = f"Execution failed: {str(e)}"
                        has_error = True
                    # Add to message history
                    messages.append({
                        "role": "assistant",
                        "content": content,
                        "tool_calls": [{
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_call.function.name,
                                "arguments": tool_call.function.arguments,
                            }
                        }]
                    })
                    # Build tool response — include figures if multimodal
                    if multimodal and images:
                        tool_content = [{"type": "text", "text": output}]
                        for img in images:
                            if img["type"] in ("png", "jpeg"):
                                vlm_img = resize_image_for_vlm(img["data"])
                                tool_content.append({
                                    "type": "image_url",
                                    "image_url": {"url": f"data:image/jpeg;base64,{vlm_img}"}
                                })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": tool_content
                        })
                    else:
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                elif tool_call.function.name == "upload_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        paths = args["paths"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = f"Failed to parse upload_files arguments: {e}. Raw: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error parsing upload_files arguments: {e}"
                        messages.append({
                            "role": "assistant",
                            "content": content,
                            "tool_calls": [{
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.function.name,
                                    "arguments": tool_call.function.arguments,
                                }
                            }]
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File upload not available - no workspace configured"
                    else:
                        # Upload files
                        output = upload_files_to_sandbox(sbx, paths, files_root)
                    # Send upload notification to UI
                    yield {"type": "upload", "paths": paths, "output": output}
                    # Add to message history
                    messages.append({
                        "role": "assistant",
                        "content": content,
                        "tool_calls": [{
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_call.function.name,
                                "arguments": tool_call.function.arguments,
                            }
                        }]
                    })
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": output
                    })
                elif tool_call.function.name == "download_files":
                    # Parse arguments
                    try:
                        args = json.loads(tool_call.function.arguments)
                        files = args["files"]
                    except (json.JSONDecodeError, KeyError) as e:
                        error_msg = f"Failed to parse download_files arguments: {e}. Raw: {tool_call.function.arguments[:500]}"
                        logger.error(error_msg)
                        output = f"Error parsing download_files arguments: {e}"
                        messages.append({
                            "role": "assistant",
                            "content": content,
                            "tool_calls": [{
                                "id": tool_call.id,
                                "type": "function",
                                "function": {
                                    "name": tool_call.function.name,
                                    "arguments": tool_call.function.arguments,
                                }
                            }]
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": output
                        })
                        yield {"type": "error", "content": output}
                        continue
                    # Check if files_root is available
                    if not files_root:
                        output = "Error: File download not available - no workspace configured"
                    else:
                        # Download files
                        output = download_files_from_sandbox(sbx, files, files_root)
                    # Extract paths for UI display
                    paths = [f"{f.get('sandbox_path', '')} -> {f.get('local_path', '')}" for f in files]
                    # Send download notification to UI
                    yield {"type": "download", "paths": paths, "output": output}
                    # Add to message history
                    messages.append({
                        "role": "assistant",
                        "content": content,
                        "tool_calls": [{
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_call.function.name,
                                "arguments": tool_call.function.arguments,
                            }
                        }]
                    })
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": output
                    })
        else:
            # No tool calls - we're done
            messages.append({"role": "assistant", "content": content})
            done = True
            # If we found a result tag, send it with figure data
            if result_content:
                has_result = True
                yield {"type": "result", "content": result_content, "figures": figure_data}
        # Yield generating state between turns
        if not done:
            yield {"type": "generating"}
    # If agent finished without a <result>, nudge it for one
    if not has_result:
        from .agents import nudge_for_result
        yield from nudge_for_result(client, model, messages, extra_params=extra_params, extra_result_data={"figures": figure_data}, call_number=debug_call_number)
    # Send done signal
    yield {"type": "done"}