|
import base64 |
|
import json |
|
import logging |
|
import re |
|
from concurrent.futures import ThreadPoolExecutor |
|
from typing import Optional, List, Dict |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from models.models import Message, Attachment |
|
from fastapi import WebSocket |
|
from services import utils as ut |
|
import asyncio |
|
# Brands grouped by business category. Kept grouped for readability and
# flattened into a brand -> category lookup table just below.
_BRANDS_BY_CATEGORY = {
    'wines and spirit': (
        'ao yun', 'ardbeg', 'belvedere', 'bodega numanthia', 'chandon',
        'château cheval blanc', "château d'yquem", 'château galoupet',
        'cheval des andes', 'clos19', 'cloudy bay', 'colgin cellars',
        'dom pérignon', 'domaine des lambrays', 'eminente', 'glenmorangie',
        'hennessy', 'joseph phelps', 'krug', 'mercier', 'moët & chandon',
        'newton vineyard', 'ruinart', 'terrazas de los andes',
        'veuve clicquot', 'volcan de mi tierra', 'woodinville',
    ),
    'Fashion & Leather Goods': (
        'berluti', 'celine', 'christian dior', 'emilio pucci', 'fendi',
        'givenchy', 'kenzo', 'loewe', 'loro piana', 'louis vuitton',
        'marc jacobs', 'moynat', 'patou', 'rimowa',
    ),
    'Perfumes & Cosmetics': (
        'acqua di parma', 'benefit cosmetics', 'cha ling',
        'fenty beauty by rihanna', 'fresh', 'givenchy parfums', 'guerlain',
        'kenzo parfums', 'kvd beauty', 'loewe perfumes',
        'maison francis kurkdjian', 'make up for ever',
        'officine universelle buly', 'olehenriksen',
        'parfums christian dior', 'stella by stella mccartney',
    ),
    'Watches & Jewelry': (
        'bulgari', 'chaumet', 'fred', 'hublot', 'repossi', 'tag heuer',
        'tiffany & co.', 'zenith',
    ),
    'Selective retailing': (
        '24s', 'dfs', 'la grande epicerie de paris',
        'le bon marché rive gauche', 'sephora',
    ),
    'Other activities': (
        'belmond', 'cheval blanc', 'connaissance des arts', 'cova',
        'investir', "jardin d'acclimatation", 'le parisien', 'les echos',
        'radio classique', 'royal van lent',
    ),
}

# Lower-case brand name -> business category, built once at import time
# instead of on every call.
_COMPANY_TYPES = {
    brand: category
    for category, brands in _BRANDS_BY_CATEGORY.items()
    for brand in brands
}


def get_company_type(company_name: str) -> str:
    """Return the business category for a brand name.

    The lookup ignores case and surrounding whitespace.

    Args:
        company_name: Brand name to classify, e.g. ``"Louis Vuitton"``.

    Returns:
        The category string stored for the brand, or ``'Others'`` when the
        brand is not in the table.
    """
    return _COMPANY_TYPES.get(company_name.strip().lower(), 'Others')
