Spaces:
Sleeping
Sleeping
| """ | |
| Multimodal Processor module for the Perception & Understanding Layer. | |
| This module handles the analysis and understanding of web page content | |
| using multimodal large foundation models (LFMs). | |
| """ | |
| import base64 | |
| import logging | |
| import os | |
| from typing import Dict, Any, Optional, List | |
| import httpx | |
| import cv2 | |
| import numpy as np | |
| import pytesseract | |
| from PIL import Image | |
| from io import BytesIO | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class MultimodalProcessor: | |
| """ | |
| Processes and analyzes web page content using multimodal LFMs. | |
| This class integrates various foundation models to understand | |
| text, images, and their relationships on web pages. | |
| """ | |
| def __init__(self): | |
| """Initialize the MultimodalProcessor.""" | |
| self.vision_model = os.environ.get("VISION_MODEL", "gpt-4-vision-preview") | |
| self.text_model = os.environ.get("TEXT_MODEL", "gpt-4-turbo") | |
| self.openai_client = None | |
| self.anthropic_client = None | |
| self.gemini_client = None | |
| # OCR settings | |
| self.ocr_config = '--oem 3 --psm 11' | |
| logger.info("MultimodalProcessor instance created") | |
| async def initialize(self): | |
| """Initialize clients and resources.""" | |
| # Import API clients here to avoid circular imports | |
| try: | |
| import openai | |
| import anthropic | |
| import google.generativeai as genai | |
| # Initialize OpenAI client | |
| self.openai_client = openai.AsyncClient( | |
| api_key=os.environ.get("OPENAI_API_KEY") | |
| ) | |
| # Initialize Anthropic client | |
| self.anthropic_client = anthropic.Anthropic( | |
| api_key=os.environ.get("ANTHROPIC_API_KEY") | |
| ) | |
| # Initialize Google Gemini client | |
| genai.configure(api_key=os.environ.get("GEMINI_API_KEY")) | |
| self.gemini_client = genai | |
| logger.info("All LFM clients initialized successfully") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error initializing LFM clients: {str(e)}") | |
| return False | |
| async def analyze_page(self, screenshot_bytes, dom_text, task_goal): | |
| """ | |
| Analyze a web page using both visual and textual content. | |
| Args: | |
| screenshot_bytes: PNG image bytes of the screenshot | |
| dom_text: Text representation of the DOM | |
| task_goal: Description of the current task goal | |
| Returns: | |
| Dict: Analysis results including identified elements and actions | |
| """ | |
| try: | |
| # Parallel processing of both visual and text analysis | |
| import asyncio | |
| vision_task = self.analyze_image(screenshot_bytes, task_goal) | |
| text_task = self.analyze_text(dom_text, task_goal) | |
| # Wait for both analyses to complete | |
| vision_analysis, text_analysis = await asyncio.gather(vision_task, text_task) | |
| # Synthesize the results | |
| understanding = await self.synthesize_understanding(vision_analysis, text_analysis, task_goal) | |
| return understanding | |
| except Exception as e: | |
| logger.error(f"Error analyzing page: {str(e)}") | |
| return {"error": str(e)} | |
| async def analyze_image(self, image_bytes, task_goal): | |
| """ | |
| Analyze an image using a multimodal vision model. | |
| Args: | |
| image_bytes: PNG image bytes | |
| task_goal: Description of the current task goal | |
| Returns: | |
| Dict: Vision model analysis results | |
| """ | |
| try: | |
| # Perform OCR on the image | |
| ocr_results = await self._extract_text_from_image(image_bytes) | |
| # Encode image to base64 for API | |
| base64_image = base64.b64encode(image_bytes).decode('utf-8') | |
| # Determine which LFM client to use | |
| if self.openai_client and "gpt" in self.vision_model: | |
| response = await self._analyze_with_openai_vision(base64_image, task_goal, ocr_results) | |
| elif self.anthropic_client and "claude" in self.vision_model: | |
| response = await self._analyze_with_anthropic_vision(base64_image, task_goal, ocr_results) | |
| elif self.gemini_client and "gemini" in self.vision_model: | |
| response = await self._analyze_with_gemini_vision(base64_image, task_goal, ocr_results) | |
| else: | |
| raise ValueError(f"Unsupported vision model: {self.vision_model}") | |
| return response | |
| except Exception as e: | |
| logger.error(f"Error in image analysis: {str(e)}") | |
| return {"error": str(e)} | |
| async def _extract_text_from_image(self, image_bytes): | |
| """ | |
| Extract text from an image using OCR. | |
| Args: | |
| image_bytes: PNG image bytes | |
| Returns: | |
| str: Extracted text | |
| """ | |
| try: | |
| # Convert bytes to numpy array | |
| nparr = np.frombuffer(image_bytes, np.uint8) | |
| img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| # Preprocess the image for better OCR results | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1] | |
| # Perform OCR | |
| text = pytesseract.image_to_string(thresh, config=self.ocr_config) | |
| return text | |
| except Exception as e: | |
| logger.error(f"OCR error: {str(e)}") | |
| return "" | |
| async def _analyze_with_openai_vision(self, base64_image, task_goal, ocr_text): | |
| """Use OpenAI's vision model for analysis.""" | |
| prompt = f""" | |
| Analyze this web page screenshot in the context of the following task: | |
| Task: {task_goal} | |
| OCR extracted text: {ocr_text} | |
| Identify: | |
| 1. Main UI elements visible (buttons, forms, links, etc.) | |
| 2. Their positions and descriptions | |
| 3. Any obstacles to completing the task | |
| 4. Recommended actions to progress the task | |
| Return the analysis as a structured JSON object. | |
| """ | |
| response = await self.openai_client.chat.completions.create( | |
| model=self.vision_model, | |
| messages=[ | |
| {"role": "system", "content": "You are a web UI analyzer that identifies elements and actions."}, | |
| {"role": "user", "content": [ | |
| {"type": "text", "text": prompt}, | |
| {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}} | |
| ]} | |
| ], | |
| response_format={"type": "json_object"} | |
| ) | |
| return response.choices[0].message.content | |
| async def _analyze_with_anthropic_vision(self, base64_image, task_goal, ocr_text): | |
| """Use Anthropic's Claude model for analysis.""" | |
| prompt = f""" | |
| Analyze this web page screenshot in the context of the following task: | |
| Task: {task_goal} | |
| OCR extracted text: {ocr_text} | |
| Identify: | |
| 1. Main UI elements visible (buttons, forms, links, etc.) | |
| 2. Their positions and descriptions | |
| 3. Any obstacles to completing the task | |
| 4. Recommended actions to progress the task | |
| Return the analysis as a structured JSON object. | |
| """ | |
| response = await self.anthropic_client.messages.create( | |
| model="claude-3-opus-20240229", | |
| max_tokens=2000, | |
| messages=[ | |
| {"role": "user", "content": [ | |
| {"type": "text", "text": prompt}, | |
| {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": base64_image}} | |
| ]} | |
| ] | |
| ) | |
| return response.content[0].text | |
| async def _analyze_with_gemini_vision(self, base64_image, task_goal, ocr_text): | |
| """Use Google's Gemini Vision model for analysis.""" | |
| prompt = f""" | |
| Analyze this web page screenshot in the context of the following task: | |
| Task: {task_goal} | |
| OCR extracted text: {ocr_text} | |
| Identify: | |
| 1. Main UI elements visible (buttons, forms, links, etc.) | |
| 2. Their positions and descriptions | |
| 3. Any obstacles to completing the task | |
| 4. Recommended actions to progress the task | |
| Return the analysis as a structured JSON object. | |
| """ | |
| # Convert base64 to image for Gemini | |
| image_bytes = base64.b64decode(base64_image) | |
| image = Image.open(BytesIO(image_bytes)) | |
| # Generate content with Gemini | |
| generation_config = self.gemini_client.types.GenerationConfig( | |
| temperature=0.2, | |
| response_mime_type="application/json", | |
| ) | |
| model = self.gemini_client.GenerativeModel('gemini-pro-vision') | |
| response = model.generate_content( | |
| [ | |
| prompt, | |
| image, | |
| ], | |
| generation_config=generation_config | |
| ) | |
| return response.text | |
| async def analyze_text(self, dom_text, task_goal): | |
| """ | |
| Analyze text content of a DOM using LFMs. | |
| Args: | |
| dom_text: Text representation of the DOM | |
| task_goal: Description of the current task goal | |
| Returns: | |
| Dict: Analysis results including identified elements and structures | |
| """ | |
| try: | |
| prompt = f""" | |
| Analyze this web page DOM text in the context of the following task: | |
| Task: {task_goal} | |
| DOM Text: | |
| {dom_text[:10000]} # Limit size to avoid token limits | |
| Identify: | |
| 1. Main interactive elements (buttons, forms, links, etc.) | |
| 2. Their IDs, classes, and XPaths where available | |
| 3. Page structure and hierarchy | |
| 4. Any obstacles to completing the task | |
| 5. Recommended actions to progress the task | |
| Return the analysis as a structured JSON object. | |
| """ | |
| if self.openai_client: | |
| response = await self.openai_client.chat.completions.create( | |
| model=self.text_model, | |
| messages=[ | |
| {"role": "system", "content": "You are a web DOM analyzer that identifies elements and structures."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| response_format={"type": "json_object"} | |
| ) | |
| return response.choices[0].message.content | |
| elif self.anthropic_client: | |
| response = await self.anthropic_client.messages.create( | |
| model="claude-3-sonnet-20240229", | |
| max_tokens=2000, | |
| messages=[ | |
| {"role": "user", "content": prompt} | |
| ] | |
| ) | |
| return response.content[0].text | |
| else: | |
| raise ValueError("No suitable text model client available") | |
| except Exception as e: | |
| logger.error(f"Error in text analysis: {str(e)}") | |
| return {"error": str(e)} | |
| async def synthesize_understanding(self, vision_analysis, text_analysis, task_goal): | |
| """ | |
| Synthesize the results from visual and textual analysis. | |
| Args: | |
| vision_analysis: Results from image analysis | |
| text_analysis: Results from DOM text analysis | |
| task_goal: Description of the current task goal | |
| Returns: | |
| Dict: Combined understanding with action recommendations | |
| """ | |
| try: | |
| prompt = f""" | |
| Synthesize the following analyses of a web page in the context of this task: | |
| Task: {task_goal} | |
| Vision Analysis: {vision_analysis} | |
| DOM Text Analysis: {text_analysis} | |
| Create a comprehensive understanding of the page that includes: | |
| 1. All identified UI elements with their properties | |
| 2. The most accurate selectors to target each element | |
| 3. The page structure and navigation flow | |
| 4. Specific actionable steps to progress the task | |
| 5. Any potential challenges and alternative approaches | |
| Return the synthesis as a structured JSON object optimized for a web automation agent. | |
| """ | |
| if self.openai_client: | |
| response = await self.openai_client.chat.completions.create( | |
| model=self.text_model, | |
| messages=[ | |
| {"role": "system", "content": "You are a web automation expert that synthesizes analyses into actionable plans."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| response_format={"type": "json_object"} | |
| ) | |
| return response.choices[0].message.content | |
| elif self.anthropic_client: | |
| response = await self.anthropic_client.messages.create( | |
| model="claude-3-sonnet-20240229", | |
| max_tokens=2000, | |
| messages=[ | |
| {"role": "user", "content": prompt} | |
| ] | |
| ) | |
| return response.content[0].text | |
| else: | |
| raise ValueError("No suitable text model client available") | |
| except Exception as e: | |
| logger.error(f"Error in synthesizing understanding: {str(e)}") | |
| return {"error": str(e)} | |