Yago Bolivar
Refactor speech_to_text.py to implement a singleton ASR pipeline, enhance error handling, and introduce SpeechToTextTool for better integration. Update spreadsheet_tool.py to support querying and improve parsing functionality, including CSV support. Enhance video_processing_tool.py with new tasks for metadata extraction and frame extraction, while improving object detection capabilities and initialization checks.
87aa741
import ast | |
import contextlib | |
import io | |
import signal | |
import re | |
import traceback | |
from typing import Dict, Any, Optional, Union, List | |
from smolagents.tools import Tool | |
import os | |
class CodeExecutionTool(Tool):
    """
    Executes Python code in a controlled environment for safe code interpretation.
    Useful for evaluating code snippets and returning their output or errors.

    The code is statically screened (banned imports, direct ``exec``/``eval``
    calls), then run in-process with ``exec`` under a ``SIGALRM``-based
    timeout while stdout and stderr are captured. The static screen is a
    best-effort filter, not a sandbox: it does not detect dynamic imports or
    other indirect access to banned functionality.
    """
    name = "python_code_executor"
    description = "Executes a given Python code string or Python code from a file. Returns the output or error."
    inputs = {
        'code_string': {'type': 'string', 'description': 'The Python code to execute directly.', 'nullable': True},
        'filepath': {'type': 'string', 'description': 'The path to a Python file to execute.', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'A dictionary containing \'success\', \'output\', and/or \'error\'.'}}
    output_type = "object"

    def __init__(self, timeout: int = 10, max_output_size: int = 20000, *args, **kwargs):
        """Configure the executor.

        Args:
            timeout: Seconds the executed code may run before it is interrupted.
            max_output_size: Maximum number of characters kept from the captured
                output (or from an error traceback) before truncation.
            *args, **kwargs: Forwarded unchanged to the ``Tool`` base class.
        """
        super().__init__(*args, **kwargs)
        self.timeout = timeout
        self.max_output_size = max_output_size
        # Module names are matched against each dotted component of an import
        # (see _analyze_code_safety). 'posix' and 'nt' are listed explicitly
        # because they expose the same functionality as 'os'.
        self.banned_modules = [
            'os', 'subprocess', 'sys', 'builtins', 'importlib',
            'pickle', 'requests', 'socket', 'shutil', 'ctypes', 'multiprocessing',
            'posix', 'nt',
        ]
        self.is_initialized = True

    def _analyze_code_safety(self, code: str) -> Dict[str, Any]:
        """Perform static analysis to check for potentially harmful code.

        Args:
            code: Python source to inspect.

        Returns:
            ``{"safe": True}`` when nothing suspicious is found, otherwise
            ``{"safe": False, "reason": <message>}``. Banned imports are
            reported in preference to ``exec``/``eval`` usage.
        """
        try:
            parsed = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError covers sources ast.parse refuses outright
            # (e.g. embedded null bytes on older interpreters).
            return {"safe": False, "reason": "Invalid Python syntax"}

        banned = set(self.banned_modules)

        def is_banned(module_name: str) -> bool:
            # Compare whole dotted components rather than substrings, so that
            # 'os.path' is rejected while 'sklearn.decomposition' (which merely
            # contains the letters "os") is not.
            return not banned.isdisjoint(module_name.split("."))

        dangerous_imports: List[str] = []
        calls_exec_or_eval = False
        for node in ast.walk(parsed):
            if isinstance(node, ast.Import):
                dangerous_imports.extend(
                    alias.name for alias in node.names if is_banned(alias.name)
                )
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for purely relative imports ("from . import x").
                if node.module and is_banned(node.module):
                    dangerous_imports.append(node.module)
            elif (
                isinstance(node, ast.Call)
                and isinstance(node.func, ast.Name)
                and node.func.id in ('exec', 'eval')
            ):
                calls_exec_or_eval = True

        if dangerous_imports:
            return {
                "safe": False,
                "reason": f"Potentially harmful imports detected: {dangerous_imports}"
            }
        if calls_exec_or_eval:
            return {
                "safe": False,
                "reason": "Contains exec() or eval() calls"
            }
        return {"safe": True}

    def _timeout_handler(self, signum, frame):
        """Handler for timeout signal."""
        raise TimeoutError("Code execution timed out")

    def _extract_numeric_value(self, output: str) -> Optional[Union[int, float]]:
        """Extract a numeric value from the last output line that contains one.

        Non-empty lines are scanned from last to first. A line that parses as
        a float in its entirety is returned as such; otherwise the first
        number-like token found in the line is used.

        Returns:
            The value as a ``float``, or ``None`` when no line contains a number.
        """
        lines = [line.strip() for line in output.strip().split('\n') if line.strip()]
        for line in reversed(lines):
            # Try direct conversion first
            try:
                return float(line)
            except ValueError:
                pass
            # Try to extract numeric portion if embedded in text
            numeric_match = re.search(r'[-+]?\d*\.?\d+', line)
            if numeric_match:
                try:
                    return float(numeric_match.group())
                except ValueError:
                    pass
        return None

    def _truncate(self, text: str, marker: str) -> str:
        """Cut ``text`` to ``max_output_size`` characters, appending ``marker`` if cut."""
        if len(text) > self.max_output_size:
            return text[:self.max_output_size] + marker
        return text

    # Main entry point for the agent
    def forward(self, code_string: Optional[str] = None, filepath: Optional[str] = None) -> Dict[str, Any]:
        """Execute either ``code_string`` or the Python file at ``filepath``.

        Exactly one of the two arguments must be provided.

        Returns:
            A dictionary with ``success`` plus ``output`` on success or
            ``error`` on failure.
        """
        if not code_string and not filepath:
            return {"success": False, "error": "No code string or filepath provided."}
        if code_string and filepath:
            return {"success": False, "error": "Provide either a code string or a filepath, not both."}
        if filepath:
            return self.execute_file(filepath)
        return self._execute_actual_code(code_string)

    # Renamed from execute_code to _execute_actual_code to be internal
    def _execute_actual_code(self, code: str) -> Dict[str, Any]:
        """Execute Python code and capture the output or error.

        Args:
            code: Python source to run.

        Returns:
            ``{"success": True, "output": ...}`` when the code runs to
            completion (or exits with status 0/None), otherwise
            ``{"success": False, "error": ...}``.
        """
        safety_check = self._analyze_code_safety(code)
        if not safety_check["safe"]:
            return {"success": False, "error": f"Safety check failed: {safety_check['reason']}"}

        # Setup timeout. SIGALRM does not exist on every platform
        # (AttributeError) and handlers can only be installed from the main
        # thread (ValueError); report either as a normal failure result rather
        # than letting the exception escape the tool.
        try:
            previous_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
        except (AttributeError, ValueError) as e:
            return {"success": False, "error": f"Cannot enforce execution timeout: {str(e)}"}

        captured_output = io.StringIO()
        # Fresh globals per run so executions do not leak state into each
        # other. "__name__" is set so that scripts guarded by
        # `if __name__ == "__main__":` actually run their main section.
        exec_globals = {"__name__": "__main__"}
        try:
            signal.alarm(self.timeout)
            with contextlib.redirect_stdout(captured_output), \
                    contextlib.redirect_stderr(captured_output):  # Capture stderr as well
                exec(code, exec_globals)  # Execute in a controlled global scope
            # _extract_numeric_value is deliberately not applied here; numeric
            # extraction may be task-specific.
            return {
                "success": True,
                "output": self._truncate(captured_output.getvalue(), "... [output truncated]"),
            }
        except TimeoutError:
            return {"success": False, "error": "Code execution timed out"}
        except SystemExit as e:
            # exit()/quit()/raise SystemExit in the executed code must not
            # terminate the host process; translate it into a result instead.
            output = self._truncate(captured_output.getvalue(), "... [output truncated]")
            if e.code in (None, 0):
                return {"success": True, "output": output}
            return {
                "success": False,
                "error": f"Code exited with status {e.code!r}",
                "output": output,
            }
        except Exception as e:
            # Get detailed traceback
            tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
            error_details = self._truncate("".join(tb_lines), "... [error truncated]")
            return {"success": False, "error": f"Execution failed: {str(e)}\nTraceback:\n{error_details}"}
        finally:
            signal.alarm(0)  # Disable the alarm
            # Put back whatever handler was installed before this call.
            # signal.signal() returns None when the previous handler was not
            # installed from Python; that value cannot be re-installed.
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)
            captured_output.close()

    # Kept execute_file and execute_code as helper methods if direct access is ever needed,
    # but they now call the main _execute_actual_code method.
    def execute_file(self, filepath: str) -> Dict[str, Any]:
        """Helper to execute Python code from file.

        Returns a failure dictionary when the file does not exist, does not
        end in ``.py``, or cannot be read; otherwise the execution result.
        """
        if not os.path.exists(filepath):
            return {"success": False, "error": f"File not found: {filepath}"}
        if not filepath.endswith(".py"):
            return {"success": False, "error": f"File is not a Python file: {filepath}"}
        try:
            # Python source defaults to UTF-8; "utf-8-sig" also drops a
            # leading byte-order mark, which would otherwise fail to parse.
            with open(filepath, 'r', encoding='utf-8-sig') as file:
                code = file.read()
        except (OSError, ValueError) as e:
            return {"success": False, "error": f"Error reading file {filepath}: {str(e)}"}
        # Run outside the try block so execution problems are never
        # mislabelled as file-reading errors.
        return self._execute_actual_code(code)

    def execute_code(self, code: str) -> Dict[str, Any]:
        """Helper to execute Python code from a string."""
        return self._execute_actual_code(code)