|
|
|
async def get_messages(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
    """Search Gmail for receipt-like messages and stream each one to ``websocket``.

    Result pages are listed one at a time. Every message referenced on a
    page is fetched through ``fetch_message_data`` on a thread pool and then
    forwarded with ``process_message`` before the next page is requested.

    Args:
        code: OAuth access token, sent as the ``Bearer`` credential.
        websocket: Connection the fetched messages are streamed to.
        brand_name: When given, the search additionally accepts mail sent
            from the brand and requires the brand in the subject.

    Returns:
        The list of all messages fetched across every page.
    """
    access_token = code
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    list_url = "https://www.googleapis.com/gmail/v1/users/me/messages"
    request_timeout = 30  # seconds; keeps a stalled connection from hanging the sync

    # Both query variants share the same terms. The brand variant used to
    # carry a stray space ("subject: aankoopbon") that the default one did not.
    receipt_terms = (
        'subject:"your order" OR subject:receipts OR subject:receipt OR '
        'subject:aankoopbon OR subject:reçu OR subject:invoice OR '
        'subject:invoices OR category:purchases'
    )
    if brand_name is not None:
        g_query = (
            f'({receipt_terms} OR from:{brand_name}) '
            f'AND subject:{brand_name} has:attachment'
        )
    else:
        g_query = f'({receipt_terms}) has:attachment'

    logging.info("Inside get messages")

    def fetch_page(token):
        # `params` lets requests URL-encode the query (spaces, quotes, '&', ...).
        params = {"q": g_query}
        if token:
            params["pageToken"] = token
        return requests.get(
            list_url, params=params, headers=auth_headers, timeout=request_timeout
        )

    def fetch_message_wrapper(message_data):
        message_id = message_data.get("id")
        if message_id:
            return fetch_message_data(access_token, message_id)
        return None

    loop = asyncio.get_running_loop()
    messages = []
    page_token = None

    while True:
        # Blocking HTTP calls run in worker threads so the event loop stays free.
        gmail_response = await loop.run_in_executor(None, fetch_page, page_token)
        if not gmail_response.ok:
            logging.error(
                "Gmail message listing failed with HTTP %s", gmail_response.status_code
            )
            break
        gmail_data = gmail_response.json()

        message_refs = gmail_data.get("messages", [])
        logging.info("Gmail returned %d message reference(s)", len(message_refs))

        if message_refs:
            with ThreadPoolExecutor(max_workers=15) as executor:
                fetched = await asyncio.gather(
                    *(
                        loop.run_in_executor(executor, fetch_message_wrapper, ref)
                        for ref in message_refs
                    )
                )
            page_messages = [message for message in fetched if message]
            messages.extend(page_messages)
            # Forward only this page's messages; iterating the cumulative
            # list would re-send every earlier page on each iteration.
            for message in page_messages:
                await process_message(message, websocket, 10000)

        page_token = gmail_data.get("nextPageToken")
        if not page_token:
            break

    logging.info("Fetched %d message(s) in total", len(messages))
    return messages
|
|
|
async def process_message(message: Message, websocket: WebSocket, chunk_size: int):
    """Serialize ``message`` and push it to ``websocket`` in chunks.

    After the last chunk a literal ``"NEXT_MESSAGE"`` frame is sent. A falsy
    ``message`` is ignored apart from the entry log line.

    Args:
        message: Message to forward; its ``to_json()`` result is what is sent.
        websocket: Destination connection.
        chunk_size: Maximum number of characters per text frame.
    """
    logging.info("process_message")
    if not message:
        return

    payload = message.to_json()
    logging.info(f"{payload}")
    await send_message_in_chunks(websocket, payload, chunk_size)
    await websocket.send_text("NEXT_MESSAGE")
|
|
|
|
|
def fetch_message_data(access_token: str, message_id: str) -> Message:
    """Download one Gmail message and condense it into a ``Message``.

    Args:
        access_token: OAuth access token, sent as the ``Bearer`` credential.
        message_id: Gmail id of the message to download.

    Returns:
        A ``Message`` carrying the id, the company derived from the subject
        and ``From`` header, and the structured data extracted from the
        attachments.
    """
    message_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}"
    message_response = requests.get(
        message_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,  # seconds; keeps a stalled connection from blocking the worker
    )
    message_data = message_response.json()

    # A response without payload/headers (e.g. an error body) must not raise
    # KeyError here: one bad message would otherwise abort the whole sync.
    headers = (message_data.get('payload') or {}).get('headers') or []
    if not headers:
        logging.warning(
            "Message %s has no headers in the response (HTTP %s)",
            message_id,
            message_response.status_code,
        )

    subject = extract_subject_from_mail(message_data)
    company_from_mail = extract_domain_name(headers, subject)

    # Only the structured data is used; the Attachment objects are discarded.
    _, structured_attachment_data = extract_attachments_from_mail(access_token, message_data)

    return Message(
        message_id=message_id,
        company=company_from_mail,
        structured_data=structured_attachment_data,
    )
|
|
|
|
|
|
|
def extract_subject_from_mail(message_data: dict) -> str:
    """Return the value of the message's ``Subject`` header.

    The header name is compared case-insensitively, so ``subject`` and
    ``SUBJECT`` are matched as well.

    Args:
        message_data: Message dict; headers are read from
            ``message_data['payload']['headers']``.

    Returns:
        The subject text, or ``""`` when the payload, the header list or
        the subject header is missing.
    """
    headers = (message_data.get('payload') or {}).get('headers') or []
    for header in headers:
        if header.get('name', '').lower() == 'subject':
            return header.get('value', '')
    return ""
