|
|
|
import requests |
|
import json |
|
import pandas as pd |
|
import os |
|
import io |
|
import contextlib |
|
import ast |
|
import traceback |
|
from typing import Optional, Dict, Any |
|
from .utils import encode_image_to_base64, validate_file_exists, get_env_var, logger |
|
|
|
class MultimodalTools:
    """Free multimodal AI tools using OpenRouter and other free services.

    Wraps the OpenRouter chat-completions endpoint to analyze images,
    transcripts, spreadsheets and Python source files. The public methods
    return a string: either the model's answer / execution output, or a
    human-readable error message for the failure cases they handle.
    """

    def __init__(self, openrouter_key: Optional[str] = None):
        """Configure the endpoint URL, request headers and model names.

        Args:
            openrouter_key: API key. When falsy, the value of
                ``get_env_var("OPENROUTER_API_KEY", None)`` is used instead.
        """
        self.openrouter_key = openrouter_key or get_env_var("OPENROUTER_API_KEY", None)
        self.openrouter_url = "https://openrouter.ai/api/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {self.openrouter_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://your-app.com",
            "X-Title": "Multimodal Tools",
        }

        # The same model is currently used for both vision and text requests.
        self.vision_model = "google/gemini-2.5-flash-preview-05-20"
        self.text_model = "google/gemini-2.5-flash-preview-05-20"

    def _build_payload(self, model: str, content: Any, max_tokens: int = 2048) -> Dict[str, Any]:
        """Build a single-user-message chat payload with temperature 0.

        Args:
            model: Model identifier to send in the ``model`` field.
            content: Message content (a string, or a list of content parts).
            max_tokens: Value for the ``max_tokens`` field.

        Returns:
            The request payload dictionary.
        """
        return {
            "model": model,
            "messages": [{"role": "user", "content": content}],
            "temperature": 0,
            "max_tokens": max_tokens,
        }

    def _make_openrouter_request(self, payload: Dict[str, Any]) -> str:
        """Make request to OpenRouter API with error handling.

        Args:
            payload: JSON-serializable request body.

        Returns:
            The content of the first choice's message, or an error string
            when the request fails or the response has no choices.
        """
        try:
            response = requests.post(
                self.openrouter_url,
                headers=self.headers,
                json=payload,
                timeout=60,
            )
            response.raise_for_status()

            result = response.json()
            if 'choices' in result and len(result['choices']) > 0:
                return result['choices'][0]['message']['content']

            logger.error(f"Unexpected response format: {result}")
            return "Error: Invalid response format"

        except requests.exceptions.RequestException as e:
            logger.error(f"OpenRouter API request failed: {str(e)}")
            return f"Error making API request: {str(e)}"
        except Exception as e:
            # Covers malformed JSON and missing keys in the response body.
            logger.error(f"Unexpected error: {str(e)}")
            return f"Unexpected error: {str(e)}"

    def analyze_image(self, image_path: str, question: str = "Describe this image in detail") -> str:
        """Analyze image content using multimodal AI.

        Args:
            image_path: Path to image file
            question: Question about the image

        Returns:
            AI analysis of the image, or an error string.
        """
        if not validate_file_exists(image_path):
            return f"Error: Image file not found at {image_path}"

        try:
            encoded_image = encode_image_to_base64(image_path)

            # NOTE(review): the data URL always declares image/jpeg; confirm
            # whether encode_image_to_base64 normalizes the format.
            content = [
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},
                },
            ]
            return self._make_openrouter_request(
                self._build_payload(self.vision_model, content)
            )

        except Exception as e:
            error_msg = f"Error analyzing image: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def extract_text_from_image(self, image_path: str) -> str:
        """Extract text from image using OCR via multimodal AI.

        Args:
            image_path: Path to image file

        Returns:
            Extracted text from image (the result of ``analyze_image`` with
            an OCR-style prompt).
        """
        ocr_prompt = (
            "Extract all visible text from this image.\n"
            "Return only the text content without any additional commentary or formatting.\n"
            "If no text is visible, return 'No text found'."
        )
        return self.analyze_image(image_path, ocr_prompt)

    def analyze_audio_transcript(self, transcript: str, question: str = "Summarize this audio content") -> str:
        """Analyze audio content via transcript.

        Args:
            transcript: Audio transcript text
            question: Question about the audio content

        Returns:
            AI analysis of the audio content, or an error string when the
            transcript is missing/blank or the request fails.
        """
        # Guard against None as well as whitespace-only input.
        if not transcript or not transcript.strip():
            return "Error: Empty transcript provided"

        try:
            prompt = f"Audio transcript: {transcript}\n\nQuestion: {question}"
            return self._make_openrouter_request(
                self._build_payload(self.text_model, prompt)
            )

        except Exception as e:
            error_msg = f"Error analyzing audio transcript: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def analyze_excel_file(self, file_path: str, question: str) -> str:
        """Analyze Excel or CSV file content using AI.

        The file is first read with ``pandas.read_excel``; if that fails it
        is read with ``pandas.read_csv``. A textual summary (shape, columns,
        head, dtypes, describe) is sent to the model with the question.

        Args:
            file_path: Path to Excel (.xlsx) or CSV file
            question: Question about the data

        Returns:
            AI analysis of the spreadsheet data, or an error string.
        """
        if not validate_file_exists(file_path):
            return f"Error: File not found at {file_path}"

        try:
            try:
                df = pd.read_excel(file_path)
            except Exception:
                # Not a readable Excel workbook; fall back to CSV parsing.
                try:
                    df = pd.read_csv(file_path)
                except Exception as e:
                    return f"Error reading file: Unable to read as Excel or CSV - {str(e)}"

            data_summary = "\n".join([
                "",
                "Data file analysis:",
                f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns",
                f"- Columns: {list(df.columns)}",
                "",
                "First few rows:",
                df.head().to_string(),
                "",
                "Data types:",
                df.dtypes.to_string(),
                "",
                "Summary statistics:",
                df.describe().to_string(),
                "",
            ])

            prompt = (
                "Analyze this spreadsheet data and answer the question."
                f"\n\n{data_summary}\n\nQuestion: {question}"
            )
            return self._make_openrouter_request(
                self._build_payload(self.text_model, prompt)
            )

        except Exception as e:
            error_msg = f"Error analyzing Excel file: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def _validate_python_code(self, code: str) -> bool:
        """Validate Python code syntax.

        Returns:
            True when ``ast.parse`` accepts the code, False on SyntaxError.
        """
        try:
            ast.parse(code)
        except SyntaxError:
            return False
        return True

    def _execute_python_code(self, code: str) -> str:
        """Execute Python code with a reduced builtin set and capture output.

        The code runs exactly once. If its last top-level statement is an
        expression, that expression is evaluated separately and its value is
        appended to the output as ``Final result: ...``.

        Args:
            code: Python source to run.

        Returns:
            Captured stdout (plus stderr and the final expression value when
            present), a placeholder message when nothing was produced, or an
            ``Error: <Type>: <message>`` string when execution fails.
        """
        stdout = io.StringIO()
        stderr = io.StringIO()
        return_value = None

        # SECURITY: restricting __builtins__ is NOT a real sandbox; code from
        # untrusted sources can still escape it. Only run trusted input here.
        # A single namespace is used for globals and locals so that functions
        # defined by the code can see each other and top-level variables.
        namespace: Dict[str, Any] = {
            '__builtins__': {
                'print': print,
                'len': len,
                'str': str,
                'int': int,
                'float': float,
                'list': list,
                'dict': dict,
                'sum': sum,
                'max': max,
                'min': min,
                'abs': abs,
                'round': round,
                'range': range,
                'enumerate': enumerate,
                'zip': zip,
            }
        }

        try:
            tree = ast.parse(code)

            # Split off a trailing expression at the AST level so multi-line
            # expressions and trailing comments are handled correctly.
            final_expr = None
            if tree.body and isinstance(tree.body[-1], ast.Expr):
                final_expr = ast.Expression(body=tree.body.pop().value)

            with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
                exec(compile(tree, "<analyzed code>", "exec"), namespace)
                if final_expr is not None:
                    return_value = eval(
                        compile(final_expr, "<analyzed code>", "eval"), namespace
                    )

            output = stdout.getvalue()
            if stderr.getvalue():
                output += "\n" + stderr.getvalue()

            if return_value is not None:
                output += f"\n\nFinal result: {return_value}"

            return output.strip() or "Code executed successfully (no output)"

        except Exception as e:
            error_output = f"Error: {type(e).__name__}: {str(e)}"
            logger.error(f"Code execution error: {error_output}")
            return error_output

    def analyze_python_file(self, file_path: str, question: str = "What is the final output of this code?") -> str:
        """Read and analyze Python code file.

        When the question mentions output/result/execute/run/final, the code
        is executed locally; short results (< 50 characters) are additionally
        sent to the model together with the source for an explanation.
        Otherwise the source is sent to the model for static analysis.

        Args:
            file_path: Path to Python (.py) file
            question: Question about the code

        Returns:
            Analysis or execution result of the Python code, or an error
            string.
        """
        if not validate_file_exists(file_path):
            return f"Error: Python file not found at {file_path}"

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()

            if not code_content.strip():
                return "Error: Python file is empty"

            if not self._validate_python_code(code_content):
                return "Error: Python file contains syntax errors"

            wants_execution = any(
                keyword in question.lower()
                for keyword in ['output', 'result', 'execute', 'run', 'final']
            )

            if not wants_execution:
                prompt = (
                    "Analyze this Python code and answer the question.\n\n"
                    f"Python code:\n```python\n{code_content}\n```\n\n"
                    f"Question: {question}"
                )
                return self._make_openrouter_request(
                    self._build_payload(self.text_model, prompt)
                )

            # SECURITY: this runs the file's contents in-process; see the
            # note in _execute_python_code.
            logger.info(f"Executing Python code from {file_path}")
            execution_result = self._execute_python_code(code_content)

            if len(execution_result) >= 50:
                return execution_result

            # The source must be included in the prompt, otherwise the model
            # only sees the execution result with no code to reason about.
            prompt = (
                f"Python code:\n```python\n{code_content}\n```\n\n"
                f"Execution result: {execution_result}\n\nQuestion: {question}"
            )
            ai_analysis = self._make_openrouter_request(
                self._build_payload(self.text_model, prompt, max_tokens=1024)
            )
            return f"Execution result: {execution_result}\n\nAnalysis: {ai_analysis}"

        except Exception as e:
            error_msg = f"Error analyzing Python file: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def describe_image(self, image_path: str) -> str:
        """Get a detailed description of an image."""
        return self.analyze_image(
            image_path,
            "Provide a detailed, objective description of this image including objects, people, colors, setting, and any notable details."
        )

    def answer_visual_question(self, image_path: str, question: str) -> str:
        """Answer a specific question about an image."""
        return self.analyze_image(image_path, question)
