File size: 15,198 Bytes
7319d31
 
 
a1c1d9a
 
 
 
 
 
7319d31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bd27766
 
7319d31
 
 
 
 
 
 
 
a1c1d9a
7319d31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1c1d9a
7319d31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1c1d9a
7319d31
 
 
 
 
 
 
 
 
a1c1d9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7319d31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1c1d9a
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# tools/multimodal_tools.py
import requests
import json
import pandas as pd
import os  # βœ… Added for file operations
import io  # βœ… Added for code execution
import contextlib  # βœ… Added for code execution
import ast  # βœ… Added for code validation
import traceback  # βœ… Added for error handling
from typing import Optional, Dict, Any
from .utils import encode_image_to_base64, validate_file_exists, get_env_var, logger

class MultimodalTools:
    """Free multimodal AI tools using OpenRouter and other free services"""
    
    def __init__(self, openrouter_key: Optional[str] = None):
        self.openrouter_key = openrouter_key or get_env_var("OPENROUTER_API_KEY", None)
        self.openrouter_url = "https://openrouter.ai/api/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {self.openrouter_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://your-app.com",  # Optional: for analytics
            "X-Title": "Multimodal Tools"  # Optional: for analytics
        }
        
        # Available free multimodal models
        self.vision_model = "google/gemini-2.5-flash-preview-05-20"
        self.text_model = "google/gemini-2.5-flash-preview-05-20"
    
    def _make_openrouter_request(self, payload: Dict[str, Any]) -> str:
        """Make request to OpenRouter API with error handling"""
        try:
            response = requests.post(
                self.openrouter_url, 
                headers=self.headers, 
                json=payload,
                timeout=60
            )
            response.raise_for_status()
            
            result = response.json()
            if 'choices' in result and len(result['choices']) > 0:
                return result['choices'][0]['message']['content']
            else:
                logger.error(f"Unexpected response format: {result}")
                return "Error: Invalid response format"
                
        except requests.exceptions.RequestException as e:
            logger.error(f"OpenRouter API request failed: {str(e)}")
            return f"Error making API request: {str(e)}"
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return f"Unexpected error: {str(e)}"
    
    def analyze_image(self, image_path: str, question: str = "Describe this image in detail") -> str:
        """
        Analyze image content using multimodal AI
        
        Args:
            image_path: Path to image file
            question: Question about the image
            
        Returns:
            AI analysis of the image
        """
        if not validate_file_exists(image_path):
            return f"Error: Image file not found at {image_path}"
        
        try:
            encoded_image = encode_image_to_base64(image_path)
            
            payload = {
                "model": self.vision_model,
                "messages": [
                    {
                        "role": "user", 
                        "content": [
                            {"type": "text", "text": question},
                            {
                                "type": "image_url", 
                                "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}
                            }
                        ]
                    }
                ],
                "temperature": 0,
                "max_tokens": 2048
            }
            
            return self._make_openrouter_request(payload)
            
        except Exception as e:
            error_msg = f"Error analyzing image: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def extract_text_from_image(self, image_path: str) -> str:
        """
        Extract text from image using OCR via multimodal AI
        
        Args:
            image_path: Path to image file
            
        Returns:
            Extracted text from image
        """
        ocr_prompt = """Extract all visible text from this image. 
        Return only the text content without any additional commentary or formatting. 
        If no text is visible, return 'No text found'."""
        
        return self.analyze_image(image_path, ocr_prompt)
    
    def analyze_audio_transcript(self, transcript: str, question: str = "Summarize this audio content") -> str:
        """
        Analyze audio content via transcript
        
        Args:
            transcript: Audio transcript text
            question: Question about the audio content
            
        Returns:
            AI analysis of the audio content
        """
        if not transcript.strip():
            return "Error: Empty transcript provided"
        
        try:
            payload = {
                "model": self.text_model,
                "messages": [
                    {
                        "role": "user", 
                        "content": f"Audio transcript: {transcript}\n\nQuestion: {question}"
                    }
                ],
                "temperature": 0,
                "max_tokens": 2048
            }
            
            return self._make_openrouter_request(payload)
            
        except Exception as e:
            error_msg = f"Error analyzing audio transcript: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def analyze_excel_file(self, file_path: str, question: str) -> str:
        """
        Analyze Excel or CSV file content using AI
        
        Args:
            file_path: Path to Excel (.xlsx) or CSV file
            question: Question about the data
            
        Returns:
            AI analysis of the spreadsheet data
        """
        if not validate_file_exists(file_path):
            return f"Error: File not found at {file_path}"
        
        try:
            # Try reading as Excel first, then CSV
            try:
                df = pd.read_excel(file_path)
            except Exception:
                try:
                    df = pd.read_csv(file_path)
                except Exception as e:
                    return f"Error reading file: Unable to read as Excel or CSV - {str(e)}"
            
            # Convert dataframe to text representation for AI analysis
            data_summary = f"""
Data file analysis:
- Shape: {df.shape[0]} rows, {df.shape[1]} columns
- Columns: {list(df.columns)}

First few rows:
{df.head().to_string()}

Data types:
{df.dtypes.to_string()}

Summary statistics:
{df.describe().to_string()}
"""
            
            payload = {
                "model": self.text_model,
                "messages": [
                    {
                        "role": "user", 
                        "content": f"Analyze this spreadsheet data and answer the question.\n\n{data_summary}\n\nQuestion: {question}"
                    }
                ],
                "temperature": 0,
                "max_tokens": 2048
            }
            
            return self._make_openrouter_request(payload)
            
        except Exception as e:
            error_msg = f"Error analyzing Excel file: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    # βœ… NEW METHOD - Added Python code processing
    def _validate_python_code(self, code: str) -> bool:
        """Validate Python code syntax"""
        try:
            ast.parse(code)
            return True
        except SyntaxError:
            return False
    
    def _execute_python_code(self, code: str) -> str:
        """
        Safely execute Python code and capture output
        Based on search results from LlamaIndex SimpleCodeExecutor pattern
        """
        # Capture stdout and stderr
        stdout = io.StringIO()
        stderr = io.StringIO()
        output = ""
        return_value = None
        
        # Create a safe execution namespace
        safe_globals = {
            '__builtins__': {
                'print': print,
                'len': len,
                'str': str,
                'int': int,
                'float': float,
                'list': list,
                'dict': dict,
                'sum': sum,
                'max': max,
                'min': min,
                'abs': abs,
                'round': round,
                'range': range,
                'enumerate': enumerate,
                'zip': zip,
            }
        }
        safe_locals = {}
        
        try:
            # Execute with captured output
            with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr):
                # Try to detect if there's a return value (last expression)
                try:
                    tree = ast.parse(code)
                    last_node = tree.body[-1] if tree.body else None
                    
                    # If the last statement is an expression, capture its value
                    if isinstance(last_node, ast.Expr):
                        # Split code to add a return value assignment
                        lines = code.rstrip().split('\n')
                        last_line = lines[-1]
                        exec_code = '\n'.join(lines[:-1]) + f'\n__result__ = {last_line}'
                        
                        # Execute modified code
                        exec(exec_code, safe_globals, safe_locals)
                        return_value = safe_locals.get('__result__')
                    else:
                        # Normal execution
                        exec(code, safe_globals, safe_locals)
                except:
                    # If parsing fails, just execute the code as is
                    exec(code, safe_globals, safe_locals)
            
            # Get output
            output = stdout.getvalue()
            if stderr.getvalue():
                output += "\n" + stderr.getvalue()
            
            # Add return value if it exists
            if return_value is not None:
                output += f"\n\nFinal result: {return_value}"
            
            return output.strip() if output.strip() else str(return_value) if return_value is not None else "Code executed successfully (no output)"
            
        except Exception as e:
            # Capture exception information
            error_output = f"Error: {type(e).__name__}: {str(e)}"
            logger.error(f"Code execution error: {error_output}")
            return error_output
    
    def analyze_python_file(self, file_path: str, question: str = "What is the final output of this code?") -> str:
        """
        Read and analyze Python code file
        
        Args:
            file_path: Path to Python (.py) file
            question: Question about the code
            
        Returns:
            Analysis or execution result of the Python code
        """
        if not validate_file_exists(file_path):
            return f"Error: Python file not found at {file_path}"
        
        try:
            # Read the Python file
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()
            
            if not code_content.strip():
                return "Error: Python file is empty"
            
            # Validate syntax
            if not self._validate_python_code(code_content):
                return "Error: Python file contains syntax errors"
            
            # If question asks for output/result, execute the code
            if any(keyword in question.lower() for keyword in ['output', 'result', 'execute', 'run', 'final']):
                logger.info(f"Executing Python code from {file_path}")
                execution_result = self._execute_python_code(code_content)
                
                # Also provide AI analysis if needed
                if len(execution_result) < 50:  # Short result, add AI analysis
                    payload = {
                        "model": self.text_model,
                        "messages": [
                            {
                                "role": "user", 
                                "content": f"Python code:\n``````\n\nExecution result: {execution_result}\n\nQuestion: {question}"
                            }
                        ],
                        "temperature": 0,
                        "max_tokens": 1024
                    }
                    
                    ai_analysis = self._make_openrouter_request(payload)
                    return f"Execution result: {execution_result}\n\nAnalysis: {ai_analysis}"
                else:
                    return execution_result
            else:
                # Just analyze the code without execution
                payload = {
                    "model": self.text_model,
                    "messages": [
                        {
                            "role": "user", 
                            "content": f"Analyze this Python code and answer the question.\n\nPython code:\n``````\n\nQuestion: {question}"
                        }
                    ],
                    "temperature": 0,
                    "max_tokens": 2048
                }
                
                return self._make_openrouter_request(payload)
            
        except Exception as e:
            error_msg = f"Error analyzing Python file: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def describe_image(self, image_path: str) -> str:
        """Get a detailed description of an image"""
        return self.analyze_image(
            image_path, 
            "Provide a detailed, objective description of this image including objects, people, colors, setting, and any notable details."
        )
    
    def answer_visual_question(self, image_path: str, question: str) -> str:
        """Answer a specific question about an image"""
        return self.analyze_image(image_path, question)

# Convenience functions for direct use
def analyze_image(image_path: str, question: str = "Describe this image in detail") -> str:
    """Standalone function to analyze an image"""
    tools = MultimodalTools()
    return tools.analyze_image(image_path, question)

def extract_text(image_path: str) -> str:
    """Standalone function to extract text from an image"""
    tools = MultimodalTools()
    return tools.extract_text_from_image(image_path)

def analyze_transcript(transcript: str, question: str = "Summarize this content") -> str:
    """Standalone function to analyze audio transcript"""
    tools = MultimodalTools()
    return tools.analyze_audio_transcript(transcript, question)

def analyze_excel(file_path: str, question: str) -> str:
    """Standalone function to analyze Excel/CSV files"""
    tools = MultimodalTools()
    return tools.analyze_excel_file(file_path, question)

# βœ… NEW FUNCTION - Added Python code convenience function
def analyze_python(file_path: str, question: str = "What is the final output of this code?") -> str:
    """Standalone function to analyze Python files"""
    tools = MultimodalTools()
    return tools.analyze_python_file(file_path, question)