import base64
import hashlib
import logging
import os
import re

import jwt
import requests
from bs4 import BeautifulSoup
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


class GmailDataExtractor:
    """Collect receipt-like Gmail messages and return their text and attachments.

    The instance lists messages matching a fixed receipt search query, fetches
    each message, extracts readable text from its top-level text parts and
    returns the first attachment that carries data.

    NOTE: the value passed as ``jwt`` is currently sent to the Gmail API as the
    bearer access token as-is; ``__validate_jwt_token`` exists but is not
    called by any method in this class.
    """

    _GMAIL_MESSAGES_URL = "https://www.googleapis.com/gmail/v1/users/me/messages"
    # Search expression sent as the ``q`` parameter of the message list call.
    _RECEIPT_QUERY = '(subject:"your order" OR subject:receipts OR subject:receipt OR subject: aankoopbon OR subject:reçu OR subject:invoice OR subject:invoice OR category:purchases)'
    _MAX_RESULTS = 10
    # Seconds; without a timeout a stalled connection would block forever.
    _REQUEST_TIMEOUT = 30

    _log = logging.getLogger(__name__)

    def __init__(self, jwt: str, user_input: str = None) -> None:
        """Store the token and optional user input.

        Args:
            jwt: Token used as the Gmail bearer token. When ``None``,
                ``self.error`` is set to ``"Error"``.
            user_input: Stored on the instance; not read by any method here.
        """
        # The private attributes are always defined so later method calls fail
        # with a clear error instead of an AttributeError when jwt is None.
        self.__jwt = jwt
        self.__user_input = user_input
        # SECURITY NOTE(review): signing secret is hard-coded in source; it
        # should be loaded from configuration and rotated.
        self.__secret_key = 'nkldjlncbamjlklwjeklwu24898h*&#Ujnfjf34893U5HSJFBSKFSHFNSK*$*W_ 3OWU'
        self.error = "Error" if jwt is None else None

    def __validate_jwt_token(self) -> str:
        """Decode the stored JWT (HS256) and return its ``access_token`` claim.

        Raises:
            ValueError: If the token is expired, fails verification, or has no
                ``access_token`` claim.
        """
        try:
            payload = jwt.decode(self.__jwt, self.__secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("Invalid JWT token: Expired token") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("Invalid JWT token: Token verification failed") from exc

        access_token = payload.get("access_token")
        if not access_token:
            raise ValueError("Invalid JWT token: Missing access token")
        return access_token

    def __get_json(self, url: str, what: str, params: dict = None) -> dict:
        """GET ``url`` with the bearer token and return the decoded JSON body.

        Args:
            url: Gmail API endpoint to call.
            what: Short description used in the error message
                (e.g. ``"message data"``).
            params: Optional query parameters; encoded by ``requests``.

        Raises:
            RuntimeError: If the request fails, returns an error status, or the
                body is not valid JSON.
        """
        try:
            response = requests.get(
                url,
                params=params,
                headers={"Authorization": f"Bearer {self.__jwt}"},
                timeout=self._REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as exc:
            raise RuntimeError(f"Error fetching {what} from Gmail API: {exc}") from exc

    def __fetch_messages(self) -> list:
        """List messages matching the receipt query.

        Returns:
            list: The ``messages`` entries of the list response (empty when the
            response has none).

        Raises:
            RuntimeError: If no token was provided or the Gmail API call fails.
        """
        # JWT validation is intentionally bypassed for now: the stored token is
        # used directly as the access token. The token is never logged.
        if not self.__jwt:
            raise RuntimeError(
                "Error fetching messages from Gmail API: no access token was provided"
            )

        gmail_data = self.__get_json(
            self._GMAIL_MESSAGES_URL,
            "messages",
            # Passed as params so the query (quotes, spaces, non-ASCII) is
            # URL-encoded instead of being spliced into the URL verbatim.
            params={"q": self._RECEIPT_QUERY, "maxResults": self._MAX_RESULTS},
        )
        messages = list(gmail_data.get("messages") or [])
        self._log.debug("Fetched %d message reference(s)", len(messages))
        return messages

    def __fetch_message_data(self, message_id: str) -> dict:
        """Fetch one message resource by id.

        Args:
            message_id: The ID of the message to fetch.

        Returns:
            dict: The decoded JSON response.

        Raises:
            RuntimeError: If the Gmail API call fails.
        """
        return self.__get_json(
            f"{self._GMAIL_MESSAGES_URL}/{message_id}", "message data"
        )

    def __fetch_attachment_data(self, message_id: str, attachment_id: str) -> dict:
        """Fetch one attachment resource of a message.

        Args:
            message_id: The ID of the message containing the attachment.
            attachment_id: The ID of the attachment to fetch.

        Returns:
            dict: The decoded JSON response.

        Raises:
            RuntimeError: If the Gmail API call fails.
        """
        return self.__get_json(
            f"{self._GMAIL_MESSAGES_URL}/{message_id}/attachments/{attachment_id}",
            "attachment data",
        )

    @staticmethod
    def _decode_body_data(body_data: str) -> bytes:
        """Decode URL-safe base64 text, tolerating missing ``=`` padding.

        Returns ``b''`` for empty input.
        """
        if not body_data:
            return b''
        padding = '=' * (-len(body_data) % 4)
        return base64.urlsafe_b64decode(body_data + padding)

    def __process_message(self, message: dict) -> tuple:
        """Fetch a message and pull out its subject, text, attachment and sender.

        Args:
            message: A message reference containing at least an ``id`` key.

        Returns:
            tuple: ``(subject, text, attachment, company, message_id)`` where
            ``attachment`` is ``{"filename": ..., "data": ...}`` for the first
            part whose attachment has data, otherwise ``None``. All five values
            are ``None`` when the message has no id.

        Raises:
            RuntimeError: If a Gmail API call fails.
        """
        message_id = message.get("id")
        if not message_id:
            # Same arity as every other return so callers can always unpack.
            return None, None, None, None, None

        message_data = self.__fetch_message_data(message_id)
        payload = message_data.get('payload') or {}
        headers = payload.get('headers') or []

        subject = ''
        for header in headers:
            if header.get('name') == 'Subject':
                subject = header.get('value') or ''

        text = ''
        company_from_gmail = 'others'

        # NOTE(review): only messages with top-level ``parts`` get sender and
        # body extraction; nested parts and single-part bodies are not read.
        if 'parts' not in payload:
            return subject, text, None, company_from_gmail, message_id

        for header in headers:
            if header.get('name') == 'From':
                company_from_gmail = self.extract_domain_from_email(header.get('value'))
                break

        # Subject keywords take precedence over the sender's domain.
        lowered_subject = subject.lower()
        if 'chanel' in lowered_subject:
            company_from_gmail = 'chanel'
        if 'louis vuitton' in lowered_subject:
            company_from_gmail = 'Louis Vuitton'

        for part in payload['parts'] or []:
            if 'mimeType' not in part:
                continue

            part_body = part.get('body') or {}

            if part['mimeType'] in ('text/plain', 'text/html'):
                decoded = self._decode_body_data(part_body.get('data', ''))
                # extract_text raises on empty content; an empty part simply
                # contributes no text.
                if decoded:
                    text = self.extract_text(decoded)

            if 'attachmentId' in part_body:
                attachment_data = self.__fetch_attachment_data(
                    message_id, part_body['attachmentId']
                )
                data = attachment_data.get("data", "")
                filename = part.get("filename", "untitled.txt")
                if data:
                    # The first attachment with data ends processing; later
                    # parts are not inspected.
                    return subject, text, {"filename": filename, "data": data}, company_from_gmail, message_id

        return subject, text, None, company_from_gmail, message_id

    def encrypt_message_id(self, message_id: str):
        """Encrypt a message id with AES-CBC using the ``AES_KEY`` env variable.

        The key is the first 32 bytes of the UTF-8 encoded ``AES_KEY`` value.
        The id is NUL-padded to 32 bytes (or the next multiple of 16 when
        longer) before encryption.

        Returns:
            bytes: The ciphertext only.

        Raises:
            ValueError: If ``AES_KEY`` is not set, or its length is not a valid
                AES key size.
        """
        raw_key = os.getenv('AES_KEY')
        if not raw_key:
            raise ValueError("AES_KEY environment variable is not set")
        key = raw_key.encode('utf-8')[:32]

        message_id_bytes = message_id.encode('utf-8')
        # CBC needs whole 16-byte blocks; keep the historical 32-byte minimum.
        padded_length = max(32, -(-len(message_id_bytes) // 16) * 16)
        message_id_padded = message_id_bytes.ljust(padded_length, b'\0')

        # NOTE(review): the random IV is not returned or stored, so the
        # ciphertext cannot be decrypted later and differs on every call.
        # Return format left unchanged; confirm intent with callers.
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        return encryptor.update(message_id_padded) + encryptor.finalize()

    def extract_domain_from_email(self, email_string):
        """Return the first label of the domain of the first address found.

        For ``"Shop <no-reply@shop.example.com>"`` this returns ``"shop"``.

        Returns:
            str | None: The label, or ``None`` when no address is found or the
            label is empty.
        """
        if not email_string:
            return None

        match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
        if match is None:
            return None

        domain = match.group().split('@')[-1].split('.')[0]
        return domain or None

    def extract_text(self, html_content: str):
        """Return the visible text of HTML content with whitespace collapsed.

        Args:
            html_content: Markup to parse (the caller in this class passes the
                decoded bytes of a message part).

        Returns:
            str: Text with every whitespace run replaced by one space and
            leading/trailing whitespace removed.

        Raises:
            ValueError: If the input is empty or None.
        """
        if not html_content:
            raise ValueError("HTML content is empty or None")

        soup = BeautifulSoup(html_content, 'html.parser')
        text = re.sub(r'\s+', ' ', soup.get_text(separator=' ')).strip()
        # Message content is private: log only its size.
        self._log.debug("Extracted %d character(s) of text", len(text))
        return text

    def extract_messages(self) -> dict:
        """Fetch and process every message matching the receipt query.

        Returns:
            dict: ``{"results": [...]}`` with one entry per message, shaped as
            ``{"body": str, "attachment_data": [dict], "company_associated":
            str, "message_id": str | None}``. ``attachment_data`` holds a
            single dict, empty when the message has no attachment. The message
            id is returned as received (not encrypted) and the subject is not
            included.

        Raises:
            RuntimeError: If a Gmail API call fails.
        """
        results = []
        for message in self.__fetch_messages():
            _subject, body, attachment_data, company_name, message_id = self.__process_message(message)

            # Replace missing values with empty defaults.
            results.append({
                "body": body if body is not None else '',
                "attachment_data": [attachment_data if attachment_data is not None else {}],
                'company_associated': company_name if company_name is not None else '',
                "message_id": message_id,
            })

        return {"results": results}