|
import logging |
|
import base64 |
|
import requests |
|
import asyncio |
|
from fastapi import WebSocket |
|
from services import utils as util |
|
import re |
|
from bs4 import BeautifulSoup |
|
|
|
async def send_chunked_data(websocket: WebSocket, filename: str, data: str ,company_associated:str , message_id:str):
    """Stream *data* to the client in fixed-size chunks over the websocket.

    After all chunks are sent, emits the message metadata (company and
    message id) followed by the "FinishedThisAttachment" sentinel.
    """
    chunk_size = 2000
    offset = 0
    total = len(data)
    while offset < total:
        await websocket.send_json({"filename": filename, "data_chunk": data[offset:offset + chunk_size]})
        # Throttle between chunks so the client is not flooded.
        await asyncio.sleep(0.4)
        offset += chunk_size
    await websocket.send_json({"company_associated":company_associated , "message_id":message_id})
    await websocket.send_text("FinishedThisAttachment")
|
|
|
async def send_chunked_data_without_attch(websocket: WebSocket,body_text:str,message_id:str , company_associated:str):
    """Stream a message body (attachment-less case) to the client in chunks.

    Sends a notice that the message has no attachment, the body text in
    fixed-size chunks, the message metadata, and finally the
    "FinishedThisAttachmentnotContainingAttachment" sentinel.
    """
    chunk_size = 2000
    # Fixed typo in the client-facing notice ("does'nt" -> "doesn't").
    # NOTE(review): if the frontend matches this literal, update it there too.
    await websocket.send_text("This message doesn't contain an Attachment")
    for i in range(0, len(body_text), chunk_size):
        await websocket.send_json({"data_chunk": body_text[i:i + chunk_size]})
        # Throttle between chunks so the client is not flooded.
        await asyncio.sleep(0.4)
    await websocket.send_json({"company_associated":company_associated , "message_id":message_id})
    await websocket.send_text("FinishedThisAttachmentnotContainingAttachment")
|
|
|
async def process_messages(access_token: str, websocket: WebSocket):
    """Fetch every matching Gmail message and stream each one to the client.

    First tells the client how many messages to expect, then streams each
    message's parts, and finally sends "CompletedFetchingMessages".
    """
    logging.info("Entered process_messages")
    messages = get_messages(access_token)

    # Announce the total up front so the client can show progress.
    await websocket.send_json({"total_messages": len(messages)})
    await websocket.send_text("CompletedSendingTotalMessagesLength")

    for message in messages:
        message_id = message.get("id")
        if not message_id:
            continue
        message_data = fetch_message_data(access_token, message_id)
        await process_message_data(access_token, message_data, websocket, message_id)

    await websocket.send_text("CompletedFetchingMessages")
|
|
|
async def websocket_main(code: str, websocket: WebSocket):
    """Entry point: stream all matching messages, then close the socket.

    Args:
        code: OAuth2 access token for the Gmail API (despite the name).
        websocket: Open FastAPI WebSocket to stream results over.
    """
    logging.info("Entered websocket_main")  # typo "mwebsocket_main" fixed
    access_token = code
    await process_messages(access_token, websocket)
    logging.info("Completed Fetching all the messages")
    # Bug fix: WebSocket.close() is a coroutine; the original never awaited
    # it, so the close handshake was never actually performed.
    await websocket.close()
|
|
|
def get_messages(code: str):
    """Return all Gmail message stubs matching the receipts/orders query.

    Pages through the Gmail `users.messages.list` endpoint until no
    `nextPageToken` is returned.

    Args:
        code: OAuth2 access token for the Gmail API.

    Returns:
        list[dict]: Message stubs (each with at least an "id" key).
    """
    logging.info("Entered get_messages")
    access_token = code
    # Duplicate "subject:invoice OR subject:invoice" term removed.
    # NOTE(review): "subject: aankoopbon" (with a space) searches the bare
    # word rather than the subject — kept as-is pending confirmation.
    jobs_query = (
        'subject:"your order" OR subject:receipts OR subject:receipt OR '
        'subject: aankoopbon OR subject:reçu OR subject:invoice OR '
        'category:purchases'
    )
    gmail_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    headers = {"Authorization": f"Bearer {access_token}"}
    # Bug fix: the query was interpolated raw into the URL (spaces, quotes
    # unescaped); `params=` lets requests percent-encode it correctly.
    params = {"q": jobs_query, "maxResults": 10}

    messages = []
    page_token = None
    while True:
        if page_token:
            params["pageToken"] = page_token

        gmail_response = requests.get(gmail_url, headers=headers, params=params)
        logging.info(f"{gmail_response}")
        gmail_data = gmail_response.json()

        messages.extend(gmail_data.get("messages", []))

        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break

    logging.info("Total Length:")
    logging.info(len(messages))
    return messages
|
|
|
def fetch_message_data(access_token: str, message_id: str):
    """Retrieve the full Gmail message resource for *message_id*."""
    logging.info(f"Entered fetch_message_data for message_id: {message_id}")
    url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    headers = {"Authorization": f"Bearer {access_token}"}
    return requests.get(url, headers=headers).json()
