File size: 7,692 Bytes
e4f9cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""Gmail source."""
import base64
import dataclasses
import os.path
import random
import re
from datetime import datetime
from time import sleep
from typing import Any, Iterable, Optional

from email_reply_parser import EmailReplyParser
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pydantic import Field as PydanticField
from typing_extensions import override

from ...config import data_path
from ...schema import Item, field
from ...utils import log
from .source import Source, SourceSchema

# If modifying these scopes, delete the token json file.
_SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Directory holding the cached OAuth token file and, by default, the OAuth client credentials file.
_GMAIL_CONFIG_DIR = os.path.join(data_path(), '.gmail')
_TOKEN_FILENAME = 'token.json'
_CREDS_FILENAME = 'credentials.json'
# Passed as `num_retries` to the Gmail API `.execute()` calls that fetch thread-list pages.
_NUM_RETRIES = 10
# No further pages of threads are requested once this many items have been yielded (the check
# happens between pages, so the final count can overshoot by up to one page).
_MAX_NUM_THREADS = 30_000

# Replaces a single line break between two non-whitespace characters with a space, unwrapping
# hard-wrapped lines while leaving blank-line paragraph breaks intact.
_UNWRAP_PATTERN = re.compile(r'(\S)\n(\S)')
# Matches an http(s) URL up to the next whitespace character; used to strip links from bodies.
HTTP_PATTERN = re.compile(r'https?://[^\s]+')


class Gmail(Source):
  """Connects to your Gmail and loads the text of your emails.

  **One time setup**

  Download the OAuth credentials file from the
  [Google Cloud Console](https://console.cloud.google.com/apis/credentials) and save it to the
  correct location. See
  [guide](https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application)
  for details.
  """

  name = 'gmail'

  credentials_file = PydanticField(
    description='Path to the OAuth credentials file.',
    default=os.path.join(_GMAIL_CONFIG_DIR, _CREDS_FILENAME))

  # OAuth credentials; populated by `setup()` and used by `process()`.
  _creds: Optional[Credentials] = None

  class Config:
    # The credentials file is listed as required in the schema even though it has a default value.
    schema_extra = {'required': ['credentials_file']}

  @override
  def setup(self) -> None:
    """Loads the cached OAuth token, refreshing it or running the OAuth flow when needed.

    The resulting credentials are written back to the token file whenever they had to be
    refreshed or newly obtained.

    Raises:
      ValueError: If a new authorization flow is required but `credentials_file` does not exist.
    """
    # The token file stores the user's access and refresh tokens, and is created automatically when
    # the authorization flow completes for the first time.
    token_filepath = os.path.join(_GMAIL_CONFIG_DIR, _TOKEN_FILENAME)
    if os.path.exists(token_filepath):
      self._creds = Credentials.from_authorized_user_file(token_filepath, _SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not self._creds or not self._creds.valid:
      if self._creds and self._creds.expired and self._creds.refresh_token:
        self._creds.refresh(Request())
      else:
        if not os.path.exists(self.credentials_file):
          raise ValueError(
            f'Could not find the OAuth credentials file at "{self.credentials_file}". Make sure to '
            'download it from the Google Cloud Console and save it to the correct location.')
        flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, _SCOPES)
        self._creds = flow.run_local_server()

      os.makedirs(os.path.dirname(token_filepath), exist_ok=True)
      # Save the token for the next run.
      with open(token_filepath, 'w') as token:
        token.write(self._creds.to_json())

  @override
  def source_schema(self) -> SourceSchema:
    """Returns the schema of the items yielded by `process()`."""
    return SourceSchema(
      fields={
        'body': field('string'),
        'snippet': field('string'),
        'dates': field(fields=['string']),
        'subject': field('string'),
      })

  @override
  def process(self) -> Iterable[Item]:
    """Yields one item per Gmail thread that has at least one non-empty parsed reply.

    Threads are listed page by page (spam and trash excluded) and fetched with batch requests.
    Threads whose fetch fails are retried with exponential backoff up to `_NUM_RETRIES` times per
    page; any still failing after that are logged and skipped so paging can continue. No further
    pages are requested once `_MAX_NUM_THREADS` items have been yielded.
    """
    # Call the Gmail API
    service = build('gmail', 'v1', credentials=self._creds)

    # threads.list / threads.get API
    threads_resource = service.users().threads()

    # Items produced by the batch callback; drained after every batch.
    thread_batch: list[Item] = []
    # IDs of threads whose most recent fetch attempt failed.
    retry_batch: set[str] = set()
    num_retries = 0
    num_threads_fetched = 0

    def _thread_fetched(request_id: str, response: Any, exception: Optional[HttpError]) -> None:
      # Batch callback; `request_id` is the thread id the request was added with.
      if exception is not None:
        retry_batch.add(request_id)
        return

      replies: list[str] = []
      dates: list[str] = []
      snippets: list[str] = []
      subject: Optional[str] = None

      for msg in response['messages']:
        epoch_sec = int(msg['internalDate']) / 1000.
        date = datetime.fromtimestamp(epoch_sec).strftime('%Y-%m-%d %H:%M:%S')
        dates.append(date)
        if 'snippet' in msg:
          snippets.append(msg['snippet'])
        email_info = _parse_payload(msg['payload'])
        subject = subject or email_info.subject
        parsed_parts: list[str] = []
        for body in email_info.parts:
          if not body:
            continue
          # The part's declared charset is not consulted, so the bytes may not be valid UTF-8.
          # Replace undecodable bytes instead of letting one message abort the whole import
          # (exceptions raised here propagate out of `batch.execute()`).
          text = base64.urlsafe_b64decode(body).decode('utf-8', errors='replace')
          text = EmailReplyParser.parse_reply(text)
          # Unwrap text.
          text = _UNWRAP_PATTERN.sub('\\1 \\2', text)
          # Remove URLs.
          text = HTTP_PATTERN.sub('', text)

          if text:
            parsed_parts.append(text)
        if email_info.sender and parsed_parts:
          parsed_parts = [
            f'--------------------{email_info.sender}--------------------', *parsed_parts
          ]
        if parsed_parts:
          replies.append('\n'.join(parsed_parts))

      if replies:
        thread_batch.append({
          'body': '\n\n'.join(replies),
          'snippet': '\n'.join(snippets) if snippets else None,
          'dates': dates,
          'subject': subject,
        })
      retry_batch.discard(request_id)

    # First request.
    thread_list_req = threads_resource.list(userId='me', includeSpamTrash=False)
    thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES)

    while num_threads_fetched < _MAX_NUM_THREADS and thread_list and thread_list_req:
      # The API omits the 'threads' key entirely when there are no threads (e.g. empty mailbox).
      thread_ids = [gmail_thread['id'] for gmail_thread in thread_list.get('threads', [])]
      if retry_batch:
        # Retrying: only re-fetch the threads that failed last time.
        thread_ids = [thread_id for thread_id in thread_ids if thread_id in retry_batch]

      if thread_ids:
        batch = service.new_batch_http_request(callback=_thread_fetched)
        for thread_id in thread_ids:
          batch.add(
            threads_resource.get(userId='me', id=thread_id, format='full'), request_id=thread_id)
        batch.execute()

      num_threads_fetched += len(thread_batch)
      yield from thread_batch
      thread_batch.clear()

      if retry_batch and num_retries < _NUM_RETRIES:
        log(f'Failed to fetch {len(retry_batch)} threads. Retrying...')
        # Exponential backoff with jitter.
        timeout = 2**(num_retries - 1) + random.uniform(0, 1)
        sleep(timeout)
        num_retries += 1
        continue

      if retry_batch:
        # Bounded retries: without this, a permanently failing thread would stall paging forever.
        log(f'Giving up on {len(retry_batch)} threads after {_NUM_RETRIES} retries.')
        retry_batch.clear()
      num_retries = 0
      # Fetch next page.
      thread_list_req = threads_resource.list_next(thread_list_req, thread_list)
      thread_list = thread_list_req.execute(num_retries=_NUM_RETRIES) if thread_list_req else None


@dataclasses.dataclass
class EmailInfo:
  """Stores parsed information about an email.

  Instances are produced by `_parse_payload` from a Gmail message payload tree.
  """
  # As filled by `_parse_payload`: the first truthy `From` header value found in the tree, if any.
  sender: Optional[str] = None
  # As filled by `_parse_payload`: the first truthy `Subject` header value found in the tree.
  subject: Optional[str] = None
  # Body data of every `text/plain` part, still base64url-encoded (stored as ASCII bytes); callers
  # decode it with `base64.urlsafe_b64decode`.
  parts: list[bytes] = dataclasses.field(default_factory=list)


def _get_header(payload: Any, name: str) -> Optional[str]:
  if 'headers' not in payload:
    return None
  values = [h['value'] for h in payload['headers'] if h['name'].lower().strip() == name]
  return values[0] if values else None


def _parse_payload(payload: Any) -> EmailInfo:
  """Flattens a Gmail message payload tree into an `EmailInfo`.

  The MIME tree is visited depth-first in pre-order: a node before its children, and children in
  their listed order. The first truthy `From` / `Subject` header value met along that walk wins,
  and the body data of every `text/plain` node is collected (ASCII-encoded, still base64url) in
  visit order.
  """
  sender: Optional[str] = None
  subject: Optional[str] = None
  text_parts: list[bytes] = []

  # Explicit stack instead of recursion; children are pushed reversed so they pop in order.
  pending = [payload]
  while pending:
    node = pending.pop()

    node_sender = _get_header(node, 'from')
    node_subject = _get_header(node, 'subject')
    sender = sender or node_sender
    subject = subject or node_subject

    is_plain_text = 'mimeType' in node and 'text/plain' in node['mimeType']
    if is_plain_text and 'body' in node and 'data' in node['body']:
      text_parts.append(node['body']['data'].encode('ascii'))

    pending.extend(reversed(node.get('parts', [])))

  return EmailInfo(sender, subject, text_parts)