import os from dotenv import load_dotenv import os.path from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError import base64 from email.message import EmailMessage from bs4 import BeautifulSoup import webbrowser import datetime from streamlit_javascript import st_javascript import streamlit as st import time SCOPES = ["https://www.googleapis.com/auth/gmail.compose", "https://www.googleapis.com/auth/gmail.modify", "https://mail.google.com/", "https://www.googleapis.com/auth/calendar.readonly", "https://www.googleapis.com/auth/documents.readonly", "https://www.googleapis.com/auth/forms.body", "https://www.googleapis.com/auth/spreadsheets.readonly", "https://www.googleapis.com/auth/drive"] #---------------------------------------------------------- LETTURA EMAIL --------------------------------------------------------- def converti_email_txt(body): try: soup = BeautifulSoup(body, 'html.parser') body_content = soup.find('body').get_text() if soup.find('body') else body body = body_content except: body = body return body def leggi_gmail(max_results=10, data_inizio = None, data_fine = None): links = [] service = build("gmail", "v1", credentials=st.session_state.creds) start_date_str = data_inizio.strftime("%Y/%m/%d") end_date_str = data_fine.strftime("%Y/%m/%d") query = f"is:unread after:{start_date_str} before:{end_date_str}" results = service.users().messages().list(userId="me", labelIds=["INBOX"], q=query, maxResults=max_results).execute() messages = results.get("messages", []) testo_email = '' if not messages: print("You have no New Messages.") else: message_count = 0 for message in messages: msg = service.users().messages().get(userId="me", id=message["id"]).execute() message_count = message_count + 1 email_data = msg["payload"]["headers"] for values in email_data: name = values["name"] if name == "From": from_name = values["value"] subject = [j["value"] for j in email_data if j["name"] == "Subject"] elif name == "Date": received_date = values["value"] body = "" if "parts" in msg["payload"]: for part in msg["payload"]["parts"]: if part["mimeType"] == "text/plain": body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8") body = converti_email_txt(body) break elif part["mimeType"] == "multipart/alternative": for subpart in part["parts"]: if subpart["mimeType"] == "text/plain": body = base64.urlsafe_b64decode(subpart["body"]["data"]).decode("utf-8") body = converti_email_txt(body) break else: if "data" in msg["payload"]["body"]: body = base64.urlsafe_b64decode(msg["payload"]["body"]["data"]).decode("utf-8") body = converti_email_txt(body) testo_email += 'Data Ricezione: ' + received_date[5:] + '\nMittente: ' + from_name + '\nOggetto: ' + ', '.join(subject) + '\nTesto Email: ' + body + '\n---------------------------------------------------\n' links.append(('Mittente: ' + from_name, 'Data: ' + received_date[5:] + '\n\nOggetto: ' + ', '.join(subject))) if testo_email == '': testo_email = 'Non sono presenti email nell intervallo di date specificate! Riprova con altre date' return testo_email, links #---------------------------------------------------------- SCRITTURA BOZZA EMAIL --------------------------------------------------------- def scrivi_bozza_gmail(testo): draft_url = 'https://mail.google.com/mail/u/0/#drafts' try: service = build("gmail", "v1", credentials=st.session_state.creds) message = EmailMessage() message.set_content(testo) message["To"] = "gduser1@workspacesamples.dev" message["From"] = "gduser2@workspacesamples.dev" message["Subject"] = "Automated draft" encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode() create_message = {"message": {"raw": encoded_message}} draft = (service.users().drafts().create(userId="me", body=create_message).execute()) print(f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}') draft_id = draft["id"] draft_details = service.users().drafts().get(userId="me", id=draft_id).execute() print(draft_details) except HttpError as error: print(f"An error occurred: {error}") return draft_url #---------------------------------------------------------- CREA DOCUMENTO GOOGLE --------------------------------------------------------- def crea_documento_google(testo): drive_service = build('drive', 'v3', credentials=st.session_state.creds) docs_service = build('docs', 'v1', credentials=st.session_state.creds) file_metadata = {'name': 'Documento di Bonsi AI', 'mimeType': 'application/vnd.google-apps.document'} new_document = drive_service.files().create(body=file_metadata).execute() document_id = new_document['id'] title_text = "Documento di Bonsi AI" title_index = 1 bold_title_request = { "insertText": { "location": { "index": title_index, }, "text": title_text } } bold_request = { "updateTextStyle": { "range": { "startIndex": title_index, "endIndex": title_index + len(title_text) }, "textStyle": { "bold": True, "fontSize": { "magnitude": 20, "unit": "PT" } }, "fields": "bold,fontSize" } } normal_text = "\n\n" + testo normal_text_index = title_index + len(title_text) normal_text_request = { "insertText": { "location": { "index": normal_text_index, }, "text": normal_text } } normal_request = { "updateTextStyle": { "range": { "startIndex": normal_text_index, "endIndex": normal_text_index + len(normal_text) }, "textStyle": { "bold": False, "fontSize": { "magnitude": 12, "unit": "PT" } }, "fields": "bold,fontSize" } } docs_service.documents().batchUpdate(documentId=document_id, body={ 'requests': [bold_title_request, bold_request, normal_text_request, normal_request] }).execute() return 'https://docs.google.com/document/d/' + document_id #---------------------------------------------------------- LEGGI GOOGLE CALENDAR --------------------------------------------------------- def leggi_calendario_google(max_results=10, data_inizio = None, data_fine = None): try: service = build("calendar", "v3", credentials=st.session_state.creds) calendar_list_result = service.calendarList().list().execute() calendars = calendar_list_result.get('items', []) descrizione_eventi = '' links = [] timeMin = datetime.datetime.combine(data_inizio, datetime.time()).isoformat() + 'Z' timeMax = datetime.datetime.combine(data_fine, datetime.time(23, 59, 59)).isoformat() + 'Z' for calendar in calendars: events_result = (service.events().list(calendarId=calendar['id'], timeMin=timeMin, timeMax = timeMax, maxResults=max_results, singleEvents=True, orderBy="startTime", ).execute()) events = events_result.get("items", []) for event in events: start = event["start"].get("dateTime", event["start"].get("date")) end = event["end"].get("dateTime", event["end"].get("date")) descrizione = '' calendario = '' if 'description' in event: descrizione = event['description'].replace('\n', '. ') if 'displayName' in event['organizer']: calendario = event['organizer']['displayName'] else: calendario = 'Principale' descrizione_eventi += f'Calendario: {calendario} --- ' descrizione_eventi += f'Data Inizio: {start.replace("T", " ").replace("Z", " ")} - Data Fine: {end.replace("T", " ").replace("Z", " ")}: ' descrizione_link = f'Data: {start.replace("T", " ").replace("Z", " ")} \n\n Titolo: {event["summary"]}' if descrizione != '': descrizione_eventi += f'{event["summary"]} ({descrizione})' descrizione_link += f'\n\nDescrizione: {event["summary"]} ({descrizione})' else: descrizione_eventi += f'{event["summary"]}' descrizione_eventi += '\n' links.append((f'Calendario: {calendario}', descrizione_link)) except HttpError as error: print(f"An error occurred: {error}") if descrizione_eventi == '': descrizione_eventi = 'Non sono presenti eventi nel calendario per le date specificate. Riprova con altre date!' return descrizione_eventi, links #---------------------------------------------------------- CONNESSIONE ACCOUNT GOOGLE --------------------------------------------------------- def local_storage_get(key): return st_javascript(f"localStorage.getItem('{key}');") def local_storage_set(key, value): return st_javascript(f"localStorage.setItem('{key}', '{value}');") def connetti_google(): load_dotenv() json_variable_str = os.getenv("JSON_GOOGLE") URL_REDIRECT = os.getenv('URL_REDIRECT') with open("./credentials.json", "w") as f: f.write(json_variable_str) flow = InstalledAppFlow.from_client_secrets_file("./credentials.json", SCOPES, redirect_uri=URL_REDIRECT) token = local_storage_get("token") print('------------1------------------') print(token) if token and token != '': print('------------2------------------') flow.fetch_token(code=token) st.session_state.creds = flow.credentials print(st.session_state.creds) st.session_state.login_effettuato = True else: print('------------3------------------') auth_code = st.query_params.get("code") if auth_code: local_storage_set("token", auth_code) time.sleep(2) print('**************************************************************') print(auth_code) nav_script = """""" % (URL_REDIRECT) st.write(nav_script, unsafe_allow_html=True) else: print('------------4------------------') col1, col2, col3 = st.columns([1, 3, 1]) with col2: authorization_url, state = flow.authorization_url(include_granted_scopes="true",) st.write('# Bonsi A.I.') st.markdown(f'

Login Google

', unsafe_allow_html=True) st.write("") st.write("") def connetti_google_2(): load_dotenv() json_variable_str = os.getenv("JSON_GOOGLE") print(json_variable_str) with open("./credentials.json", "w") as f: f.write(json_variable_str) if os.path.exists("./credentials.json"): print('ESISTE') else: print('NON ESISTE') creds = None if os.path.exists("./token.json"): creds = Credentials.from_authorized_user_file("token.json", SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file("./credentials.json", SCOPES, redirect_uri=URL_REDIRECT) #creds = flow.run_local_server(port=8501, open_browser=False) creds = flow.run_console() with open("token.json", "w") as token: token.write(creds.to_json()) return creds