|
|
|
|
|
def extract_domain_name(payload: List[dict], subject: str) -> str:
    """Derive the company name for a message from its subject and sender.

    A subject mentioning ``chanel`` or ``louis vuitton`` (any case) wins
    outright. Otherwise the domain of the first ``From`` header is used.

    Args:
        payload: The message's header list, each item a dict with ``name``
            and ``value`` keys.
        subject: Subject line of the message.

    Returns:
        ``'chanel'``, ``'Louis Vuitton'``, the sender's domain label, or
        ``'others'`` when no sender domain can be determined.
    """
    subject_lower = subject.lower()
    # Subject overrides are checked first so the sender is not parsed
    # needlessly when the result is already decided.
    if 'chanel' in subject_lower:
        return 'chanel'
    if 'louis vuitton' in subject_lower:
        return 'Louis Vuitton'

    for header in payload:
        if header.get('name', '').lower() == 'from':
            # The extractor may yield None; keep the declared `str` return.
            return extract_domain_from_email(header.get('value', '')) or 'others'
    return 'others'
|
|
|
|
|
def extract_domain_from_email(email_string: str) -> Optional[str]:
    """Return the first label of the domain of the first address in a string.

    For ``"Shop <orders@chanel.com>"`` this is ``"chanel"``. Only the label
    before the first dot is returned, so ``"a@mail.chanel.com"`` gives
    ``"mail"``.

    Args:
        email_string: Text that may contain an e-mail address, such as the
            value of a ``From`` header.

    Returns:
        The domain label, or ``None`` when the string contains no address
        or the label is empty.
    """
    match = re.search(r'[\w\.-]+@[\w\.-]+', email_string)
    if match is None:
        # No address present: honour the Optional return instead of raising
        # AttributeError on `None.group()`.
        return None

    domain = match.group().split('@')[-1].split('.')[0]
    return domain or None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _last_text_body(parts):
    """Return the text of the last text/plain or text/html part, searching nested parts too."""
    body = None
    for part in parts:
        if part.get('mimeType') in ('text/plain', 'text/html'):
            body_data = (part.get('body') or {}).get('data', '')
            if body_data:
                body = extract_text(base64.urlsafe_b64decode(body_data))
        # Descend into container parts that carry their own `parts` list.
        nested = _last_text_body(part.get('parts') or [])
        if nested:
            body = nested
    return body


def extract_body_from_mail(message_data: dict) -> str:
    """Extract the readable body text of a message.

    When the payload has ``parts``, the last ``text/plain`` or ``text/html``
    part with data wins, including parts nested inside other parts.
    Otherwise the payload's own ``body`` data is decoded. The data is
    base64url-decoded and passed through ``extract_text``.

    Args:
        message_data: Message dict with optional ``payload`` and ``snippet``.

    Returns:
        The extracted text, or the message ``snippet`` (``""`` if absent)
        when no body text is found.
    """
    body = None
    if "payload" in message_data:
        payload = message_data["payload"]
        if "parts" in payload:
            body = _last_text_body(payload["parts"])
        elif 'body' in payload:
            body_data = payload['body'].get('data', '')
            if body_data:
                body = extract_text(base64.urlsafe_b64decode(body_data))
            elif 'parts' in payload['body']:
                body = _last_text_body(payload['body']['parts'])

    if not body:
        body = message_data.get('snippet', '')
    return body
|
|
|
|
|
def fetch_attachment_data(access_token: str, message_id: str, attachment_id: str) -> Dict:
    """Download one attachment resource of a Gmail message.

    Args:
        access_token: OAuth access token, sent as the ``Bearer`` credential.
        message_id: Id of the message that owns the attachment.
        attachment_id: Id of the attachment to download.

    Returns:
        The decoded JSON body of the response.
    """
    attachment_url = f"https://www.googleapis.com/gmail/v1/users/me/messages/{message_id}/attachments/{attachment_id}"
    attachment_response = requests.get(
        attachment_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,  # seconds; without it a stalled connection blocks the caller forever
    )
    return attachment_response.json()
