Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Google OAuth Token Refresh Script for VoiceCal.ai | |
| This script automatically refreshes Google OAuth access tokens and updates: | |
| 1. Local .env file | |
| 2. HuggingFace Spaces secrets | |
| 3. Credentials JSON file (if exists) | |
| Designed for cron job execution with logging and email notifications. | |
| Usage: | |
| python3 scripts/refresh_token_only.py [--notify-email EMAIL] [--verbose] | |
| """ | |
| import requests | |
| import json | |
| import os | |
| import sys | |
| import logging | |
| import argparse | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| def setup_logging(verbose=False): | |
| """Setup logging configuration.""" | |
| # Create logs directory if it doesn't exist | |
| log_dir = Path("logs") | |
| log_dir.mkdir(exist_ok=True) | |
| # Create timestamped log file | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| log_file = log_dir / f"token_refresh_{timestamp}.log" | |
| # Configure logging | |
| log_format = "%(asctime)s | %(levelname)s | %(message)s" | |
| log_level = logging.DEBUG if verbose else logging.INFO | |
| # Log to both file and console | |
| logging.basicConfig( | |
| level=log_level, | |
| format=log_format, | |
| handlers=[ | |
| logging.FileHandler(log_file), | |
| logging.StreamHandler(sys.stdout) | |
| ] | |
| ) | |
| return log_file | |
| def load_credentials(): | |
| """Load OAuth credentials from environment variables.""" | |
| logging.info("π Loading OAuth credentials...") | |
| # Load from .env file if it exists | |
| env_path = Path(".env") | |
| if env_path.exists(): | |
| logging.info("π Loading from environment variables") | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| refresh_token = os.environ.get('GOOGLE_REFRESH_TOKEN') | |
| client_id = os.environ.get('GOOGLE_CLIENT_ID') | |
| client_secret = os.environ.get('GOOGLE_CLIENT_SECRET') | |
| if not all([refresh_token, client_id, client_secret]): | |
| logging.error("β Missing required environment variables:") | |
| logging.error(f" GOOGLE_REFRESH_TOKEN: {'β' if refresh_token else 'β'}") | |
| logging.error(f" GOOGLE_CLIENT_ID: {'β' if client_id else 'β'}") | |
| logging.error(f" GOOGLE_CLIENT_SECRET: {'β' if client_secret else 'β'}") | |
| raise ValueError("Missing required environment variables") | |
| logging.info("β Credentials loaded") | |
| logging.info(f" Client ID: {client_id[:20]}...") | |
| logging.info(f" Refresh Token: {refresh_token[:20]}...") | |
| return { | |
| 'refresh_token': refresh_token, | |
| 'client_id': client_id, | |
| 'client_secret': client_secret | |
| } | |
| def refresh_access_token(credentials): | |
| """Refresh Google OAuth access token.""" | |
| logging.info("π Refreshing access token...") | |
| data = { | |
| 'client_id': credentials['client_id'], | |
| 'client_secret': credentials['client_secret'], | |
| 'refresh_token': credentials['refresh_token'], | |
| 'grant_type': 'refresh_token' | |
| } | |
| try: | |
| response = requests.post('https://oauth2.googleapis.com/token', data=data, timeout=30) | |
| if response.status_code == 200: | |
| tokens = response.json() | |
| access_token = tokens['access_token'] | |
| expires_in = tokens.get('expires_in', 3600) | |
| logging.info("β Access token refreshed successfully!") | |
| logging.info(f" New token: {access_token[:20]}...") | |
| logging.info(f" Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)") | |
| return { | |
| 'access_token': access_token, | |
| 'expires_in': expires_in | |
| } | |
| else: | |
| error_msg = f"Token has been expired or revoked." if response.status_code == 400 else response.text | |
| logging.error(f"β Failed to refresh token: {error_msg}") | |
| logging.error(f" Response: {response.text if response.text else 'No response'}") | |
| raise Exception(f"Failed to refresh token: {error_msg}") | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f"β Network error during token refresh: {e}") | |
| raise | |
| except Exception as e: | |
| logging.error(f"β Token refresh failed: {e}") | |
| raise | |
| def update_env_file(access_token, expiry_seconds): | |
| """Update .env file with new access token.""" | |
| logging.info("πΎ Updating .env file...") | |
| env_path = Path(".env") | |
| if not env_path.exists(): | |
| logging.warning("β οΈ .env file not found") | |
| return False | |
| # Calculate expiry time using timezone-aware datetime | |
| expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds) | |
| # Read current .env file | |
| with open(env_path, 'r') as f: | |
| lines = f.readlines() | |
| # Update token lines | |
| updated_lines = [] | |
| found_access_token = False | |
| found_token_expiry = False | |
| for line in lines: | |
| if line.startswith('GOOGLE_ACCESS_TOKEN='): | |
| updated_lines.append(f'GOOGLE_ACCESS_TOKEN={access_token}\n') | |
| found_access_token = True | |
| elif line.startswith('GOOGLE_TOKEN_EXPIRY='): | |
| updated_lines.append(f'GOOGLE_TOKEN_EXPIRY={expiry_time.isoformat()}\n') | |
| found_token_expiry = True | |
| else: | |
| updated_lines.append(line) | |
| # Add lines if they don't exist | |
| if not found_access_token: | |
| updated_lines.append(f'GOOGLE_ACCESS_TOKEN={access_token}\n') | |
| if not found_token_expiry: | |
| updated_lines.append(f'GOOGLE_TOKEN_EXPIRY={expiry_time.isoformat()}\n') | |
| # Write back to .env file | |
| with open(env_path, 'w') as f: | |
| f.writelines(updated_lines) | |
| logging.info("β .env file updated") | |
| return True | |
| def update_credentials_file(access_token, expiry_seconds, credentials): | |
| """Update Google Calendar credentials JSON file.""" | |
| logging.info("π Updating credentials file...") | |
| creds_path = Path("app/credentials/google_calendar_credentials.json") | |
| if not creds_path.exists(): | |
| logging.warning(f"β οΈ Credentials file not found: {creds_path}") | |
| return False | |
| # Calculate expiry time | |
| expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds) | |
| # Load existing credentials | |
| with open(creds_path, 'r') as f: | |
| creds_data = json.load(f) | |
| # Update token and expiry | |
| creds_data['token'] = access_token | |
| creds_data['expiry'] = expiry_time.isoformat() | |
| # Write back to file | |
| with open(creds_path, 'w') as f: | |
| json.dump(creds_data, f, indent=2) | |
| logging.info("β Credentials file updated") | |
| return True | |
| def update_huggingface_secrets(access_token, expiry_seconds): | |
| """Update HuggingFace Spaces secrets with new OAuth tokens.""" | |
| logging.info("βοΈ Updating HuggingFace Spaces secrets...") | |
| try: | |
| from huggingface_hub import HfApi | |
| # Get HF token from environment | |
| hf_token = os.environ.get("HF_TOKEN") | |
| if not hf_token: | |
| logging.warning("β οΈ No HF_TOKEN found - skipping HuggingFace Secrets update") | |
| return False | |
| # Initialize HF API | |
| api = HfApi(token=hf_token) | |
| # Your space repository ID | |
| repo_id = "pgits/voiceCal-ai-v3" | |
| # Calculate expiry time | |
| expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds) | |
| # Update access token secret | |
| api.add_space_secret( | |
| repo_id=repo_id, | |
| key="GOOGLE_ACCESS_TOKEN", | |
| value=access_token | |
| ) | |
| logging.info("β Updated GOOGLE_ACCESS_TOKEN in HF Secrets") | |
| # Update expiry as ISO string | |
| api.add_space_secret( | |
| repo_id=repo_id, | |
| key="GOOGLE_TOKEN_EXPIRY", | |
| value=expiry_time.isoformat() | |
| ) | |
| logging.info("β Updated GOOGLE_TOKEN_EXPIRY in HF Secrets") | |
| logging.info("π HuggingFace Secrets updated successfully!") | |
| return True | |
| except ImportError: | |
| logging.warning("β οΈ huggingface_hub not installed - skipping HF Secrets update") | |
| return False | |
| except Exception as e: | |
| logging.error(f"β Failed to update HuggingFace Secrets: {e}") | |
| return False | |
| def send_email_notification(recipient_email, subject, body): | |
| """Send email notification using SMTP.""" | |
| try: | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| smtp_host = os.environ.get("SMTP_HOST", "smtp.gmail.com") | |
| smtp_port = int(os.environ.get("SMTP_PORT", "587")) | |
| smtp_username = os.environ.get("SMTP_USERNAME") | |
| smtp_password = os.environ.get("SMTP_PASSWORD") | |
| if not all([smtp_username, smtp_password]): | |
| logging.warning("β οΈ SMTP credentials not configured - skipping email notification") | |
| return False | |
| msg = MIMEMultipart() | |
| msg['From'] = smtp_username | |
| msg['To'] = recipient_email | |
| msg['Subject'] = subject | |
| msg.attach(MIMEText(body, 'plain')) | |
| with smtplib.SMTP(smtp_host, smtp_port) as server: | |
| server.starttls() | |
| server.login(smtp_username, smtp_password) | |
| server.send_message(msg) | |
| logging.info(f"β Email notification sent to {recipient_email}") | |
| return True | |
| except Exception as e: | |
| logging.error(f"β Failed to send email notification: {e}") | |
| return False | |
| def main(): | |
| """Main execution function.""" | |
| parser = argparse.ArgumentParser(description='Refresh Google OAuth tokens') | |
| parser.add_argument('--notify-email', help='Email address for failure notifications') | |
| parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') | |
| args = parser.parse_args() | |
| # Setup logging | |
| log_file = setup_logging(args.verbose) | |
| logging.info("=" * 80) | |
| logging.info("VoiceCal.ai OAuth Token Refresh Script") | |
| logging.info(f"Started at: {datetime.now(timezone.utc).isoformat()}") | |
| logging.info(f"Log file: {log_file}") | |
| logging.info("=" * 80) | |
| success = False | |
| error_message = None | |
| try: | |
| # Load credentials | |
| credentials = load_credentials() | |
| # Refresh access token | |
| tokens = refresh_access_token(credentials) | |
| # Update .env file | |
| update_env_file(tokens['access_token'], tokens['expires_in']) | |
| # Update credentials file (optional) | |
| update_credentials_file(tokens['access_token'], tokens['expires_in'], credentials) | |
| # Update HuggingFace Spaces secrets | |
| update_huggingface_secrets(tokens['access_token'], tokens['expires_in']) | |
| success = True | |
| except Exception as e: | |
| error_message = str(e) | |
| logging.error(f"β Token refresh failed: {error_message}") | |
| # Send failure notification if email provided | |
| if args.notify_email: | |
| logging.info(f"π§ Sending failure notification to {args.notify_email}...") | |
| subject = "β οΈ VoiceCal.ai OAuth Token Refresh Failed" | |
| body = f""" | |
| VoiceCal.ai OAuth token refresh failed. | |
| Error: {error_message} | |
| Timestamp: {datetime.now(timezone.utc).isoformat()} | |
| Log file: {log_file} | |
| Please re-authenticate by visiting: | |
| http://localhost:8080/auth/login | |
| -- VoiceCal.ai Automated Token Refresh | |
| """.strip() | |
| send_email_notification(args.notify_email, subject, body) | |
| # Log completion status | |
| logging.info("=" * 80) | |
| logging.info(f"Completed at: {datetime.now(timezone.utc).isoformat()}") | |
| logging.info(f"Status: {'β SUCCESS' if success else 'β FAILED'}") | |
| logging.info("=" * 80) | |
| # Exit with appropriate code | |
| sys.exit(0 if success else 1) | |
| if __name__ == "__main__": | |
| main() | |