nikhil_staging / src /data /sources /gmail_source.py
nsthorat's picture
Push
e4f9cbe
raw
history blame
No virus
7.69 kB
"""Gmail source."""
import base64
import dataclasses
import os.path
import random
import re
from datetime import datetime
from time import sleep
from typing import Any, Iterable, Optional
from email_reply_parser import EmailReplyParser
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pydantic import Field as PydanticField
from typing_extensions import override
from ...config import data_path
from ...schema import Item, field
from ...utils import log
from .source import Source, SourceSchema
# OAuth scopes requested from the user. If modifying these scopes, delete the token json file so
# the user is asked to authorize again.
_SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Directory holding the OAuth client credentials file and the cached user token.
_GMAIL_CONFIG_DIR = os.path.join(data_path(), '.gmail')
_CREDS_FILENAME = 'credentials.json'
_TOKEN_FILENAME = 'token.json'

# Retry count passed to Gmail API request execution.
_NUM_RETRIES = 10
# Upper bound on the number of threads loaded.
_MAX_NUM_THREADS = 30000

# A newline sandwiched between two non-whitespace characters (a hard line wrap).
_UNWRAP_PATTERN = re.compile(r'(\S)\n(\S)')
# An http(s) URL, running up to the next whitespace character.
HTTP_PATTERN = re.compile(r'https?://[^\s]+')
class Gmail(Source):
  """Connects to your Gmail and loads the text of your emails.

  **One time setup**

  Download the OAuth credentials file from the
  [Google Cloud Console](https://console.cloud.google.com/apis/credentials) and save it to the
  correct location. See
  [guide](https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application)
  for details.
  """
  name = 'gmail'

  credentials_file = PydanticField(
    description='Path to the OAuth credentials file.',
    default=os.path.join(_GMAIL_CONFIG_DIR, _CREDS_FILENAME))

  # OAuth credentials populated by `setup()` and used by `process()`.
  _creds: Optional[Credentials] = None

  class Config:
    # `credentials_file` is listed as required in the schema even though it has a default value.
    schema_extra = {'required': ['credentials_file']}

  @override
  def setup(self) -> None:
    """Loads cached OAuth credentials, refreshing them or running the login flow when needed.

    Raises:
      ValueError: If the user must log in and the OAuth credentials file does not exist.
    """
    # The token file stores the user's access and refresh tokens, and is created automatically when
    # the authorization flow completes for the first time.
    token_filepath = os.path.join(_GMAIL_CONFIG_DIR, _TOKEN_FILENAME)
    if os.path.exists(token_filepath):
      self._creds = Credentials.from_authorized_user_file(token_filepath, _SCOPES)

    if self._creds and self._creds.valid:
      return

    # No (valid) credentials available: refresh them if possible, otherwise let the user log in.
    if self._creds and self._creds.expired and self._creds.refresh_token:
      self._creds.refresh(Request())
    else:
      if not os.path.exists(self.credentials_file):
        raise ValueError(
          f'Could not find the OAuth credentials file at "{self.credentials_file}". Make sure to '
          'download it from the Google Cloud Console and save it to the correct location.')
      flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, _SCOPES)
      self._creds = flow.run_local_server()

    os.makedirs(os.path.dirname(token_filepath), exist_ok=True)
    # Save the token for the next run.
    with open(token_filepath, 'w', encoding='utf-8') as token_file:
      token_file.write(self._creds.to_json())

  @override
  def source_schema(self) -> SourceSchema:
    """Returns the schema of the items yielded by `process()`."""
    return SourceSchema(
      fields={
        'body': field('string'),
        'snippet': field('string'),
        'dates': field(fields=['string']),
        'subject': field('string'),
      })

  @override
  def process(self) -> Iterable[Item]:
    """Yields one item per Gmail thread that contains parseable plain-text content.

    Threads are listed page by page (spam and trash excluded) and each page is fetched with one
    batch request. Threads whose fetch fails are re-requested with exponential backoff; after
    `_NUM_RETRIES` retry rounds on a page the remaining failures are skipped. Fetching stops once
    at least `_MAX_NUM_THREADS` items have been yielded (checked between pages).
    """
    service = build('gmail', 'v1', credentials=self._creds)
    threads_resource = service.users().threads()

    # Filled by the batch callback; drained and reset by the paging loop below.
    thread_batch: list[Item] = []
    retry_batch: set[str] = set()
    num_retries = 0
    num_threads_fetched = 0

    def _thread_fetched(request_id: str, response: Any, exception: Optional[HttpError]) -> None:
      """Batch callback: stores the fetched thread's item, or marks the thread for retry."""
      if exception is not None:
        retry_batch.add(request_id)
        return
      item = _thread_to_item(response)
      if item is not None:
        thread_batch.append(item)
      retry_batch.discard(request_id)

    thread_list_req = threads_resource.list(userId='me', includeSpamTrash=False)
    thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES)
    while thread_list_req and thread_list and num_threads_fetched < _MAX_NUM_THREADS:
      # The list response may omit `threads` entirely (e.g. for an empty mailbox).
      page_ids = [thread['id'] for thread in thread_list.get('threads', [])]
      # On a retry round, only re-request the threads that failed in the previous round.
      ids_to_fetch = [tid for tid in page_ids if not retry_batch or tid in retry_batch]
      if ids_to_fetch:
        batch = service.new_batch_http_request(callback=_thread_fetched)
        for thread_id in ids_to_fetch:
          batch.add(
            threads_resource.get(userId='me', id=thread_id, format='full'), request_id=thread_id)
        batch.execute()

      num_threads_fetched += len(thread_batch)
      yield from thread_batch
      thread_batch.clear()

      if retry_batch:
        if num_retries < _NUM_RETRIES:
          log(f'Failed to fetch {len(retry_batch)} threads. Retrying...')
          # Exponential backoff with random jitter.
          sleep(2**(num_retries - 1) + random.uniform(0, 1))
          num_retries += 1
          continue  # Retry the same page.
        # Threads that keep failing are skipped so the import cannot loop forever.
        log(f'Giving up on {len(retry_batch)} threads after {num_retries} retries.')

      retry_batch.clear()
      num_retries = 0
      # Fetch the next page; `list_next` returns None after the last page.
      thread_list_req = threads_resource.list_next(thread_list_req, thread_list)
      thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES) if thread_list_req else None