|
|
|
|
|
def extract_attachments_from_mail(access_token: str, message_data: dict) -> tuple:
    """Download a message's attachments and extract structured data from them.

    Every top-level payload part whose ``body`` carries an ``attachmentId``
    is downloaded, converted to text with ``ut.extract_text_from_attachment``
    and structured with ``ut.strcuture_document_data``.

    Args:
        access_token: OAuth access token used for the attachment downloads.
        message_data: Message dict with ``id`` and ``payload.parts``.

    Returns:
        A 2-tuple ``(attachments, structured_data)``: the ``Attachment``
        objects for every downloaded part, and the truthy structuring
        results, both in part order. Both lists are empty when the message
        has no payload parts.
    """
    attachments = []
    structured_data = []

    parts = (message_data.get("payload") or {}).get("parts") or []
    for part in parts:
        part_body = part.get("body") or {}
        if "attachmentId" not in part_body:
            continue

        attachment_data = fetch_attachment_data(
            access_token, message_data["id"], part_body["attachmentId"]
        )
        # A part can carry an empty filename; fall back for that case too,
        # not only when the key is missing.
        filename = part.get("filename") or "untitled.txt"
        data = attachment_data.get("data", "")

        raw_text = ut.extract_text_from_attachment(filename, data)
        struct_data = ut.strcuture_document_data(raw_text)
        if struct_data:
            structured_data.append(struct_data)

        attachments.append(
            Attachment(attachment_len=len(data), filename=filename, data=data)
        )

    return attachments, structured_data
|
|
|
|
|
def extract_text(html_content: str) -> str:
    """Strip markup from ``html_content`` and return its visible text.

    The text nodes are joined with single spaces, every run of whitespace
    is collapsed to one space, and the result is trimmed.

    Args:
        html_content: HTML (or plain text) to convert.

    Returns:
        The whitespace-normalized text.

    Raises:
        ValueError: If ``html_content`` is empty or ``None``.
    """
    if html_content:
        parsed = BeautifulSoup(html_content, 'html.parser')
        collapsed = re.sub(r'\s+', ' ', parsed.get_text(separator=' '))
        return collapsed.strip()

    raise ValueError("HTML content is empty or None")
|
|
|
|
|
async def websocket_main(code: str, websocket: WebSocket, brand_name: Optional[str] = None):
    """Run a full message sync for one websocket connection, then close it.

    The access token is sent to the client as the first frame, after which
    ``get_messages`` streams the matching messages. Any failure inside
    ``get_messages`` is logged with its traceback and not re-raised.

    Args:
        code: OAuth access token used for the Gmail requests.
        websocket: Connection to stream to; closed before returning.
        brand_name: Optional brand filter passed through to ``get_messages``.
    """
    access_token = code

    # The token is a credential and must never be written to the logs.
    logging.info("websocket_main: starting message sync")
    # NOTE(review): the token is still sent to the client as the first frame,
    # as before — presumably part of the client protocol; confirm it is needed.
    await websocket.send_text(access_token)
    try:
        await get_messages(access_token, websocket, brand_name)
    except Exception:
        # logging.exception records the traceback, unlike the former print/info pair.
        logging.exception("get_messages failed")
    finally:
        await websocket.close()
|
|
|
|
|
async def send_message_in_chunks(websocket: WebSocket, message_json: dict, chunk_size: int):
    """Send ``message_json`` over ``websocket`` as consecutive text frames.

    The value is serialized with ``json.dumps`` and the resulting string is
    sent in slices of at most ``chunk_size`` characters, in order.

    Args:
        websocket: Destination connection; ``send_text`` is awaited per chunk.
        message_json: JSON-serializable value to send.
        chunk_size: Maximum number of characters per frame; must be >= 1.

    Raises:
        ValueError: If ``chunk_size`` is less than 1. A negative size would
            otherwise send nothing at all, silently dropping the message.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    message_str = json.dumps(message_json)
    for start in range(0, len(message_str), chunk_size):
        await websocket.send_text(message_str[start:start + chunk_size])
|
|