Spaces:
Runtime error
Runtime error
File size: 8,190 Bytes
e4f9cbe 815971e e4f9cbe 815971e e4f9cbe 815971e e4f9cbe 815971e e4f9cbe 815971e e4f9cbe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
"""Gmail source."""
import base64
import dataclasses
import os.path
import random
import re
from datetime import datetime
from time import sleep
from typing import TYPE_CHECKING, Any, Iterable, Optional
from pydantic import Field as PydanticField
from typing_extensions import override
from ...config import data_path
from ...schema import Item, field
from ...utils import log
from .source import Source, SourceSchema
if TYPE_CHECKING:
from google.oauth2.credentials import Credentials
# If modifying these scopes, delete the token json file.
_SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
_GMAIL_CONFIG_DIR = os.path.join(data_path(), '.gmail')
_TOKEN_FILENAME = 'token.json'
_CREDS_FILENAME = 'credentials.json'
_NUM_RETRIES = 10
_MAX_NUM_THREADS = 30_000
_UNWRAP_PATTERN = re.compile(r'(\S)\n(\S)')
HTTP_PATTERN = re.compile(r'https?://[^\s]+')
class Gmail(Source):
  """Connects to your Gmail and loads the text of your emails.

  **One time setup**

  Download the OAuth credentials file from the
  [Google Cloud Console](https://console.cloud.google.com/apis/credentials) and save it to the
  correct location. See
  [guide](https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application)
  for details.
  """
  name = 'gmail'

  # Path to the user's downloaded OAuth client secrets; defaults under the data dir.
  credentials_file = PydanticField(
    description='Path to the OAuth credentials file.',
    default=os.path.join(_GMAIL_CONFIG_DIR, _CREDS_FILENAME))

  # Populated by `setup()`; stays None until authentication has run.
  _creds: Optional['Credentials'] = None

  class Config:
    # Language is required even though it has a default value.
    schema_extra = {'required': ['credentials_file']}

  @override
  def setup(self) -> None:
    """Authenticate with Gmail, caching the OAuth token on disk for subsequent runs.

    Raises:
      ImportError: if the optional gmail dependencies are not installed.
      ValueError: if no cached token exists and `credentials_file` is missing.
    """
    try:
      from google.auth.transport.requests import Request
      from google.oauth2.credentials import Credentials
      from google_auth_oauthlib.flow import InstalledAppFlow
    except ImportError:
      raise ImportError('Could not import dependencies for the "gmail" source. '
                        'Please install with pip install lilac[gmail]')
    # The token file stores the user's access and refresh tokens, and is created automatically when
    # the authorization flow completes for the first time.
    token_filepath = os.path.join(_GMAIL_CONFIG_DIR, _TOKEN_FILENAME)
    if os.path.exists(token_filepath):
      self._creds = Credentials.from_authorized_user_file(token_filepath, _SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not self._creds or not self._creds.valid:
      if self._creds and self._creds.expired and self._creds.refresh_token:
        # A refresh token allows renewal without user interaction.
        self._creds.refresh(Request())
      else:
        if not os.path.exists(self.credentials_file):
          raise ValueError(
            f'Could not find the OAuth credentials file at "{self.credentials_file}". Make sure to '
            'download it from the Google Cloud Console and save it to the correct location.')
        # Interactive, browser-based OAuth consent flow on a local server.
        flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, _SCOPES)
        self._creds = flow.run_local_server()
      os.makedirs(os.path.dirname(token_filepath), exist_ok=True)
      # Save the token for the next run.
      with open(token_filepath, 'w') as token:
        token.write(self._creds.to_json())

  @override
  def source_schema(self) -> SourceSchema:
    """Return the schema of the items yielded by `process`."""
    return SourceSchema(
      fields={
        # All parsed reply texts of a thread, joined with blank lines.
        'body': field('string'),
        # Newline-joined Gmail snippets of the thread's messages (or None).
        'snippet': field('string'),
        # One 'YYYY-mm-dd HH:MM:SS' local timestamp per message in the thread.
        'dates': field(fields=['string']),
        # Subject of the first message in the thread that carried one.
        'subject': field('string'),
      })

  @override
  def process(self) -> Iterable[Item]:
    """Fetch Gmail threads in batches and yield one item per thread with parsed text.

    Raises:
      ImportError: if the optional gmail dependencies are not installed.
    """
    try:
      from email_reply_parser import EmailReplyParser
      from googleapiclient.discovery import build
      from googleapiclient.errors import HttpError
    except ImportError:
      raise ImportError('Could not import dependencies for the "gmail" source. '
                        'Please install with pip install lilac[gmail]')
    # Call the Gmail API
    service = build('gmail', 'v1', credentials=self._creds)
    # threads.list API
    threads_resource = service.users().threads()

    # Items produced by the batch callback for the current page of threads.
    thread_batch: list[Item] = []
    # Thread ids whose `threads.get` call failed and should be re-requested.
    retry_batch: set[str] = set()
    num_retries = 0
    num_threads_fetched = 0

    def _thread_fetched(request_id: str, response: Any, exception: Optional[HttpError]) -> None:
      # Batch-HTTP callback: on failure, remember the id for retry; on success,
      # parse every message of the thread into dates/snippets/reply text.
      if exception is not None:
        retry_batch.add(request_id)
        return
      replies: list[str] = []
      dates: list[str] = []
      snippets: list[str] = []
      subject: Optional[str] = None
      for msg in response['messages']:
        # `internalDate` is epoch milliseconds.
        epoch_sec = int(msg['internalDate']) / 1000.
        date = datetime.fromtimestamp(epoch_sec).strftime('%Y-%m-%d %H:%M:%S')
        dates.append(date)
        if 'snippet' in msg:
          snippets.append(msg['snippet'])
        email_info = _parse_payload(msg['payload'])
        # Keep the first subject seen in the thread.
        subject = subject or email_info.subject
        parsed_parts: list[str] = []
        for body in email_info.parts:
          if not body:
            continue
          text = base64.urlsafe_b64decode(body).decode('utf-8')
          # Drop quoted history/signatures, keeping only the latest reply.
          text = EmailReplyParser.parse_reply(text)
          # Unwrap text.
          text = _UNWRAP_PATTERN.sub('\\1 \\2', text)
          # Remove URLs.
          text = HTTP_PATTERN.sub('', text)
          if text:
            parsed_parts.append(text)
        if email_info.sender and parsed_parts:
          # Prefix each message's text with a separator naming the sender.
          parsed_parts = [
            f'--------------------{email_info.sender}--------------------', *parsed_parts
          ]
        if parsed_parts:
          replies.append('\n'.join(parsed_parts))
      # Threads that produced no usable text are dropped entirely.
      if replies:
        thread_batch.append({
          'body': '\n\n'.join(replies),
          'snippet': '\n'.join(snippets) if snippets else None,
          'dates': dates,
          'subject': subject,
        })
      # A later success clears any earlier failure recorded for this id.
      if request_id in retry_batch:
        retry_batch.remove(request_id)

    # First request.
    thread_list_req = threads_resource.list(userId='me', includeSpamTrash=False) or None
    thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES) if thread_list_req else None
    while (num_threads_fetched < _MAX_NUM_THREADS and thread_list and thread_list_req):
      # Batch all `threads.get` calls for this page into one HTTP round trip.
      batch = service.new_batch_http_request(callback=_thread_fetched)
      for gmail_thread in thread_list['threads']:
        thread_id = gmail_thread['id']
        # While retries are pending, only re-request the previously failed ids.
        if not retry_batch or (thread_id in retry_batch):
          batch.add(
            service.users().threads().get(userId='me', id=thread_id, format='full'),
            request_id=thread_id)
      batch.execute()
      num_threads_fetched += len(thread_batch)
      yield from thread_batch
      thread_batch = []
      if retry_batch:
        log(f'Failed to fetch {len(retry_batch)} threads. Retrying...')
        # Exponential backoff with jitter before the next attempt.
        timeout = 2**(num_retries - 1) + random.uniform(0, 1)
        sleep(timeout)
        num_retries += 1
      else:
        retry_batch = set()
        num_retries = 0
      # Fetch next page.
      # NOTE(review): the next page is fetched even when retry_batch is non-empty, so the pending
      # failed ids are then filtered against the *new* page's threads and may never actually be
      # re-requested — confirm this is the intended retry behavior.
      thread_list_req = threads_resource.list_next(thread_list_req, thread_list)
      thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES) if thread_list_req else None
