| """ | |
| GEO Scorer Data Integration Fix | |
| Handles various data formats from web scrapers and ensures compatibility | |
| """ | |
| import logging | |
| from typing import Dict, Any, List, Union, Optional | |
class GEODataAdapter:
    """Adapter to handle different data formats from web scrapers"""

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)

    def normalize_scraped_data(self, scraped_data: Union[Dict, List]) -> List[Dict[str, Any]]:
        """
        Normalize scraped data to the format expected by GEOScorer

        Args:
            scraped_data: Raw data from web scraper (various formats)

        Returns:
            List[Dict]: Normalized data ready for GEO analysis
        """
        try:
            # Handle different input formats
            if isinstance(scraped_data, dict):
                # Single page data
                normalized = [self._normalize_single_page(scraped_data)]
            elif isinstance(scraped_data, list):
                # Multiple pages
                normalized = [self._normalize_single_page(page) for page in scraped_data]
            else:
                raise ValueError(f"Unsupported data type: {type(scraped_data)}")

            # Filter out invalid entries (pages with no extractable content)
            valid_pages = [page for page in normalized if page.get('content')]
            self.logger.info(f"Normalized {len(valid_pages)} valid pages from {len(normalized)} total")
            return valid_pages
        except Exception as e:
            self.logger.error(f"Data normalization failed: {e}")
            return []
    def _normalize_single_page(self, page_data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize a single page's data structure"""
        # Common field mappings from different scrapers
        content_fields = ['content', 'text', 'body', 'html_content', 'page_content', 'main_content']
        title_fields = ['title', 'page_title', 'heading', 'h1', 'name']
        url_fields = ['url', 'link', 'page_url', 'source_url', 'href']

        # Extract content (try multiple possible field names)
        content = ""
        for field in content_fields:
            if field in page_data and page_data[field]:
                content = str(page_data[field])
                break

        # Extract title
        title = "Untitled Page"
        for field in title_fields:
            if field in page_data and page_data[field]:
                title = str(page_data[field])
                break

        # Extract URL
        url = ""
        for field in url_fields:
            if field in page_data and page_data[field]:
                url = str(page_data[field])
                break

        # Create normalized structure
        normalized = {
            'content': content,
            'title': title,
            'url': url,
            'word_count': len(content.split()) if content else 0,
            'original_data': page_data  # Keep original for debugging
        }

        # Add any additional metadata
        metadata_fields = ['description', 'keywords', 'author', 'date', 'meta_description']
        for field in metadata_fields:
            if field in page_data:
                normalized[field] = page_data[field]

        return normalized
    def validate_normalized_data(self, normalized_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Validate normalized data and provide diagnostics"""
        validation_results = {
            'total_pages': len(normalized_data),
            'valid_pages': 0,
            'invalid_pages': 0,
            'issues': [],
            'summary': {}
        }

        for i, page in enumerate(normalized_data):
            issues = []

            # Check required fields
            if not page.get('content'):
                issues.append(f"Page {i}: Missing or empty content")
            elif len(page['content'].strip()) < 50:
                issues.append(f"Page {i}: Content too short ({len(page['content'])} chars)")

            if not page.get('title'):
                issues.append(f"Page {i}: Missing title")

            if issues:
                validation_results['invalid_pages'] += 1
                validation_results['issues'].extend(issues)
            else:
                validation_results['valid_pages'] += 1

        # Generate summary
        content_lengths = [len(page.get('content', '')) for page in normalized_data if page.get('content')]
        if content_lengths:
            validation_results['summary'] = {
                'avg_content_length': sum(content_lengths) / len(content_lengths),
                'min_content_length': min(content_lengths),
                'max_content_length': max(content_lengths),
                'pages_with_titles': len([p for p in normalized_data if p.get('title') and p['title'] != 'Untitled Page']),
                'pages_with_urls': len([p for p in normalized_data if p.get('url')])
            }

        return validation_results
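

# Minimal usage sketch for GEODataAdapter on its own, assuming a scraper that returns a flat
# dict; the field names ('text', 'page_title', 'link') and the sample values are illustrative.
def example_adapter_usage() -> Dict[str, Any]:
    adapter = GEODataAdapter()
    pages = adapter.normalize_scraped_data({
        'text': 'Sample scraped body text that is comfortably longer than the fifty-character minimum.',
        'page_title': 'Sample Page',
        'link': 'https://example.com/sample'
    })
    # The validation report carries page counts, per-page issues, and summary statistics
    return adapter.validate_normalized_data(pages)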
class GEOScorerWithAdapter(GEOScorer):
    """Extended GEOScorer with built-in data adaptation"""

    def __init__(self, llm, config: Optional[GEOConfig] = None, logger: Optional[logging.Logger] = None):
        super().__init__(llm, config, logger)
        self.data_adapter = GEODataAdapter(logger)

    def analyze_scraped_data(self, scraped_data: Union[Dict, List], detailed: bool = True) -> Dict[str, Any]:
        """
        Analyze scraped data with automatic format detection and normalization

        Args:
            scraped_data: Raw scraped data in any format
            detailed: Whether to perform detailed analysis

        Returns:
            Dict: Complete analysis results with diagnostics
        """
        self.logger.info("Starting analysis of scraped data")
        try:
            # Step 1: Normalize the data
            normalized_data = self.data_adapter.normalize_scraped_data(scraped_data)
            if not normalized_data:
                return {
                    'error': 'No valid data could be extracted from scraped content',
                    'error_type': 'data_normalization',
                    'original_data_type': str(type(scraped_data)),
                    'original_data_sample': str(scraped_data)[:200] if scraped_data else None
                }

            # Step 2: Validate normalized data
            validation_results = self.data_adapter.validate_normalized_data(normalized_data)

            # Step 3: Analyze valid pages
            analysis_results = self.analyze_multiple_pages(normalized_data, detailed)

            # Step 4: Calculate aggregate scores
            aggregate_results = self.calculate_aggregate_scores(analysis_results)

            # Step 5: Combine all results
            complete_results = {
                'data_validation': validation_results,
                'individual_analyses': analysis_results,
                'aggregate_scores': aggregate_results,
                'processing_summary': {
                    'pages_scraped': validation_results['total_pages'],
                    'pages_analyzed': len([r for r in analysis_results if not r.get('error')]),
                    'overall_success_rate': validation_results['valid_pages'] / max(validation_results['total_pages'], 1),
                    'analysis_type': 'detailed' if detailed else 'quick'
                }
            }
            self.logger.info(f"Analysis completed: {complete_results['processing_summary']}")
            return complete_results
        except Exception as e:
            self.logger.error(f"Scraped data analysis failed: {e}")
            return {
                'error': f'Analysis failed: {str(e)}',
                'error_type': 'system',
                'original_data_type': str(type(scraped_data)),
                'traceback': traceback.format_exc()
            }
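

# Minimal usage sketch for the adapter-aware scorer, assuming the caller already has whatever
# LLM handle GEOScorer expects; `llm` is not constructed here.
def example_scorer_usage(llm, scraped_data: Union[Dict, List]) -> Dict[str, Any]:
    scorer = GEOScorerWithAdapter(llm)
    results = scorer.analyze_scraped_data(scraped_data, detailed=False)
    if 'error' in results:
        # On failure, inspect the raw structure with the debug utility defined below
        return debug_scraped_data(scraped_data)
    return results['aggregate_scores']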
# Debugging utility functions
def debug_scraped_data(scraped_data: Union[Dict, List]) -> Dict[str, Any]:
    """
    Debug utility to understand the structure of scraped data

    Args:
        scraped_data: The raw scraped data causing issues

    Returns:
        Dict: Detailed breakdown of the data structure
    """
    debug_info = {
        'data_type': str(type(scraped_data)),
        'data_structure': {},
        'sample_content': {},
        'recommendations': []
    }

    try:
        if isinstance(scraped_data, dict):
            debug_info['data_structure'] = {
                'is_dict': True,
                'keys': list(scraped_data.keys()),
                'key_count': len(scraped_data.keys())
            }

            # Sample the first few key-value pairs
            for key, value in list(scraped_data.items())[:5]:
                debug_info['sample_content'][key] = {
                    'type': str(type(value)),
                    'length': len(str(value)) if value else 0,
                    'sample': str(value)[:100] if value else None
                }

            # Check for common content fields
            content_fields = ['content', 'text', 'body', 'html_content', 'page_content']
            found_content_fields = [field for field in content_fields if field in scraped_data]
            if found_content_fields:
                debug_info['recommendations'].append(f"Found potential content fields: {found_content_fields}")
            else:
                debug_info['recommendations'].append("No standard content fields found. Check field names.")
        elif isinstance(scraped_data, list):
            debug_info['data_structure'] = {
                'is_list': True,
                'length': len(scraped_data),
                'first_item_type': str(type(scraped_data[0])) if scraped_data else 'empty'
            }
            if scraped_data and isinstance(scraped_data[0], dict):
                debug_info['sample_content']['first_item_keys'] = list(scraped_data[0].keys())
        else:
            debug_info['recommendations'].append(f"Unexpected data type: {type(scraped_data)}")
    except Exception as e:
        debug_info['error'] = f"Debug analysis failed: {str(e)}"

    return debug_info
def create_test_scraped_data() -> List[Dict[str, Any]]:
    """Create test data in various formats that scrapers might return"""
    # Format 1: Standard format
    format1 = {
        'content': 'This is the main content of the page about AI optimization.',
        'title': 'AI Optimization Guide',
        'url': 'https://example.com/ai-guide'
    }

    # Format 2: Different field names
    format2 = {
        'text': 'Content about machine learning best practices.',
        'page_title': 'ML Best Practices',
        'link': 'https://example.com/ml-practices'
    }

    # Format 3: Nested structure -- the flat field mappings do not look inside 'page_data',
    # so this entry is expected to be filtered out during normalization
    format3 = {
        'page_data': {
            'body': 'Deep learning techniques for content optimization.',
            'heading': 'Deep Learning Guide'
        },
        'metadata': {
            'source_url': 'https://example.com/deep-learning'
        }
    }

    return [format1, format2, format3]
# Usage example and testing
def test_data_integration():
    """Test the data integration fixes"""
    # Test with various data formats
    test_data = create_test_scraped_data()

    # Debug the data first
    for i, data in enumerate(test_data):
        print(f"\n--- Debug Info for Test Data {i + 1} ---")
        debug_info = debug_scraped_data(data)
        print(f"Data type: {debug_info['data_type']}")
        print(f"Keys: {debug_info['data_structure'].get('keys', 'N/A')}")
        print(f"Recommendations: {debug_info['recommendations']}")

    # Test normalization
    adapter = GEODataAdapter()
    normalized = adapter.normalize_scraped_data(test_data)
    print("\n--- Normalization Results ---")
    print(f"Original items: {len(test_data)}")
    print(f"Normalized items: {len(normalized)}")
    for i, item in enumerate(normalized):
        print(f"Item {i + 1}: Title='{item['title']}', Content length={len(item['content'])}")

    # Test validation
    validation = adapter.validate_normalized_data(normalized)
    print("\n--- Validation Results ---")
    print(f"Valid pages: {validation['valid_pages']}/{validation['total_pages']}")
    print(f"Issues: {validation['issues']}")


if __name__ == "__main__":
    test_data_integration()