""" | |
Google Search Console API client for SEO Report Generator | |
Handles OAuth authentication and Search Analytics API queries using Google API client | |
""" | |
import os | |
import json | |
from datetime import datetime, timedelta | |
from typing import Dict, Any, List, Optional | |
import time | |
try: | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import Flow | |
from googleapiclient.discovery import build | |
GOOGLE_LIBS_AVAILABLE = True | |
except ImportError: | |
GOOGLE_LIBS_AVAILABLE = False | |
# Create dummy classes to prevent import errors | |
class Credentials: | |
pass | |
class Request: | |
pass | |
class Flow: | |
def from_client_config(cls, *args, **kwargs): | |
pass | |
def build(*args, **kwargs): | |
pass | |
from utils import safe_pct | |


class GSCClient:
    def __init__(self):
        if not GOOGLE_LIBS_AVAILABLE:
            raise ImportError(
                "Google API libraries not installed. Run: "
                "pip install google-api-python-client google-auth-oauthlib google-auth"
            )
        self.client_id = os.getenv('GOOGLE_CLIENT_ID')
        self.client_secret = os.getenv('GOOGLE_CLIENT_SECRET')
        self.redirect_uri = os.getenv('GSC_REDIRECT_URI', 'http://localhost:7860/auth/gsc/callback')
        self.property_url = os.getenv('GSC_PROPERTY_URL')

        # Configuration
        self.row_limit = int(os.getenv('GSC_ROW_LIMIT', '1000'))
        self.days = int(os.getenv('GSC_DAYS', '28'))

        # OAuth2 scopes
        self.scopes = ['https://www.googleapis.com/auth/webmasters.readonly']

        # In-memory cache
        self.cache = {}
        self.cache_ttl = 3600  # 1 hour

    def get_auth_url(self, state: Optional[str] = None) -> str:
        """Generate the OAuth authorization URL using the Google OAuth2 flow."""
        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET must be configured")

        # OAuth2 client configuration
        client_config = {
            "web": {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "redirect_uris": [self.redirect_uri]
            }
        }

        # Create the flow
        flow = Flow.from_client_config(
            client_config,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

        # Generate the authorization URL, forwarding the optional CSRF state token
        extra = {'state': state} if state else {}
        auth_url, _ = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true',
            prompt='consent',
            **extra
        )
        return auth_url

    def exchange_code(self, auth_code: str) -> Dict[str, Any]:
        """Exchange an authorization code for tokens using the Google OAuth2 flow."""
        # OAuth2 client configuration
        client_config = {
            "web": {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
                "redirect_uris": [self.redirect_uri]
            }
        }

        # Create the flow
        flow = Flow.from_client_config(
            client_config,
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

        # Exchange the code for tokens
        flow.fetch_token(code=auth_code)

        # Return credentials in a format compatible with session storage
        credentials = flow.credentials
        return {
            'access_token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes
        }

    def get_credentials_from_session(self, session_data: Dict[str, Any]) -> Credentials:
        """Create a Credentials object from stored session data."""
        return Credentials(
            token=session_data.get('access_token'),
            refresh_token=session_data.get('refresh_token'),
            token_uri=session_data.get('token_uri'),
            client_id=session_data.get('client_id'),
            client_secret=session_data.get('client_secret'),
            scopes=session_data.get('scopes')
        )

    def get_search_analytics(self, session_data: Dict[str, Any], property_url: Optional[str] = None) -> Dict[str, Any]:
        """Fetch Search Analytics data from GSC using the Google API client."""
        if not property_url:
            property_url = self.property_url
        if not property_url:
            raise ValueError("GSC_PROPERTY_URL not configured")

        # Check the cache
        cache_key = f"gsc_{property_url}_{self.days}"
        if cache_key in self.cache:
            cache_time, data = self.cache[cache_key]
            if time.time() - cache_time < self.cache_ttl:
                return data

        # Get credentials from the session
        credentials = self.get_credentials_from_session(session_data)

        # Refresh the access token if needed
        if not credentials.valid:
            credentials.refresh(Request())
            # Update the session with the new token
            session_data['access_token'] = credentials.token

        # Build the Search Console service
        service = build('searchconsole', 'v1', credentials=credentials)

        # Calculate the date range (GSC data lags by roughly 3 days)
        end_date = datetime.now() - timedelta(days=3)
        start_date = end_date - timedelta(days=self.days)

        # Prepare the request body
        request_body = {
            'startDate': start_date.strftime('%Y-%m-%d'),
            'endDate': end_date.strftime('%Y-%m-%d'),
            'dimensions': ['query'],
            'searchType': 'web',
            'rowLimit': self.row_limit
        }

        try:
            # Execute the Search Analytics query
            response = service.searchanalytics().query(
                siteUrl=property_url,
                body=request_body
            ).execute()
            # Cache the result
            self.cache[cache_key] = (time.time(), response)
            return response
        except Exception as e:
            raise RuntimeError(f"GSC API request failed: {e}") from e

    def transform_gsc_data(self, gsc_response: Dict[str, Any], domain: str) -> Dict[str, Any]:
        """Transform a GSC API response into the keywords module format."""
        rows = gsc_response.get('rows', [])
        if not rows:
            return {
                'data_source': 'Google Search Console',
                'totals': {'keywords': 0, 'estimated_traffic': 0},
                'distribution': {'top3': 0, 'top10': 0, 'top50': 0},
                'distribution_pct': {'top3': 0, 'top10': 0, 'top50': 0},
                'best_keywords': [],
                'worst_keywords': {'by_ctr': [], 'by_position': []},
                'opportunities': [],
                'competitor_summary': []
            }

        # Transform rows
        keywords = []
        for row in rows:
            keywords.append({
                'query': row['keys'][0],
                'clicks': row['clicks'],
                'impressions': row['impressions'],
                'ctr': row['ctr'] * 100,  # Convert to percentage
                'avg_position': row['position']
            })

        # Ranking distribution (cumulative, approximated from average position)
        top3 = sum(1 for r in keywords if r['avg_position'] <= 3)
        top10 = sum(1 for r in keywords if r['avg_position'] <= 10)
        top50 = sum(1 for r in keywords if r['avg_position'] <= 50)
        total = len(keywords)

        # Best performers (sorted by clicks, then CTR)
        best_keywords = sorted(keywords, key=lambda x: (x['clicks'], x['ctr']), reverse=True)[:15]

        # Transform best keywords into the expected format
        best_keywords_formatted = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'url': '',  # GSC does not report a URL per query in this request
                'volume': k['impressions'],
                'estimated_traffic': k['clicks'],
                'trend': 'stable',  # No historical data in a single request
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in best_keywords
        ]

        # Worst performers
        worst_keywords = self._identify_worst_gsc_keywords(keywords)

        # Opportunities: high impressions, low CTR, outside the top 10
        opportunities = [
            {
                'keyword': k['query'],
                'impressions': k['impressions'],
                'ctr': k['ctr'],
                'avg_position': k['avg_position'],
                'clicks': k['clicks'],
                'priority_score': self._calculate_gsc_opportunity_score(k)
            }
            for k in keywords
            if k['impressions'] >= 100 and k['ctr'] < 2.0 and k['avg_position'] > 10
        ]
        opportunities.sort(key=lambda x: x['priority_score'], reverse=True)

        return {
            'data_source': 'Google Search Console',
            'totals': {
                'keywords': total,
                'estimated_traffic': sum(k['clicks'] for k in keywords)
            },
            'distribution': {
                'top3': top3,
                'top10': top10,
                'top50': top50
            },
            'distribution_pct': {
                'top3': safe_pct(top3, total),
                'top10': safe_pct(top10, total),
                'top50': safe_pct(top50, total)
            },
            'best_keywords': best_keywords_formatted,
            'worst_keywords': worst_keywords,
            'opportunities': opportunities[:50],
            'competitor_summary': [],  # GSC does not provide competitor data
            'movement': {'new': 0, 'up': 0, 'down': 0, 'lost': 0},  # Requires historical data
            'data_sources': {
                'positions': 'Google Search Console',
                'volume': 'Google Search Console',
                'enrichment_rate': 100.0  # GSC provides complete data
            }
        }

    def _identify_worst_gsc_keywords(self, keywords: List[Dict]) -> Dict[str, List[Dict]]:
        """Identify the worst-performing keywords from GSC data."""
        IMP_MIN = 100  # Minimum impressions for a keyword to be considered significant
        CTR_MIN = 1.0  # CTR threshold (percent) below which a keyword underperforms

        # Worst by CTR
        worst_by_ctr = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'estimated_ctr': k['ctr'],
                'clicks': k['clicks']
            }
            for k in keywords
            if k['impressions'] >= IMP_MIN and k['ctr'] < CTR_MIN
        ]

        # Worst by position
        worst_by_position = [
            {
                'keyword': k['query'],
                'rank': round(k['avg_position'], 1),
                'impressions': k['impressions'],
                'clicks': k['clicks'],
                'ctr': k['ctr']
            }
            for k in keywords
            if k['avg_position'] > 30 and k['impressions'] >= IMP_MIN
        ]

        # Sort and limit
        worst_by_ctr.sort(key=lambda x: x['estimated_ctr'])
        worst_by_position.sort(key=lambda x: x['rank'], reverse=True)

        return {
            'by_ctr': worst_by_ctr[:20],
            'by_position': worst_by_position[:20]
        }

    def _calculate_gsc_opportunity_score(self, keyword: Dict) -> float:
        """Calculate an opportunity score for a GSC keyword."""
        impressions = keyword['impressions']
        ctr = keyword['ctr']
        position = keyword['avg_position']

        # Higher impressions = more opportunity (capped at 100)
        impression_score = min(100, impressions / 1000 * 10)
        # Lower CTR = more room for improvement
        ctr_score = max(0, 5 - ctr) * 10
        # Closer to the first page = more opportunity
        position_score = max(0, 50 - position)

        return round((impression_score + ctr_score + position_score) / 3, 1)
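

if __name__ == "__main__":
    # Illustrative usage sketch only -- not part of the app's request flow.
    # It assumes GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, and GSC_PROPERTY_URL
    # are set in the environment and that you can paste the authorization code
    # (the `code` query parameter) captured at the redirect URI.
    client = GSCClient()
    print("Authorize at:", client.get_auth_url())
    auth_code = input("Paste the authorization code: ").strip()
    session = client.exchange_code(auth_code)
    raw = client.get_search_analytics(session)
    report = client.transform_gsc_data(raw, domain=client.property_url or "")
    print(f"Keywords: {report['totals']['keywords']}, "
          f"clicks: {report['totals']['estimated_traffic']}")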