|
|
|
async def process_message_data(access_token:str,message_data: dict, websocket: WebSocket,message_id:str):
    """Walk a message's MIME parts and stream them to the client.

    text/plain and text/html parts are base64-decoded and reduced to plain
    text; parts without an attachmentId have that body text streamed, while
    parts backed by an attachmentId have the attachment fetched and streamed.

    Args:
        access_token: OAuth2 access token for the Gmail API.
        message_data: Full Gmail message resource (must contain
            `payload.headers`; parts are skipped if absent).
        websocket: Open websocket to stream results over.
        message_id: Gmail id of the message being processed.
    """
    logging.info("Entered process_message_data")

    subject = extract_subject_from_mail(message_data)
    company_from_mail = extract_domain_name(message_data['payload']['headers'], subject)

    # Messages without a multipart payload carry nothing we can stream.
    if "payload" not in message_data or "parts" not in message_data["payload"]:
        return

    body_text = ''
    for part in message_data["payload"]["parts"]:
        if 'mimeType' not in part:
            continue

        mime_type = part['mimeType']
        if mime_type == 'text/plain' or mime_type == 'text/html':
            body_data = part['body'].get('data', '')
            # Bug fix: extract_text raises ValueError on empty content,
            # which previously aborted the whole message on an empty part.
            if body_data:
                body_text = extract_text(base64.urlsafe_b64decode(body_data))

        if "body" in part and "attachmentId" not in part["body"]:
            await process_mail_body_data(websocket, body_text, message_id, company_from_mail)

        if "body" in part and "attachmentId" in part["body"]:
            attachment_id = part["body"]["attachmentId"]
            attachment_data = fetch_attachment_data(access_token, message_data["id"], attachment_id)
            body_text = ''
            await process_attachment_data(part, attachment_data, websocket, company_from_mail, message_id)
|
|
|
|
|
async def process_attachment_data(part: dict, attachment_data: dict, websocket: WebSocket,company_associated:str,message_id:str):
    """Decode an attachment, log its extracted text, and stream the raw
    base64 payload to the client in chunks.
    """
    logging.info("Entered process_attachment_data")
    filename = part.get("filename", "untitled.txt")
    data = attachment_data.get("data", {})
    if not data:
        return
    attachment_content = base64.urlsafe_b64decode(data)
    # Text extraction is only used for logging here; the client receives
    # the raw base64 string, not the extracted text.
    extracted_text = await util.extract_text_from_attachment(filename, attachment_content)
    logging.info(f"Extracted text from attachment (unknown): {extracted_text}")
    await send_chunked_data(websocket, filename, data, company_associated, message_id)
|
|
|
async def process_mail_body_data(websocket:WebSocket ,body_text : str, message_id:str,company_associated:str):
    """Stream a message body that has no attachment to the client.

    Thin pass-through to send_chunked_data_without_attch.
    """
    await send_chunked_data_without_attch(websocket,body_text,message_id,company_associated)
|
|
|
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str):
    """Download an attachment resource from the Gmail API."""
    logging.info(f"Entered fetch_attachment_data for attachment_id: {attachment_id}")
    headers = {"Authorization": f"Bearer {access_token}"}
    attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    return requests.get(attachment_url, headers=headers).json()
|
|
|
|
|
def extract_subject_from_mail(message_data: dict):
    """Return the value of the 'Subject' header, or "" when absent."""
    if 'payload' not in message_data or 'headers' not in message_data['payload']:
        return ""
    headers = message_data['payload']['headers']
    return next(
        (header['value'] for header in headers if header['name'] == 'Subject'),
        "",
    )
|
|
|
|
|
def extract_domain_name(payload:dict,subject:str):
    """Derive a company identifier for a message.

    Looks up the 'From' header in *payload* (a list of header dicts) and
    extracts its domain; subject keywords for a couple of luxury brands
    override the header-derived value. Falls back to 'others'.
    """
    domain_name = 'others'
    for fromdata in payload:
        if fromdata['name'] == 'From':
            domain_name = extract_domain_from_email(fromdata['value'])
            break

    lowered = subject.lower()
    if 'chanel' in lowered:
        return 'chanel'
    if 'louis vuitton' in lowered:
        return 'Louis Vuitton'
    return domain_name
|
|
|
|
|
def extract_domain_from_email(email_string:str):
    """Extract the first label of an email address's domain.

    Args:
        email_string: Free-form From-header text,
            e.g. '"Shop" <no-reply@shop.com>'.

    Returns:
        The first domain label ('shop' for no-reply@shop.com), or None
        when no email address / usable domain is found.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    # Bug fix: the original called .group() unconditionally, raising
    # AttributeError when the string contains no email address (and its
    # `return None` branch was unreachable).
    if match is None:
        return None

    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
|
|
|
def extract_text(html_content):
    """Extract whitespace-normalized plain text from HTML content.

    Args:
        html_content (str | bytes): The HTML content to process (callers
            pass the base64-decoded bytes of a message part).

    Returns:
        str: The extracted text with runs of whitespace collapsed.
        (The original docstring claimed a (text, links) tuple; only the
        text was ever returned.)

    Raises:
        ValueError: If the input HTML content is empty or None.
    """
    if not html_content:
        raise ValueError("HTML content is empty or None")

    soup = BeautifulSoup(html_content, 'html.parser')

    text = soup.get_text(separator=' ')
    text = re.sub(r'\s+', ' ', text).strip()
    # Debug print statements replaced with logging; the unused link
    # extraction (soup.find_all('a', href=True)) was dead code and removed.
    logging.debug("Extracted text from HTML: %s", text)

    return text
|
|