| """ | |
| Improved GEO Scoring Module | |
| Analyzes content for Generative Engine Optimization (GEO) performance | |
| """ | |
| import json | |
| import re | |
| import logging | |
| import hashlib | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Union, Optional, Tuple | |
| from functools import lru_cache | |
| from dataclasses import dataclass | |
| from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate | |


class GEOConfig:
    """Configuration class for GEO scoring parameters"""
    MAX_CONTENT_LENGTH: int = 8000
    MIN_CONTENT_LENGTH: int = 100
    QUICK_CONTENT_LENGTH: int = 4000
    DEFAULT_TIMEOUT: int = 30
    MAX_RETRIES: int = 3
    CACHE_SIZE: int = 100
    SMART_TRUNCATE_THRESHOLD: float = 0.8


class GEOValidator:
    """Input validation utilities for GEO analysis"""

    @staticmethod
    def validate_content_inputs(content: str, title: str, config: GEOConfig) -> Tuple[bool, str]:
        """Validate content and title inputs"""
        if not isinstance(content, str) or not isinstance(title, str):
            return False, "Content and title must be strings"
        if len(content.strip()) < config.MIN_CONTENT_LENGTH:
            return False, f"Content must be at least {config.MIN_CONTENT_LENGTH} characters"
        if len(title.strip()) == 0:
            return False, "Title cannot be empty"
        if len(title) > 200:
            return False, "Title too long (max 200 characters)"
        return True, ""

    @staticmethod
    def validate_pages_data(pages_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
        """Validate pages data structure"""
        if not isinstance(pages_data, list):
            return False, "Pages data must be a list"
        if len(pages_data) == 0:
            return False, "Pages data cannot be empty"
        for i, page in enumerate(pages_data):
            if not isinstance(page, dict):
                return False, f"Page {i} must be a dictionary"
            if 'content' not in page:
                return False, f"Page {i} missing 'content' field"
        return True, ""


class GEOContentProcessor:
    """Content processing utilities for GEO analysis"""

    def __init__(self, config: GEOConfig):
        self.config = config
        self.dangerous_patterns = [
            r'ignore\s+previous\s+instructions',
            r'system\s*:',
            r'assistant\s*:',
            r'```json\s*{.*"prompt"',
            r'<\s*system\s*>',
            r'<\s*user\s*>',
            r'forget\s+everything',
            r'new\s+instructions\s*:',
        ]

    def sanitize_content(self, content: str) -> str:
        """Sanitize content to prevent prompt injection"""
        if not content:
            return ""
        # Remove potential prompt injection patterns
        sanitized = content
        for pattern in self.dangerous_patterns:
            sanitized = re.sub(pattern, '[FILTERED]', sanitized, flags=re.IGNORECASE)
        # Collapse excessive whitespace, but keep paragraph breaks so that
        # smart_truncate can still fall back to them
        sanitized = re.sub(r'[ \t]+', ' ', sanitized)
        sanitized = re.sub(r'\n{3,}', '\n\n', sanitized).strip()
        # Hard limit on length
        return sanitized[:self.config.MAX_CONTENT_LENGTH * 2]

    def smart_truncate(self, content: str, max_length: int) -> str:
        """Intelligently truncate content while preserving meaning"""
        if len(content) <= max_length:
            return content
        # Find the last complete sentence within the limit
        truncated = content[:max_length]
        # Look for sentence endings
        sentence_endings = ['. ', '! ', '? ']
        best_cut = -1
        for ending in sentence_endings:
            last_occurrence = truncated.rfind(ending)
            if last_occurrence > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
                best_cut = max(best_cut, last_occurrence + len(ending) - 1)
        if best_cut > 0:
            return truncated[:best_cut]
        # If no good sentence break, look for paragraph breaks
        last_paragraph = truncated.rfind('\n\n')
        if last_paragraph > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
            return truncated[:last_paragraph]
        # If no good breaks, just truncate and add an ellipsis
        return truncated.rstrip() + "..."

    def generate_content_hash(self, content: str, title: str, analysis_type: str) -> str:
        """Generate hash for content caching"""
        combined = f"{title}|{content}|{analysis_type}"
        return hashlib.md5(combined.encode()).hexdigest()
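
    # Illustrative usage (a sketch; `raw_page_text` and `page_title` are placeholders):
    #   processor = GEOContentProcessor(GEOConfig())
    #   clean = processor.sanitize_content(raw_page_text)   # filters injection patterns, tidies whitespace
    #   short = processor.smart_truncate(clean, GEOConfig.QUICK_CONTENT_LENGTH)
    #   key = processor.generate_content_hash(short, page_title, 'quick')
    # smart_truncate prefers the last sentence or paragraph break that falls beyond
    # SMART_TRUNCATE_THRESHOLD (80%) of the limit, and only then appends "...".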


class GEOPromptManager:
    """Manages prompts for different types of GEO analysis"""

    def __init__(self):
        self.prompts = self._initialize_prompts()

    def _initialize_prompts(self) -> Dict[str, str]:
        """Initialize all prompts"""
        return {
            'detailed_analysis': self._get_detailed_prompt(),
            'quick_analysis': self._get_quick_prompt(),
            'competitive_analysis': self._get_competitive_prompt()
        }

    def _get_detailed_prompt(self) -> str:
        return """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.

Evaluate the content based on these GEO criteria (score 1-10 each):
1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines?
2. **Query Intent Matching**: How well does the content match common user queries?
3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information?
4. **Conversational Readiness**: How suitable is the content for AI chat responses?
5. **Semantic Richness**: How well does the content use relevant semantic keywords?
6. **Context Completeness**: Does the content provide complete, self-contained answers?
7. **Citation Worthiness**: How likely are AI systems to cite this content?
8. **Multi-Query Coverage**: Does the content answer multiple related questions?

Also identify:
- Primary topics and entities
- Missing information gaps
- Optimization opportunities
- Specific enhancement recommendations

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
    "geo_scores": {
        "ai_search_visibility": 7.5,
        "query_intent_matching": 8.0,
        "factual_accuracy": 9.0,
        "conversational_readiness": 6.5,
        "semantic_richness": 7.0,
        "context_completeness": 8.5,
        "citation_worthiness": 7.8,
        "multi_query_coverage": 6.0
    },
    "overall_geo_score": 7.5,
    "primary_topics": ["topic1", "topic2"],
    "entities": ["entity1", "entity2"],
    "missing_gaps": ["gap1", "gap2"],
    "optimization_opportunities": [
        {
            "type": "semantic_enhancement",
            "description": "Add more related terms",
            "priority": "high"
        }
    ],
    "recommendations": [
        "Specific actionable recommendation 1",
        "Specific actionable recommendation 2"
    ]
}"""

    def _get_quick_prompt(self) -> str:
        return """Analyze this content for AI search optimization. Provide scores (1-10) for:
1. AI Search Visibility
2. Query Intent Matching
3. Conversational Readiness
4. Citation Worthiness

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
    "scores": {
        "ai_search_visibility": 7.5,
        "query_intent_matching": 8.0,
        "conversational_readiness": 6.5,
        "citation_worthiness": 7.8
    },
    "overall_score": 7.5,
    "top_recommendation": "Most important improvement needed"
}"""

    def _get_competitive_prompt(self) -> str:
        return """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
    "winner": "A",
    "score_comparison": {
        "content_a_score": 7.5,
        "content_b_score": 8.2
    },
    "key_differences": ["difference1", "difference2"],
    "improvement_suggestions": {
        "content_a": ["suggestion1"],
        "content_b": ["suggestion1"]
    }
}"""

    def get_prompt(self, prompt_type: str) -> str:
        """Get prompt by type"""
        return self.prompts.get(prompt_type, self.prompts['detailed_analysis'])


class GEOScorer:
    """Main class for calculating GEO scores and analysis"""

    def __init__(self, llm, config: Optional[GEOConfig] = None, logger: Optional[logging.Logger] = None):
        self.llm = llm
        self.config = config or GEOConfig()
        self.logger = logger or self._setup_logger()
        # Initialize components
        self.validator = GEOValidator()
        self.processor = GEOContentProcessor(self.config)
        self.prompt_manager = GEOPromptManager()
        # Performance tracking
        self.analysis_count = 0
        self.cache_hits = 0

    def _setup_logger(self) -> logging.Logger:
        """Setup default logger"""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _get_cached_analysis(self, content_hash: str) -> Optional[Dict[str, Any]]:
        """Cache lookup hook for repeated analyses"""
        # Placeholder: no cache is wired in yet. In production you might back this
        # with an in-memory dict or an external store such as Redis.
        return None
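
    # Sketch of how a simple cache could be wired in (not implemented above;
    # `self._cache` is a hypothetical dict you would create in __init__):
    #   cached = self._cache.get(content_hash)
    #   if cached is not None:
    #       self.cache_hits += 1
    #       return cached
    #   ... run the LLM analysis, then: self._cache[content_hash] = parsed_result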

    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
        """
        Analyze a single page for GEO performance with improved error handling and validation
        """
        start_time = datetime.now()
        self.analysis_count += 1
        try:
            # Input validation
            is_valid, error_msg = self.validator.validate_content_inputs(content, title, self.config)
            if not is_valid:
                self.logger.warning(f"Input validation failed: {error_msg}")
                return {'error': error_msg, 'error_type': 'validation'}
            # Check cache
            analysis_type = 'detailed' if detailed else 'quick'
            content_hash = self.processor.generate_content_hash(content, title, analysis_type)
            # Process content
            sanitized_content = self.processor.sanitize_content(content)
            max_length = self.config.MAX_CONTENT_LENGTH if detailed else self.config.QUICK_CONTENT_LENGTH
            processed_content = self.processor.smart_truncate(sanitized_content, max_length)
            # Get appropriate prompt
            prompt_type = 'detailed_analysis' if detailed else 'quick_analysis'
            system_prompt = self.prompt_manager.get_prompt(prompt_type)
            user_message = f"Title: {title}\n\nContent: {processed_content}"
            # Build and execute prompt; escape braces so the JSON examples in the
            # prompts are not interpreted as template variables by LangChain
            prompt_template = ChatPromptTemplate.from_messages([
                SystemMessagePromptTemplate.from_template(system_prompt.replace('{', '{{').replace('}', '}}')),
                HumanMessagePromptTemplate.from_template(user_message.replace('{', '{{').replace('}', '}}'))
            ])
            chain = prompt_template | self.llm
            result = chain.invoke({})
            # Extract and parse result
            result_content = result.content if hasattr(result, 'content') else str(result)
            parsed_result = self._parse_llm_response(result_content)
            # Add metadata
            processing_time = (datetime.now() - start_time).total_seconds()
            parsed_result.update({
                'analyzed_title': title,
                'content_length': len(content),
                'processed_content_length': len(processed_content),
                'word_count': len(content.split()),
                'analysis_type': analysis_type,
                'processing_time_seconds': processing_time,
                'content_hash': content_hash
            })
            self.logger.info(f"Analysis completed for '{title}' in {processing_time:.2f}s")
            return parsed_result
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON parsing failed for title '{title}': {e}")
            return {'error': 'Invalid response format from LLM', 'error_type': 'parsing', 'title': title}
        except Exception as e:
            self.logger.error(f"Analysis failed for title '{title}': {e}")
            return {'error': f"Analysis failed: {str(e)}", 'error_type': 'system', 'title': title}

    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
        """
        Analyze multiple pages with improved validation and error handling
        """
        # Validate input
        is_valid, error_msg = self.validator.validate_pages_data(pages_data)
        if not is_valid:
            self.logger.error(f"Pages data validation failed: {error_msg}")
            return [{'error': error_msg, 'error_type': 'validation'}]
        results = []
        successful_analyses = 0
        self.logger.info(f"Starting analysis of {len(pages_data)} pages")
        for i, page_data in enumerate(pages_data):
            try:
                content = page_data.get('content', '')
                title = page_data.get('title', f'Page {i+1}')
                analysis = self.analyze_page_geo(content, title, detailed)
                # Add page-specific metadata
                analysis.update({
                    'page_url': page_data.get('url', ''),
                    'page_index': i,
                    'source_word_count': page_data.get('word_count', 0)
                })
                if 'error' not in analysis:
                    successful_analyses += 1
                results.append(analysis)
            except Exception as e:
                self.logger.error(f"Failed to analyze page {i}: {e}")
                results.append({
                    'page_index': i,
                    'page_url': page_data.get('url', ''),
                    'error': f"Analysis failed: {str(e)}",
                    'error_type': 'system'
                })
        self.logger.info(f"Completed analysis: {successful_analyses}/{len(pages_data)} successful")
        return results

    def compare_content_geo(self, content_a: str, content_b: str, titles: Optional[Tuple[str, str]] = None) -> Dict[str, Any]:
        """
        Compare two pieces of content for GEO performance with improved validation
        """
        try:
            title_a, title_b = titles if titles else ("Content A", "Content B")
            # Validate inputs
            is_valid_a, error_a = self.validator.validate_content_inputs(content_a, title_a, self.config)
            is_valid_b, error_b = self.validator.validate_content_inputs(content_b, title_b, self.config)
            if not is_valid_a:
                return {'error': f"Content A validation failed: {error_a}", 'error_type': 'validation'}
            if not is_valid_b:
                return {'error': f"Content B validation failed: {error_b}", 'error_type': 'validation'}
            # Process content
            processed_a = self.processor.smart_truncate(
                self.processor.sanitize_content(content_a),
                self.config.QUICK_CONTENT_LENGTH
            )
            processed_b = self.processor.smart_truncate(
                self.processor.sanitize_content(content_b),
                self.config.QUICK_CONTENT_LENGTH
            )
            # Build comparison prompt
            system_prompt = self.prompt_manager.get_prompt('competitive_analysis')
            user_message = f"""Content A:
Title: {title_a}
Content: {processed_a}

Content B:
Title: {title_b}
Content: {processed_b}"""
            # Escape braces so the JSON example in the prompt is not interpreted
            # as template variables by LangChain
            prompt_template = ChatPromptTemplate.from_messages([
                SystemMessagePromptTemplate.from_template(system_prompt.replace('{', '{{').replace('}', '}}')),
                HumanMessagePromptTemplate.from_template(user_message.replace('{', '{{').replace('}', '}}'))
            ])
            chain = prompt_template | self.llm
            result = chain.invoke({})
            result_content = result.content if hasattr(result, 'content') else str(result)
            comparison_result = self._parse_llm_response(result_content)
            # Add metadata
            comparison_result.update({
                'content_a_title': title_a,
                'content_b_title': title_b,
                'content_a_length': len(content_a),
                'content_b_length': len(content_b)
            })
            return comparison_result
        except Exception as e:
            self.logger.error(f"Comparison analysis failed: {e}")
            return {'error': f"Comparison analysis failed: {str(e)}", 'error_type': 'system'}

    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate aggregate GEO scores with improved error handling and insights
        """
        try:
            # Accept both detailed results ('geo_scores') and quick results ('scores')
            def _result_scores(result: Dict[str, Any]) -> Dict[str, float]:
                return result.get('geo_scores') or result.get('scores') or {}

            # Filter out error results
            valid_results = [r for r in individual_results if _result_scores(r) and not r.get('error')]
            error_results = [r for r in individual_results if r.get('error')]
            if not valid_results:
                return {
                    'error': 'No valid results to aggregate',
                    'error_type': 'no_data',
                    'total_errors': len(error_results),
                    'error_breakdown': self._analyze_errors(error_results)
                }
            # Calculate average scores
            score_keys = list(_result_scores(valid_results[0]).keys())
            avg_scores = {}
            score_details = {}
            for key in score_keys:
                scores = [_result_scores(r)[key] for r in valid_results if key in _result_scores(r)]
                if scores:
                    avg_scores[key] = sum(scores) / len(scores)
                    score_details[key] = {
                        'average': avg_scores[key],
                        'min': min(scores),
                        'max': max(scores),
                        'count': len(scores)
                    }
                else:
                    avg_scores[key] = 0
                    score_details[key] = {'average': 0, 'min': 0, 'max': 0, 'count': 0}
            overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
            # Collect insights
            insights = self._generate_aggregate_insights(valid_results, avg_scores)
            # Find performance patterns
            best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
            worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
            return {
                'aggregate_scores': avg_scores,
                'score_details': score_details,
                'overall_score': overall_avg,
                'pages_analyzed': len(valid_results),
                'pages_with_errors': len(error_results),
                'success_rate': len(valid_results) / len(individual_results) if individual_results else 0,
                'best_performing_metric': {
                    'metric': best_score[0],
                    'score': best_score[1]
                },
                'lowest_performing_metric': {
                    'metric': worst_score[0],
                    'score': worst_score[1]
                },
                'insights': insights,
                'score_distribution': self._calculate_score_distribution(avg_scores),
                'processing_stats': self._calculate_processing_stats(valid_results)
            }
        except Exception as e:
            self.logger.error(f"Aggregation failed: {e}")
            return {'error': f"Aggregation failed: {str(e)}", 'error_type': 'system'}

    def get_performance_stats(self) -> Dict[str, Any]:
        """Get performance statistics for the scorer"""
        return {
            'total_analyses': self.analysis_count,
            'cache_hits': self.cache_hits,
            'cache_hit_rate': self.cache_hits / max(self.analysis_count, 1),
            'config': {
                'max_content_length': self.config.MAX_CONTENT_LENGTH,
                'cache_size': self.config.CACHE_SIZE
            }
        }

    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """Enhanced LLM response parsing with better error handling"""
        try:
            # Clean response text
            cleaned_response = response_text.strip()
            # Try to find JSON content; check fenced code blocks before the
            # greedy generic pattern so wrapped JSON is extracted cleanly
            json_patterns = [
                r'```json\s*(\{.*?\})\s*```',  # JSON in code blocks
                r'```\s*(\{.*?\})\s*```',      # Generic code blocks
                r'\{.*\}'                      # Bare JSON object (greedy fallback)
            ]
            for pattern in json_patterns:
                matches = re.findall(pattern, cleaned_response, re.DOTALL)
                if matches:
                    json_str = matches[0]
                    try:
                        return json.loads(json_str)
                    except json.JSONDecodeError:
                        continue
            # If no JSON patterns matched, try parsing the entire response
            try:
                return json.loads(cleaned_response)
            except json.JSONDecodeError:
                pass
            # Last resort: return structured error
            return {
                'raw_response': response_text,
                'parsing_error': 'No valid JSON found in response',
                'error_type': 'parsing'
            }
        except Exception as e:
            return {
                'raw_response': response_text,
                'parsing_error': f'Unexpected parsing error: {str(e)}',
                'error_type': 'parsing'
            }
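
    # Illustrative inputs _parse_llm_response is written to handle:
    #   '{"overall_score": 7.5}'                       -> {'overall_score': 7.5}
    #   '```json\n{"overall_score": 7.5}\n```'         -> {'overall_score': 7.5}
    #   'Here is the JSON: {"overall_score": 7.5}'     -> {'overall_score': 7.5}
    # Responses with no parseable JSON fall through to the structured parsing error.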

    def _analyze_errors(self, error_results: List[Dict[str, Any]]) -> Dict[str, int]:
        """Analyze error patterns"""
        error_breakdown = {}
        for result in error_results:
            error_type = result.get('error_type', 'unknown')
            error_breakdown[error_type] = error_breakdown.get(error_type, 0) + 1
        return error_breakdown

    def _generate_aggregate_insights(self, valid_results: List[Dict[str, Any]], avg_scores: Dict[str, float]) -> List[str]:
        """Generate insights from aggregate analysis"""
        insights = []
        if not avg_scores:
            return ["No valid scores to analyze"]
        overall_avg = sum(avg_scores.values()) / len(avg_scores)
        # Performance level insights
        if overall_avg >= 8.0:
            insights.append("Excellent overall GEO performance across analyzed content")
        elif overall_avg >= 6.5:
            insights.append("Good GEO performance with room for targeted improvements")
        elif overall_avg >= 5.0:
            insights.append("Moderate GEO performance - significant optimization opportunities exist")
        else:
            insights.append("Below-average GEO performance - comprehensive optimization needed")
        # Specific metric insights
        best_metric = max(avg_scores.items(), key=lambda x: x[1])
        worst_metric = min(avg_scores.items(), key=lambda x: x[1])
        if best_metric[1] >= 8.0:
            insights.append(f"Strong performance in {best_metric[0].replace('_', ' ')} (score: {best_metric[1]:.1f})")
        if worst_metric[1] < 6.0:
            insights.append(f"Critical improvement needed in {worst_metric[0].replace('_', ' ')} (score: {worst_metric[1]:.1f})")
        # Consistency insights
        score_values = list(avg_scores.values())
        score_range = max(score_values) - min(score_values)
        if score_range > 3.0:
            insights.append("High variability in scores indicates inconsistent optimization across metrics")
        elif score_range < 1.5:
            insights.append("Consistent performance across all GEO metrics")
        return insights

    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Calculate enhanced score distribution statistics"""
        if not scores:
            return {}
        score_values = list(scores.values())
        mean_score = sum(score_values) / len(score_values)
        return {
            'highest_score': max(score_values),
            'lowest_score': min(score_values),
            'average_score': mean_score,
            'score_range': max(score_values) - min(score_values),
            'scores_above_8': len([s for s in score_values if s >= 8.0]),
            'scores_above_7': len([s for s in score_values if s >= 7.0]),
            'scores_below_5': len([s for s in score_values if s < 5.0]),
            'score_variance': sum((s - mean_score) ** 2 for s in score_values) / len(score_values)
        }

    def _calculate_processing_stats(self, valid_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate processing statistics"""
        processing_times = [r['processing_time_seconds'] for r in valid_results if 'processing_time_seconds' in r]
        content_lengths = [r['content_length'] for r in valid_results if 'content_length' in r]
        if not processing_times:
            return {}
        return {
            'avg_processing_time': sum(processing_times) / len(processing_times),
            'max_processing_time': max(processing_times),
            'min_processing_time': min(processing_times),
            'avg_content_length': sum(content_lengths) / len(content_lengths) if content_lengths else 0,
            'total_processing_time': sum(processing_times)
        }

    def _get_timestamp(self) -> str:
        """Get current timestamp"""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


# Example usage and testing utilities
class GEOScorerTester:
    """Testing utilities for GEOScorer"""

    @staticmethod
    def create_test_content() -> List[Dict[str, Any]]:
        """Create test content for validation"""
        return [
            {
                'title': 'How to Optimize Content for AI Search',
                'content': 'AI search engines are revolutionizing how people find information. To optimize your content for AI-powered search, focus on creating comprehensive, factual, and well-structured content that directly answers user questions. Use semantic keywords, provide clear context, and ensure your content is authoritative and cite-worthy.',
                'url': 'https://example.com/ai-search-optimization'
            },
            {
                'title': 'Best Practices for GEO',
                'content': 'Generative Engine Optimization (GEO) requires a different approach than traditional SEO. Focus on conversational readiness, semantic richness, and multi-query coverage. Ensure your content provides complete answers and is structured in a way that AI systems can easily understand and cite.',
                'url': 'https://example.com/geo-best-practices'
            }
        ]

    @staticmethod
    def run_basic_test(scorer: GEOScorer) -> Dict[str, Any]:
        """Run basic functionality test"""
        test_content = GEOScorerTester.create_test_content()
        results = scorer.analyze_multiple_pages(test_content, detailed=False)
        aggregate = scorer.calculate_aggregate_scores(results)
        stats = scorer.get_performance_stats()
        return {
            'individual_results': results,
            'aggregate_results': aggregate,
            'performance_stats': stats
        }