import base64
import logging
import re
from typing import Optional

import jwt
import requests
from bs4 import BeautifulSoup

class GmailDataExtractor:
    """Collect receipt/invoice e-mails from a Gmail mailbox through the Gmail REST API.

    The extractor searches the authenticated user's mailbox for receipt-like
    messages (optionally narrowed by a subject keyword), then downloads each
    message and returns its subject, text body, links and attachments.

    Attributes:
        error: ``"Error"`` when no token was supplied to the constructor,
            otherwise ``None``.
    """

    _GMAIL_API_BASE = "https://www.googleapis.com/gmail/v1/users/me"

    # Gmail search expression selecting receipt-like messages.
    _RECEIPT_QUERY = (
        "(label:^smartlabel_receipt OR (subject:your AND subject:order) "
        "OR subject:receipts OR subject:receipt OR subject:invoice)"
    )

    # MIME types whose inline payload is treated as the message body.
    _TEXT_MIME_TYPES = ("text/plain", "text/html")

    # Seconds to wait for the Gmail API before giving up on a request.
    _REQUEST_TIMEOUT = 30

    _logger = logging.getLogger(__name__)

    def __init__(self, jwt: Optional[str], user_input: Optional[str] = None) -> None:
        """Store the credentials and the optional subject filter.

        Args:
            jwt: Token used as the Gmail bearer token. When ``None``,
                ``self.error`` is set to ``"Error"``.
            user_input: Optional keyword that must also appear in the subject.
        """
        self.error = "Error" if jwt is None else None
        # Always define the private attributes so later calls fail with a clear
        # error instead of an AttributeError when no token was given.
        self.__jwt = jwt
        self.__user_input = user_input
        # NOTE(security): hard-coded signing secret kept for backward
        # compatibility; it should be loaded from configuration instead.
        self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU'

    def __validate_jwt_token(self) -> str:
        """Decode the stored JWT (HS256) and return its ``access_token`` claim.

        Returns:
            str: The access token carried in the JWT payload.

        Raises:
            ValueError: If the token is expired, fails verification, or has no
                ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Invalid JWT token: Expired token") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid JWT token: Token verification failed") from exc

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def __access_token(self) -> str:
        """Return the bearer token to send to Gmail.

        JWT validation is currently bypassed (testing mode): the value passed
        to the constructor is used directly as the access token. Switch this
        to ``self.__validate_jwt_token()`` once JWT handling is enabled.

        Raises:
            ValueError: If no token was supplied to the constructor.
        """
        if not self.__jwt:
            raise ValueError("Invalid JWT token: Missing access token")
        return self.__jwt

    def __get_json(self, url: str, description: str, params: Optional[dict] = None) -> dict:
        """GET ``url`` with the bearer token and return the decoded JSON body.

        Args:
            url: Gmail API endpoint.
            description: What is being fetched; used in the error message.
            params: Optional query parameters (URL-encoded by ``requests``).

        Raises:
            RuntimeError: If the request fails or returns an error status.
        """
        headers = {"Authorization": f"Bearer {self.__access_token()}"}
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=self._REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            raise RuntimeError(f"Error fetching {description} from Gmail API: {exc}") from exc

    @classmethod
    def _build_query(cls, user_input: Optional[str] = None) -> str:
        """Return the Gmail search query, optionally narrowed by subject keyword."""
        if user_input is None:
            return cls._RECEIPT_QUERY
        return f"{cls._RECEIPT_QUERY} AND subject:{user_input}"

    @staticmethod
    def _header_value(payload: dict, name: str) -> str:
        """Return the value of header ``name`` (case-insensitive) or ``""``."""
        wanted = name.lower()
        for header in payload.get("headers") or []:
            if str(header.get("name", "")).lower() == wanted:
                return header.get("value", "")
        return ""

    @staticmethod
    def _decode_body(data: str) -> str:
        """Decode base64url ``data`` to text, tolerating missing ``=`` padding."""
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

    @classmethod
    def _iter_parts(cls, part: dict):
        """Yield ``part`` and, depth-first, every nested entry under ``parts``."""
        yield part
        for child in part.get("parts") or []:
            yield from cls._iter_parts(child)

    def __fetch_messages(self) -> list:
        """Fetch every message reference matching the receipt query.

        Follows ``nextPageToken`` until all result pages have been read.

        Returns:
            list: Message objects as returned by the Gmail API ``messages`` list.

        Raises:
            RuntimeError: If there is an issue while fetching messages from the Gmail API.
        """
        url = f"{self._GMAIL_API_BASE}/messages"
        params = {"q": self._build_query(self.__user_input)}

        messages = []
        while True:
            data = self.__get_json(url, "messages", params)
            messages.extend(data.get("messages", []))
            page_token = data.get("nextPageToken")
            if not page_token:
                return messages
            params["pageToken"] = page_token

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch a single message from the Gmail API.

        Args:
            message_id (str): The ID of the message to fetch.

        Returns:
            dict: Message data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching message data from the Gmail API.
        """
        return self.__get_json(
            f"{self._GMAIL_API_BASE}/messages/{message_id}", "message data"
        )

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment of a message from the Gmail API.

        Args:
            message_id (str): The ID of the message containing the attachment.
            attachment_id (str): The ID of the attachment to fetch.

        Returns:
            dict: Attachment data retrieved from the Gmail API.

        Raises:
            RuntimeError: If there is an issue while fetching attachment data from the Gmail API.
        """
        return self.__get_json(
            f"{self._GMAIL_API_BASE}/messages/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    def __process_message(self, message: dict) -> tuple:
        """Download one message and pull out its subject, body, links and attachments.

        Args:
            message (dict): A message reference containing at least ``id``.

        Returns:
            tuple: ``(subject, body, links, attachments)`` where ``links`` is a
            list of ``(text, href)`` tuples and ``attachments`` maps file name
            to base64url data, or is ``None`` when the message has none.
            A message without an ``id`` yields ``(None, None, [], None)``.

        Raises:
            RuntimeError: If there is an issue while fetching data from the Gmail API.
        """
        message_id = message.get("id")
        if not message_id:
            return None, None, [], None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get("payload") or {}
        subject = self._header_value(payload, "Subject")

        body = ""
        links = []
        attachments = {}
        # Walk the payload itself plus nested parts so single-part messages and
        # multipart/alternative sections inside multipart/mixed are both covered.
        for part in self._iter_parts(payload):
            part_body = part.get("body") or {}
            filename = part.get("filename")

            inline_data = part_body.get("data")
            is_body_text = part.get("mimeType") in self._TEXT_MIME_TYPES and not filename
            if inline_data and is_body_text:
                try:
                    decoded = self._decode_body(inline_data)
                except ValueError:
                    # binascii.Error is a ValueError: skip an undecodable part
                    # rather than failing the whole message.
                    self._logger.warning(
                        "Skipping undecodable body part in message %s", message_id
                    )
                    decoded = ""
                if decoded:
                    text, part_links = self.extract_text_and_links(decoded)
                    if text:
                        # As before, a later text part replaces an earlier one.
                        body = text
                    links.extend(part_links)

            attachment_id = part_body.get("attachmentId")
            if attachment_id:
                attachment = self.__fetch_attachment_data(message_id, attachment_id)
                data = attachment.get("data", "")
                if data:
                    attachments[filename or "untitled.txt"] = data

        return subject, body, links, attachments or None

    @staticmethod
    def extract_text_and_links(html_content: str) -> tuple:
        """Extract visible text and hyperlinks from HTML content.

        Args:
            html_content (str): The HTML content to process.

        Returns:
            tuple: The whitespace-normalised text (str) and a list of
            ``(link text, href)`` tuples.

        Raises:
            ValueError: If the input HTML content is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")

        soup = BeautifulSoup(html_content, 'html.parser')

        text = soup.get_text(separator=' ')
        text = re.sub(r'\s+', ' ', text).strip()

        links = [(link.text, link['href']) for link in soup.find_all('a', href=True)]
        return text, links

    def extract_messages(self) -> dict:
        """Fetch and process every receipt-like message in the mailbox.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message, each of the
            form ``{"subject": str, "body": str, "links": [(text, href), ...],
            "attachment_data": {filename: base64url_data}}``. Missing values
            are replaced by ``""``, ``[]`` or ``{}``.

        Raises:
            ValueError: If no token was supplied to the constructor.
            RuntimeError: If a Gmail API request fails.
        """
        self._logger.debug("Extracting messages")
        results = []
        for message in self.__fetch_messages():
            subject, body, links, attachment_data = self.__process_message(message)
            results.append({
                "subject": subject if subject is not None else "",
                "body": body if body is not None else "",
                "links": list(links) if links else [],
                "attachment_data": attachment_data if attachment_data is not None else {},
            })
        return {"results": results}