voiceCal-ai-v3 / scripts /refresh_token_only.py
pgits's picture
FEAT: Add privacy policy and terms of service for Google OAuth verification
aca490b
#!/usr/bin/env python3
"""
Google OAuth Token Refresh Script for VoiceCal.ai
This script automatically refreshes Google OAuth access tokens and updates:
1. Local .env file
2. HuggingFace Spaces secrets
3. Credentials JSON file (if exists)
Designed for cron job execution with logging and email notifications.
Usage:
python3 scripts/refresh_token_only.py [--notify-email EMAIL] [--verbose]
"""
import requests
import json
import os
import sys
import logging
import argparse
from datetime import datetime, timedelta, timezone
from pathlib import Path
def setup_logging(verbose=False):
"""Setup logging configuration."""
# Create logs directory if it doesn't exist
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
# Create timestamped log file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = log_dir / f"token_refresh_{timestamp}.log"
# Configure logging
log_format = "%(asctime)s | %(levelname)s | %(message)s"
log_level = logging.DEBUG if verbose else logging.INFO
# Log to both file and console
logging.basicConfig(
level=log_level,
format=log_format,
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout)
]
)
return log_file
def load_credentials():
"""Load OAuth credentials from environment variables."""
logging.info("πŸ” Loading OAuth credentials...")
# Load from .env file if it exists
env_path = Path(".env")
if env_path.exists():
logging.info("πŸ“ Loading from environment variables")
from dotenv import load_dotenv
load_dotenv()
refresh_token = os.environ.get('GOOGLE_REFRESH_TOKEN')
client_id = os.environ.get('GOOGLE_CLIENT_ID')
client_secret = os.environ.get('GOOGLE_CLIENT_SECRET')
if not all([refresh_token, client_id, client_secret]):
logging.error("❌ Missing required environment variables:")
logging.error(f" GOOGLE_REFRESH_TOKEN: {'βœ“' if refresh_token else 'βœ—'}")
logging.error(f" GOOGLE_CLIENT_ID: {'βœ“' if client_id else 'βœ—'}")
logging.error(f" GOOGLE_CLIENT_SECRET: {'βœ“' if client_secret else 'βœ—'}")
raise ValueError("Missing required environment variables")
logging.info("βœ… Credentials loaded")
logging.info(f" Client ID: {client_id[:20]}...")
logging.info(f" Refresh Token: {refresh_token[:20]}...")
return {
'refresh_token': refresh_token,
'client_id': client_id,
'client_secret': client_secret
}
def refresh_access_token(credentials):
"""Refresh Google OAuth access token."""
logging.info("πŸ”„ Refreshing access token...")
data = {
'client_id': credentials['client_id'],
'client_secret': credentials['client_secret'],
'refresh_token': credentials['refresh_token'],
'grant_type': 'refresh_token'
}
try:
response = requests.post('https://oauth2.googleapis.com/token', data=data, timeout=30)
if response.status_code == 200:
tokens = response.json()
access_token = tokens['access_token']
expires_in = tokens.get('expires_in', 3600)
logging.info("βœ… Access token refreshed successfully!")
logging.info(f" New token: {access_token[:20]}...")
logging.info(f" Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)")
return {
'access_token': access_token,
'expires_in': expires_in
}
else:
error_msg = f"Token has been expired or revoked." if response.status_code == 400 else response.text
logging.error(f"❌ Failed to refresh token: {error_msg}")
logging.error(f" Response: {response.text if response.text else 'No response'}")
raise Exception(f"Failed to refresh token: {error_msg}")
except requests.exceptions.RequestException as e:
logging.error(f"❌ Network error during token refresh: {e}")
raise
except Exception as e:
logging.error(f"❌ Token refresh failed: {e}")
raise
def update_env_file(access_token, expiry_seconds):
"""Update .env file with new access token."""
logging.info("πŸ’Ύ Updating .env file...")
env_path = Path(".env")
if not env_path.exists():
logging.warning("⚠️ .env file not found")
return False
# Calculate expiry time using timezone-aware datetime
expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)
# Read current .env file
with open(env_path, 'r') as f:
lines = f.readlines()
# Update token lines
updated_lines = []
found_access_token = False
found_token_expiry = False
for line in lines:
if line.startswith('GOOGLE_ACCESS_TOKEN='):
updated_lines.append(f'GOOGLE_ACCESS_TOKEN={access_token}\n')
found_access_token = True
elif line.startswith('GOOGLE_TOKEN_EXPIRY='):
updated_lines.append(f'GOOGLE_TOKEN_EXPIRY={expiry_time.isoformat()}\n')
found_token_expiry = True
else:
updated_lines.append(line)
# Add lines if they don't exist
if not found_access_token:
updated_lines.append(f'GOOGLE_ACCESS_TOKEN={access_token}\n')
if not found_token_expiry:
updated_lines.append(f'GOOGLE_TOKEN_EXPIRY={expiry_time.isoformat()}\n')
# Write back to .env file
with open(env_path, 'w') as f:
f.writelines(updated_lines)
logging.info("βœ… .env file updated")
return True
def update_credentials_file(access_token, expiry_seconds, credentials):
"""Update Google Calendar credentials JSON file."""
logging.info("πŸ“ Updating credentials file...")
creds_path = Path("app/credentials/google_calendar_credentials.json")
if not creds_path.exists():
logging.warning(f"⚠️ Credentials file not found: {creds_path}")
return False
# Calculate expiry time
expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)
# Load existing credentials
with open(creds_path, 'r') as f:
creds_data = json.load(f)
# Update token and expiry
creds_data['token'] = access_token
creds_data['expiry'] = expiry_time.isoformat()
# Write back to file
with open(creds_path, 'w') as f:
json.dump(creds_data, f, indent=2)
logging.info("βœ… Credentials file updated")
return True
def update_huggingface_secrets(access_token, expiry_seconds):
"""Update HuggingFace Spaces secrets with new OAuth tokens."""
logging.info("☁️ Updating HuggingFace Spaces secrets...")
try:
from huggingface_hub import HfApi
# Get HF token from environment
hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
logging.warning("⚠️ No HF_TOKEN found - skipping HuggingFace Secrets update")
return False
# Initialize HF API
api = HfApi(token=hf_token)
# Your space repository ID
repo_id = "pgits/voiceCal-ai-v3"
# Calculate expiry time
expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)
# Update access token secret
api.add_space_secret(
repo_id=repo_id,
key="GOOGLE_ACCESS_TOKEN",
value=access_token
)
logging.info("βœ… Updated GOOGLE_ACCESS_TOKEN in HF Secrets")
# Update expiry as ISO string
api.add_space_secret(
repo_id=repo_id,
key="GOOGLE_TOKEN_EXPIRY",
value=expiry_time.isoformat()
)
logging.info("βœ… Updated GOOGLE_TOKEN_EXPIRY in HF Secrets")
logging.info("πŸŽ‰ HuggingFace Secrets updated successfully!")
return True
except ImportError:
logging.warning("⚠️ huggingface_hub not installed - skipping HF Secrets update")
return False
except Exception as e:
logging.error(f"❌ Failed to update HuggingFace Secrets: {e}")
return False
def send_email_notification(recipient_email, subject, body):
"""Send email notification using SMTP."""
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
smtp_host = os.environ.get("SMTP_HOST", "smtp.gmail.com")
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_username = os.environ.get("SMTP_USERNAME")
smtp_password = os.environ.get("SMTP_PASSWORD")
if not all([smtp_username, smtp_password]):
logging.warning("⚠️ SMTP credentials not configured - skipping email notification")
return False
msg = MIMEMultipart()
msg['From'] = smtp_username
msg['To'] = recipient_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_username, smtp_password)
server.send_message(msg)
logging.info(f"βœ… Email notification sent to {recipient_email}")
return True
except Exception as e:
logging.error(f"❌ Failed to send email notification: {e}")
return False
def main():
"""Main execution function."""
parser = argparse.ArgumentParser(description='Refresh Google OAuth tokens')
parser.add_argument('--notify-email', help='Email address for failure notifications')
parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
args = parser.parse_args()
# Setup logging
log_file = setup_logging(args.verbose)
logging.info("=" * 80)
logging.info("VoiceCal.ai OAuth Token Refresh Script")
logging.info(f"Started at: {datetime.now(timezone.utc).isoformat()}")
logging.info(f"Log file: {log_file}")
logging.info("=" * 80)
success = False
error_message = None
try:
# Load credentials
credentials = load_credentials()
# Refresh access token
tokens = refresh_access_token(credentials)
# Update .env file
update_env_file(tokens['access_token'], tokens['expires_in'])
# Update credentials file (optional)
update_credentials_file(tokens['access_token'], tokens['expires_in'], credentials)
# Update HuggingFace Spaces secrets
update_huggingface_secrets(tokens['access_token'], tokens['expires_in'])
success = True
except Exception as e:
error_message = str(e)
logging.error(f"❌ Token refresh failed: {error_message}")
# Send failure notification if email provided
if args.notify_email:
logging.info(f"πŸ“§ Sending failure notification to {args.notify_email}...")
subject = "⚠️ VoiceCal.ai OAuth Token Refresh Failed"
body = f"""
VoiceCal.ai OAuth token refresh failed.
Error: {error_message}
Timestamp: {datetime.now(timezone.utc).isoformat()}
Log file: {log_file}
Please re-authenticate by visiting:
http://localhost:8080/auth/login
-- VoiceCal.ai Automated Token Refresh
""".strip()
send_email_notification(args.notify_email, subject, body)
# Log completion status
logging.info("=" * 80)
logging.info(f"Completed at: {datetime.now(timezone.utc).isoformat()}")
logging.info(f"Status: {'βœ… SUCCESS' if success else '❌ FAILED'}")
logging.info("=" * 80)
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()