@dataclasses.dataclass
class EmailInfo:
  """Stores parsed information about an email."""
  # Value of the first 'from' header found in the payload tree, if any.
  sender: Optional[str] = None
  # Value of the first 'subject' header found in the payload tree, if any.
  subject: Optional[str] = None
  # urlsafe-base64-encoded 'text/plain' body chunks, in document order.
  parts: list[bytes] = dataclasses.field(default_factory=list)
def _get_header(payload: Any, name: str) -> Optional[str]:
if 'headers' not in payload:
return None
values = [h['value'] for h in payload['headers'] if h['name'].lower().strip() == name]
return values[0] if values else None
def _parse_payload(payload: Any) -> EmailInfo:
  """Recursively walk a Gmail message payload, collecting sender, subject and text bodies."""
  info = EmailInfo(
    sender=_get_header(payload, 'from'), subject=_get_header(payload, 'subject'))
  # This node's own body counts only when it is plain text with inline data.
  if 'text/plain' in payload.get('mimeType', ''):
    if 'body' in payload and 'data' in payload['body']:
      info.parts.append(payload['body']['data'].encode('ascii'))
  # Recurse into multipart children; the first sender/subject found wins.
  for child in payload.get('parts', []):
    child_info = _parse_payload(child)
    info.sender = info.sender or child_info.sender
    info.subject = info.subject or child_info.subject
    info.parts.extend(child_info.parts)
  return info
|