| """ |
| π§π
Google Integration for Olivia |
| Provides Gmail and Calendar access for the household manager persona. |
| |
| SETUP REQUIRED: |
| 1. Go to https://console.cloud.google.com/ |
| 2. Create a new project (or use existing) |
| 3. Enable Gmail API and Google Calendar API |
| 4. Create OAuth 2.0 credentials (Desktop app type) |
| 5. Download credentials.json and place in this directory |
| 6. Run this file directly to authenticate: python google_services.py |
| """ |
|
|
| import os |
| import pickle |
| import datetime |
| from pathlib import Path |
|
|
| |
| try: |
| from google.auth.transport.requests import Request |
| from google.oauth2.credentials import Credentials |
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from googleapiclient.discovery import build |
| from googleapiclient.errors import HttpError |
| GOOGLE_AVAILABLE = True |
| except ImportError: |
| GOOGLE_AVAILABLE = False |
| print("β οΈ Google API libraries not installed. Run: pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib") |
|
|
| |
| SCOPES = [ |
| 'https://www.googleapis.com/auth/gmail.readonly', |
| 'https://www.googleapis.com/auth/gmail.send', |
| 'https://www.googleapis.com/auth/calendar.readonly', |
| 'https://www.googleapis.com/auth/calendar.events', |
| ] |
|
|
| |
| SCRIPT_DIR = Path(__file__).parent |
| CREDENTIALS_FILE = SCRIPT_DIR / 'credentials.json' |
| TOKEN_FILE = SCRIPT_DIR / 'token.pickle' |
|
|
|
|
| def get_google_credentials(): |
| """Get or refresh Google OAuth credentials.""" |
| if not GOOGLE_AVAILABLE: |
| return None |
| |
| creds = None |
| |
| |
| if TOKEN_FILE.exists(): |
| with open(TOKEN_FILE, 'rb') as token: |
| creds = pickle.load(token) |
| |
| |
| if not creds or not creds.valid: |
| if creds and creds.expired and creds.refresh_token: |
| creds.refresh(Request()) |
| else: |
| if not CREDENTIALS_FILE.exists(): |
| print(f"β οΈ Missing {CREDENTIALS_FILE}") |
| print(" Download from Google Cloud Console β APIs & Services β Credentials") |
| return None |
| |
| flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_FILE), SCOPES) |
| creds = flow.run_local_server(port=0) |
| |
| |
| with open(TOKEN_FILE, 'wb') as token: |
| pickle.dump(creds, token) |
| |
| return creds |
|
|
|
|
| class GmailService: |
| """Gmail integration for Olivia.""" |
| |
| def __init__(self): |
| self.service = None |
| self._connect() |
| |
| def _connect(self): |
| """Connect to Gmail API.""" |
| creds = get_google_credentials() |
| if creds: |
| try: |
| self.service = build('gmail', 'v1', credentials=creds) |
| except Exception as e: |
| print(f"β οΈ Gmail connection failed: {e}") |
| |
| def get_unread_count(self): |
| """Get count of unread emails.""" |
| if not self.service: |
| return None |
| try: |
| results = self.service.users().messages().list( |
| userId='me', q='is:unread', maxResults=1 |
| ).execute() |
| return results.get('resultSizeEstimate', 0) |
| except HttpError as e: |
| print(f"β οΈ Gmail error: {e}") |
| return None |
| |
| def get_recent_emails(self, max_results=5): |
| """Get recent email summaries.""" |
| if not self.service: |
| return [] |
| |
| try: |
| results = self.service.users().messages().list( |
| userId='me', maxResults=max_results |
| ).execute() |
| |
| messages = results.get('messages', []) |
| emails = [] |
| |
| for msg in messages: |
| msg_data = self.service.users().messages().get( |
| userId='me', id=msg['id'], format='metadata', |
| metadataHeaders=['From', 'Subject', 'Date'] |
| ).execute() |
| |
| headers = {h['name']: h['value'] for h in msg_data.get('payload', {}).get('headers', [])} |
| emails.append({ |
| 'id': msg['id'], |
| 'from': headers.get('From', 'Unknown'), |
| 'subject': headers.get('Subject', '(no subject)'), |
| 'date': headers.get('Date', ''), |
| 'snippet': msg_data.get('snippet', '')[:100], |
| 'unread': 'UNREAD' in msg_data.get('labelIds', []) |
| }) |
| |
| return emails |
| except HttpError as e: |
| print(f"β οΈ Gmail error: {e}") |
| return [] |
| |
| def get_email_summary(self): |
| """Get a natural language summary for Olivia to speak.""" |
| unread = self.get_unread_count() |
| if unread is None: |
| return "I couldn't access your Gmail right now." |
| |
| if unread == 0: |
| return "Your inbox is all clear - no unread emails!" |
| |
| recent = self.get_recent_emails(3) |
| unread_emails = [e for e in recent if e.get('unread')] |
| |
| if unread == 1: |
| summary = f"You have 1 unread email" |
| else: |
| summary = f"You have {unread} unread emails" |
| |
| if unread_emails: |
| first = unread_emails[0] |
| sender = first['from'].split('<')[0].strip() |
| summary += f". The most recent is from {sender} about '{first['subject'][:50]}'" |
| |
| return summary + "." |
|
|
|
|
| class CalendarService: |
| """Google Calendar integration for Olivia.""" |
| |
| def __init__(self): |
| self.service = None |
| self._connect() |
| |
| def _connect(self): |
| """Connect to Calendar API.""" |
| creds = get_google_credentials() |
| if creds: |
| try: |
| self.service = build('calendar', 'v3', credentials=creds) |
| except Exception as e: |
| print(f"β οΈ Calendar connection failed: {e}") |
| |
| def _get_all_calendar_ids(self): |
| """Get all calendar IDs the user has access to.""" |
| try: |
| calendar_list = self.service.calendarList().list().execute() |
| return [cal['id'] for cal in calendar_list.get('items', [])] |
| except HttpError: |
| return ['primary'] |
| |
| def get_today_events(self): |
| """Get today's calendar events from ALL calendars.""" |
| return self.get_events_for_date(days_offset=0) |
| |
| def get_upcoming_events(self, days=7, max_results=10): |
| """Get upcoming events for the next N days from ALL calendars.""" |
| if not self.service: |
| return [] |
| |
| try: |
| now = datetime.datetime.utcnow() |
| end_date = now + datetime.timedelta(days=days) |
| |
| all_events = [] |
| calendar_ids = self._get_all_calendar_ids() |
| |
| for cal_id in calendar_ids: |
| try: |
| events_result = self.service.events().list( |
| calendarId=cal_id, |
| timeMin=now.isoformat() + 'Z', |
| timeMax=end_date.isoformat() + 'Z', |
| maxResults=max_results, |
| singleEvents=True, |
| orderBy='startTime' |
| ).execute() |
| all_events.extend(events_result.get('items', [])) |
| except HttpError: |
| continue |
| |
| |
| all_events.sort(key=lambda x: x.get('start', {}).get('dateTime', x.get('start', {}).get('date', ''))) |
| return all_events[:max_results] |
| except HttpError as e: |
| print(f"β οΈ Calendar error: {e}") |
| return [] |
| |
| def get_schedule_summary(self): |
| """Get a natural language summary for Olivia to speak.""" |
| events = self.get_today_events() |
| |
| if not events: |
| return "Your calendar is clear for today - no scheduled events!" |
| |
| if len(events) == 1: |
| summary = "You have 1 event today" |
| else: |
| summary = f"You have {len(events)} events today" |
| |
| |
| for i, event in enumerate(events[:3]): |
| start = event.get('start', '') |
| if 'T' in start: |
| |
| try: |
| time_str = datetime.datetime.fromisoformat(start.replace('Z', '+00:00')) |
| time_formatted = time_str.strftime('%I:%M %p').lstrip('0') |
| except: |
| time_formatted = start |
| else: |
| time_formatted = "all day" |
| |
| if i == 0: |
| summary += f": {event['summary']} at {time_formatted}" |
| else: |
| summary += f", then {event['summary']} at {time_formatted}" |
| |
| if len(events) > 3: |
| summary += f", and {len(events) - 3} more" |
| |
| return summary + "." |
| |
| def get_events_for_date(self, days_offset=0): |
| """Get events for a specific day (0=today, 1=tomorrow, etc.) from ALL calendars. |
| Uses local date calculation to handle timezone differences.""" |
| if not self.service: |
| return [] |
| |
| try: |
| |
| import time |
| local_now = datetime.datetime.now() |
| target_date = local_now + datetime.timedelta(days=days_offset) |
| |
| |
| start_of_day = target_date.replace(hour=0, minute=0, second=0, microsecond=0) |
| end_of_day = start_of_day + datetime.timedelta(days=1) |
| |
| |
| |
| tz_offset = time.strftime('%z') |
| tz_formatted = f"{tz_offset[:3]}:{tz_offset[3:]}" if tz_offset else 'Z' |
| |
| time_min = start_of_day.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted |
| time_max = end_of_day.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted |
| |
| all_events = [] |
| calendar_ids = self._get_all_calendar_ids() |
| |
| for cal_id in calendar_ids: |
| try: |
| events_result = self.service.events().list( |
| calendarId=cal_id, |
| timeMin=time_min, |
| timeMax=time_max, |
| singleEvents=True, |
| orderBy='startTime' |
| ).execute() |
| |
| for e in events_result.get('items', []): |
| all_events.append({ |
| 'summary': e.get('summary', 'Untitled'), |
| 'start': e.get('start', {}).get('dateTime', e.get('start', {}).get('date', '')), |
| 'end': e.get('end', {}).get('dateTime', e.get('end', {}).get('date', '')), |
| 'location': e.get('location', ''), |
| }) |
| except HttpError: |
| continue |
| |
| all_events.sort(key=lambda x: x.get('start', '')) |
| return all_events |
| except HttpError as e: |
| print(f"β οΈ Calendar error: {e}") |
| return [] |
| |
| def get_tomorrow_summary(self): |
| """Get a natural language summary of tomorrow's events.""" |
| events = self.get_events_for_date(days_offset=1) |
| |
| if not events: |
| return "Your calendar is clear for tomorrow - no scheduled events!" |
| |
| if len(events) == 1: |
| summary = "You have 1 event tomorrow" |
| else: |
| summary = f"You have {len(events)} events tomorrow" |
| |
| for i, event in enumerate(events[:3]): |
| start = event.get('start', '') |
| if 'T' in start: |
| try: |
| time_str = datetime.datetime.fromisoformat(start.replace('Z', '+00:00')) |
| time_formatted = time_str.strftime('%I:%M %p').lstrip('0') |
| except: |
| time_formatted = start |
| else: |
| time_formatted = "all day" |
| |
| if i == 0: |
| summary += f": {event['summary']} at {time_formatted}" |
| else: |
| summary += f", then {event['summary']} at {time_formatted}" |
| |
| if len(events) > 3: |
| summary += f", and {len(events) - 3} more" |
| |
| return summary + "." |
| |
| def create_event(self, summary, start_time, end_time, description="", location=""): |
| """Create a new calendar event. |
| |
| Args: |
| summary: Event title |
| start_time: datetime object for start |
| end_time: datetime object for end |
| description: Optional event description |
| location: Optional location |
| |
| Returns: |
| Created event dict or None if failed |
| """ |
| if not self.service: |
| return None |
| |
| try: |
| import time |
| tz_offset = time.strftime('%z') |
| tz_formatted = f"{tz_offset[:3]}:{tz_offset[3:]}" if tz_offset else '-07:00' |
| |
| event = { |
| 'summary': summary, |
| 'location': location, |
| 'description': description, |
| 'start': { |
| 'dateTime': start_time.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| 'timeZone': 'America/Denver', |
| }, |
| 'end': { |
| 'dateTime': end_time.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| 'timeZone': 'America/Denver', |
| }, |
| } |
| |
| created_event = self.service.events().insert( |
| calendarId='primary', |
| body=event |
| ).execute() |
| |
| print(f"β
Created event: {created_event.get('summary')}") |
| return created_event |
| except HttpError as e: |
| print(f"β οΈ Failed to create event: {e}") |
| return None |
| |
| def delete_event(self, event_id): |
| """Delete a calendar event by ID.""" |
| if not self.service: |
| return False |
| |
| try: |
| self.service.events().delete( |
| calendarId='primary', |
| eventId=event_id |
| ).execute() |
| print(f"β
Deleted event: {event_id}") |
| return True |
| except HttpError as e: |
| print(f"β οΈ Failed to delete event: {e}") |
| return False |
| |
| def create_all_day_event(self, summary, date, recurrence=None, description=""): |
| """Create an all-day event (like birthdays, holidays). |
| |
| Args: |
| summary: Event title |
| date: datetime object for the date |
| recurrence: Optional - 'yearly', 'monthly', 'weekly', or None |
| description: Optional description |
| |
| Returns: |
| Created event dict or None if failed |
| """ |
| if not self.service: |
| return None |
| |
| try: |
| event = { |
| 'summary': summary, |
| 'description': description, |
| 'start': { |
| 'date': date.strftime('%Y-%m-%d'), |
| }, |
| 'end': { |
| 'date': date.strftime('%Y-%m-%d'), |
| }, |
| } |
| |
| |
| if recurrence: |
| rrule_map = { |
| 'yearly': 'RRULE:FREQ=YEARLY', |
| 'annually': 'RRULE:FREQ=YEARLY', |
| 'monthly': 'RRULE:FREQ=MONTHLY', |
| 'weekly': 'RRULE:FREQ=WEEKLY', |
| 'daily': 'RRULE:FREQ=DAILY', |
| } |
| if recurrence.lower() in rrule_map: |
| event['recurrence'] = [rrule_map[recurrence.lower()]] |
| |
| created_event = self.service.events().insert( |
| calendarId='primary', |
| body=event |
| ).execute() |
| |
| print(f"β
Created all-day event: {created_event.get('summary')}") |
| return created_event |
| except HttpError as e: |
| print(f"β οΈ Failed to create all-day event: {e}") |
| return None |
| |
| def find_events_by_name(self, search_term, days_ahead=30): |
| """Find events by name/summary. |
| |
| Args: |
| search_term: Text to search for in event titles |
| days_ahead: How many days ahead to search (default 30) |
| |
| Returns: |
| List of matching events with id, summary, start, end, location |
| """ |
| if not self.service: |
| return [] |
| |
| try: |
| import time |
| local_now = datetime.datetime.now() |
| time_min = local_now |
| time_max = local_now + datetime.timedelta(days=days_ahead) |
| |
| tz_offset = time.strftime('%z') |
| tz_formatted = f"{tz_offset[:3]}:{tz_offset[3:]}" if tz_offset else '-07:00' |
| |
| events_result = self.service.events().list( |
| calendarId='primary', |
| timeMin=time_min.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| timeMax=time_max.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| singleEvents=True, |
| orderBy='startTime', |
| q=search_term |
| ).execute() |
| |
| matches = [] |
| for e in events_result.get('items', []): |
| matches.append({ |
| 'id': e.get('id'), |
| 'summary': e.get('summary', 'Untitled'), |
| 'start': e.get('start', {}).get('dateTime', e.get('start', {}).get('date', '')), |
| 'end': e.get('end', {}).get('dateTime', e.get('end', {}).get('date', '')), |
| 'location': e.get('location', ''), |
| }) |
| |
| return matches |
| except HttpError as e: |
| print(f"β οΈ Failed to search events: {e}") |
| return [] |
| |
| def update_event(self, event_id, summary=None, location=None, description=None, start_time=None, end_time=None): |
| """Update an existing calendar event. |
| |
| Args: |
| event_id: The event ID to update |
| summary: New title (optional) |
| location: New location (optional) |
| description: New description (optional) |
| start_time: New start datetime (optional) |
| end_time: New end datetime (optional) |
| |
| Returns: |
| Updated event dict or None if failed |
| """ |
| if not self.service: |
| return None |
| |
| try: |
| |
| event = self.service.events().get( |
| calendarId='primary', |
| eventId=event_id |
| ).execute() |
| |
| |
| if summary: |
| event['summary'] = summary |
| if location: |
| event['location'] = location |
| if description: |
| event['description'] = description |
| |
| if start_time: |
| import time |
| tz_offset = time.strftime('%z') |
| tz_formatted = f"{tz_offset[:3]}:{tz_offset[3:]}" if tz_offset else '-07:00' |
| event['start'] = { |
| 'dateTime': start_time.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| 'timeZone': 'America/Denver', |
| } |
| |
| if end_time: |
| import time |
| tz_offset = time.strftime('%z') |
| tz_formatted = f"{tz_offset[:3]}:{tz_offset[3:]}" if tz_offset else '-07:00' |
| event['end'] = { |
| 'dateTime': end_time.strftime('%Y-%m-%dT%H:%M:%S') + tz_formatted, |
| 'timeZone': 'America/Denver', |
| } |
| |
| |
| updated_event = self.service.events().update( |
| calendarId='primary', |
| eventId=event_id, |
| body=event |
| ).execute() |
| |
| print(f"β
Updated event: {updated_event.get('summary')}") |
| return updated_event |
| except HttpError as e: |
| print(f"β οΈ Failed to update event: {e}") |
| return None |
|
|
|
|
| |
| _gmail = None |
| _calendar = None |
|
|
| def get_gmail(): |
| """Get Gmail service (singleton).""" |
| global _gmail |
| if _gmail is None and GOOGLE_AVAILABLE: |
| _gmail = GmailService() |
| return _gmail |
|
|
| def get_calendar(): |
| """Get Calendar service (singleton).""" |
| global _calendar |
| if _calendar is None and GOOGLE_AVAILABLE: |
| _calendar = CalendarService() |
| return _calendar |
|
|
|
|
| |
| def get_daily_briefing(): |
| """Get a complete daily briefing for Olivia.""" |
| parts = [] |
| |
| calendar = get_calendar() |
| if calendar and calendar.service: |
| parts.append(calendar.get_schedule_summary()) |
| |
| gmail = get_gmail() |
| if gmail and gmail.service: |
| parts.append(gmail.get_email_summary()) |
| |
| if not parts: |
| return "I don't have access to your calendar or email yet. Would you like help setting that up?" |
| |
| return " ".join(parts) |
|
|
|
|
| if __name__ == "__main__": |
| """Run directly to set up authentication.""" |
| print("π Haven Google Authentication Setup") |
| print("=" * 50) |
| |
| if not GOOGLE_AVAILABLE: |
| print("\nβ Google API libraries not installed!") |
| print(" Run: pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib") |
| exit(1) |
| |
| if not CREDENTIALS_FILE.exists(): |
| print(f"\nβ Missing credentials file: {CREDENTIALS_FILE}") |
| print("\nTo set up Google integration:") |
| print("1. Go to https://console.cloud.google.com/") |
| print("2. Create a project and enable Gmail & Calendar APIs") |
| print("3. Create OAuth 2.0 credentials (Desktop app)") |
| print("4. Download and save as: credentials.json") |
| print(f" Save it here: {SCRIPT_DIR}") |
| exit(1) |
| |
| print("\nπ± Opening browser for Google authentication...") |
| creds = get_google_credentials() |
| |
| if creds: |
| print("\nβ
Authentication successful!") |
| print("\nTesting services...") |
| |
| gmail = get_gmail() |
| if gmail.service: |
| print(f"π§ Gmail: {gmail.get_email_summary()}") |
| |
| calendar = get_calendar() |
| if calendar.service: |
| print(f"π
Calendar: {calendar.get_schedule_summary()}") |
| |
| print("\nπ Olivia now has access to your Gmail and Calendar!") |
| else: |
| print("\nβ Authentication failed") |
|
|