# Manual smoke tests for CodeExecutionTool; run this module directly to execute them.
if __name__ == '__main__':
    tool = CodeExecutionTool(timeout=5)

    # Test 1: Safe code string
    safe_code = "print('Hello from safe code!'); result = 10 * 2; print(result)"
    print("\n--- Test 1: Safe Code String ---")
    result1 = tool.forward(code_string=safe_code)
    print(result1)
    assert result1['success']
    assert "Hello from safe code!" in result1['output']
    assert "20" in result1['output']

    # Test 2: Code with an error
    error_code = "print(1/0)"
    print("\n--- Test 2: Code with Error ---")
    result2 = tool.forward(code_string=error_code)
    print(result2)
    assert not result2['success']
    assert "ZeroDivisionError" in result2['error']

    # Test 3: Code with a banned import
    unsafe_import_code = "import os; print(os.getcwd())"
    print("\n--- Test 3: Unsafe Import ---")
    result3 = tool.forward(code_string=unsafe_import_code)
    print(result3)
    assert not result3['success']
    assert "Safety check failed" in result3['error']
    assert "os" in result3['error']

    # Test 4: Timeout
    timeout_code = "import time; time.sleep(10); print('Done sleeping')"
    print("\n--- Test 4: Timeout ---")
    # tool_timeout_short = CodeExecutionTool(timeout=2) # For testing timeout specifically
    # result4 = tool_timeout_short.forward(code_string=timeout_code)
    result4 = tool.forward(code_string=timeout_code)  # Using the main tool instance with its timeout
    print(result4)
    assert not result4['success']
    assert "timed out" in result4['error']

    # Test 5: Execute from file
    test_file_content = "print('Hello from file!'); x = 5; y = 7; print(f'Sum: {x+y}')"
    test_filename = "temp_test_script.py"
    with open(test_filename, "w") as f:
        f.write(test_file_content)
    try:
        print("\n--- Test 5: Execute from File ---")
        result5 = tool.forward(filepath=test_filename)
        print(result5)
        assert result5['success']
        assert "Hello from file!" in result5['output']
        assert "Sum: 12" in result5['output']
    finally:
        # Remove the temporary script even when an assertion above fails.
        os.remove(test_filename)

    # Test 6: File not found
    print("\n--- Test 6: File Not Found ---")
    result6 = tool.forward(filepath="non_existent_script.py")
    print(result6)
    assert not result6['success']
    assert "File not found" in result6['error']

    # Test 7: Provide both code_string and filepath
    # (the file no longer exists; the "both provided" check runs before any file access)
    print("\n--- Test 7: Both code_string and filepath ---")
    result7 = tool.forward(code_string="print('hello')", filepath=test_filename)
    print(result7)
    assert not result7['success']
    assert "Provide either a code string or a filepath, not both" in result7['error']

    # Test 8: Provide neither
    print("\n--- Test 8: Neither code_string nor filepath ---")
    result8 = tool.forward()
    print(result8)
    assert not result8['success']
    assert "No code string or filepath provided" in result8['error']

    # Test 9: Code that defines a function and calls it
    # The call must sit on its own line: in a one-line "def f(): return x; print(...)"
    # the print belongs to the function body (after the return) and never runs.
    func_def_code = "def my_func(a, b):\n    return a + b\nprint(my_func(3, 4))"
    print("\n--- Test 9: Function Definition and Call ---")
    result9 = tool.forward(code_string=func_def_code)
    print(result9)
    assert result9['success']
    assert "7" in result9['output']

    # Test 10: Max output size
    # tool_max_output = CodeExecutionTool(max_output_size=50)
    # long_output_code = "for i in range(20): print(f'Line {i}')"
    # print("\n--- Test 10: Max Output Size ---")
    # result10 = tool_max_output.forward(code_string=long_output_code)
    # print(result10)
    # assert result10['success']
    # assert "... [output truncated]" in result10['output']
    # assert len(result10['output']) <= 50 + len("... [output truncated]") + 5 # a bit of leeway

    print("\nAll tests seem to have passed (check output for details).")