|
|
|
|
|
def analyze_image(image_path: str, question: str = "Describe this image in detail") -> str:
    """Analyze an image without managing a ``MultimodalTools`` instance.

    Builds a default-configured ``MultimodalTools`` and forwards both
    arguments to its ``analyze_image`` method.

    Args:
        image_path: Path to the image file.
        question: Question to ask about the image.

    Returns:
        Whatever ``MultimodalTools.analyze_image`` returns.
    """
    return MultimodalTools().analyze_image(image_path, question)
|
|
|
def extract_text(image_path: str) -> str:
    """Extract text from an image without managing a ``MultimodalTools`` instance.

    Builds a default-configured ``MultimodalTools`` and delegates to its
    ``extract_text_from_image`` method.

    Args:
        image_path: Path to the image file.

    Returns:
        Whatever ``MultimodalTools.extract_text_from_image`` returns.
    """
    return MultimodalTools().extract_text_from_image(image_path)
|
|
|
def analyze_transcript(transcript: str, question: str = "Summarize this content") -> str:
    """Analyze an audio transcript without managing a ``MultimodalTools`` instance.

    Builds a default-configured ``MultimodalTools`` and delegates to its
    ``analyze_audio_transcript`` method.

    Args:
        transcript: Transcript text.
        question: Question to ask about the transcript.

    Returns:
        Whatever ``MultimodalTools.analyze_audio_transcript`` returns.
    """
    return MultimodalTools().analyze_audio_transcript(transcript, question)
|
|
|
def analyze_excel(file_path: str, question: str) -> str:
    """Analyze an Excel/CSV file without managing a ``MultimodalTools`` instance.

    Builds a default-configured ``MultimodalTools`` and delegates to its
    ``analyze_excel_file`` method.

    Args:
        file_path: Path to the spreadsheet file.
        question: Question to ask about the data.

    Returns:
        Whatever ``MultimodalTools.analyze_excel_file`` returns.
    """
    return MultimodalTools().analyze_excel_file(file_path, question)
|
|
|
|
|
def analyze_python(file_path: str, question: str = "What is the final output of this code?") -> str:
    """Analyze a Python file without managing a ``MultimodalTools`` instance.

    Builds a default-configured ``MultimodalTools`` and delegates to its
    ``analyze_python_file`` method.

    Args:
        file_path: Path to the Python file.
        question: Question to ask about the code.

    Returns:
        Whatever ``MultimodalTools.analyze_python_file`` returns.
    """
    return MultimodalTools().analyze_python_file(file_path, question)
|
|