def _thread_to_item(thread: Any) -> Optional[Item]:
  """Converts a full-format thread response into an item, or None if it has no text.

  Each message contributes its date (local time), its snippet if present, and its cleaned
  plain-text bodies, prefixed by a sender banner when the sender is known. The thread subject is
  the first non-empty subject found.
  """
  replies: list[str] = []
  dates: list[str] = []
  snippets: list[str] = []
  subject: Optional[str] = None
  for msg in thread.get('messages', []):
    # `internalDate` is in milliseconds since the epoch.
    epoch_sec = int(msg['internalDate']) / 1000.
    dates.append(datetime.fromtimestamp(epoch_sec).strftime('%Y-%m-%d %H:%M:%S'))
    if 'snippet' in msg:
      snippets.append(msg['snippet'])
    email_info = _parse_payload(msg['payload'])
    subject = subject or email_info.subject
    texts = [text for text in map(_body_to_text, email_info.parts) if text]
    if not texts:
      continue
    if email_info.sender:
      texts.insert(0, f'--------------------{email_info.sender}--------------------')
    replies.append('\n'.join(texts))

  if not replies:
    return None
  return {
    'body': '\n\n'.join(replies),
    'snippet': '\n'.join(snippets) if snippets else None,
    'dates': dates,
    'subject': subject,
  }


def _body_to_text(body: bytes) -> str:
  """Decodes a base64url body and reduces it to the latest reply's text.

  Quoted history is dropped with `EmailReplyParser`, hard line wraps are joined with a space and
  URLs are removed. Returns an empty string for an empty body.
  """
  if not body:
    return ''
  # Tolerate bodies whose trailing base64 padding was omitted.
  padded = body + b'=' * (-len(body) % 4)
  # Lenient decoding: a non-UTF-8 body must not raise inside the batch callback and abort the
  # whole batch.
  text = base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
  text = EmailReplyParser.parse_reply(text)
  text = _UNWRAP_PATTERN.sub(r'\1 \2', text)
  return HTTP_PATTERN.sub('', text)
@dataclasses.dataclass()
class EmailInfo:
  """Sender, subject and raw plain-text body data pulled out of one email payload."""
  # Value of the sender header, or None when no part carries one.
  sender: Optional[str] = None
  # Value of the subject header, or None when no part carries one.
  subject: Optional[str] = None
  # Still-encoded body data of each plain-text part, as ASCII bytes; a fresh list per instance.
  parts: list[bytes] = dataclasses.field(default_factory=lambda: [])
def _get_header(payload: Any, name: str) -> Optional[str]:
if 'headers' not in payload:
return None
values = [h['value'] for h in payload['headers'] if h['name'].lower().strip() == name]
return values[0] if values else None
def _parse_payload(payload: Any) -> EmailInfo:
  """Walks a message payload tree and collects its sender, subject and text/plain body data.

  Nodes are visited depth-first, each node before its children and children in order. The sender
  and subject are the first truthy header values met in that order; body data is collected from
  every `text/plain` node that carries inline `data`.
  """
  sender: Optional[str] = None
  subject: Optional[str] = None
  collected: list[bytes] = []
  # Explicit stack; children are pushed in reverse so they are popped in document order.
  stack = [payload]
  while stack:
    node = stack.pop()
    node_sender = _get_header(node, 'from')
    node_subject = _get_header(node, 'subject')
    sender = sender or node_sender
    subject = subject or node_subject
    is_plain_text = 'mimeType' in node and 'text/plain' in node['mimeType']
    if is_plain_text and 'body' in node and 'data' in node['body']:
      collected.append(node['body']['data'].encode('ascii'))
    children = list(node.get('parts', []))
    stack.extend(reversed(children))
  return EmailInfo(sender, subject, collected)