# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
import os | |
from dotenv import load_dotenv | |
import os.path | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from googleapiclient.errors import HttpError | |
import base64 | |
from email.message import EmailMessage | |
from bs4 import BeautifulSoup | |
import webbrowser | |
import datetime | |
from streamlit_javascript import st_javascript | |
import streamlit as st | |
import time | |
# OAuth scopes requested from Google by the authentication flows in this
# module (Gmail, Calendar, Docs, Forms, Sheets and Drive access).
SCOPES = [
    "https://www.googleapis.com/auth/gmail.compose",
    "https://www.googleapis.com/auth/gmail.modify",
    "https://mail.google.com/",
    "https://www.googleapis.com/auth/calendar.readonly",
    "https://www.googleapis.com/auth/documents.readonly",
    "https://www.googleapis.com/auth/forms.body",
    "https://www.googleapis.com/auth/spreadsheets.readonly",
    "https://www.googleapis.com/auth/drive",
]
# ---------------------------------------------------------- READ EMAIL ----------------------------------------------------------
def converti_email_txt(body):
    """Return the text of an e-mail body that may be HTML.

    If *body* parses as HTML containing a ``<body>`` element, the text
    content of that element is returned. In every other case, including a
    parsing failure, *body* is returned unchanged.
    """
    try:
        html_body = BeautifulSoup(body, 'html.parser').find('body')
        return html_body.get_text() if html_body is not None else body
    except Exception:
        # Best-effort conversion: anything the parser cannot handle is passed
        # through untouched. ``Exception`` (rather than a bare ``except``)
        # lets KeyboardInterrupt / SystemExit propagate.
        return body
def _decodifica_base64url(data):
    """Decode a base64url string returned by the Gmail API into text."""
    # Restore any stripped '=' padding; undecodable bytes are replaced rather
    # than aborting the whole mailbox read.
    padded = data + "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")


def _estrai_testo_plain(payload):
    """Return the first ``text/plain`` body found in a message payload, or ''."""
    parts = payload.get("parts")
    if parts is None:
        # Single-part message: the content lives directly on the payload.
        data = payload.get("body", {}).get("data")
        return converti_email_txt(_decodifica_base64url(data)) if data else ""
    for part in parts:
        if part.get("mimeType") == "text/plain":
            data = part.get("body", {}).get("data")
            if data:
                return converti_email_txt(_decodifica_base64url(data))
        elif "parts" in part:
            # Nested multipart container (e.g. multipart/alternative).
            testo = _estrai_testo_plain(part)
            if testo:
                return testo
    return ""


def leggi_gmail(max_results=10, data_inizio=None, data_fine=None):
    """Read unread INBOX messages and summarise them as text.

    Args:
        max_results: Maximum number of messages requested from the API.
        data_inizio: Optional date/datetime; adds an ``after:`` filter.
        data_fine: Optional date/datetime; adds a ``before:`` filter.

    Returns:
        A ``(testo_email, links)`` tuple. ``testo_email`` is the concatenated
        summary of every message (or an Italian "no e-mail found" notice), and
        ``links`` is a list of ``(sender_label, details_label)`` tuples.

    Uses the credentials stored in ``st.session_state.creds``.
    """
    service = build("gmail", "v1", credentials=st.session_state.creds)

    # Date filters are optional so the declared ``None`` defaults are usable.
    # NOTE(review): Gmail's ``before:`` operator appears to exclude the given
    # day itself, so mail received on ``data_fine`` may be skipped - confirm
    # whether the end date is meant to be inclusive.
    filtri = ["is:unread"]
    if data_inizio is not None:
        filtri.append("after:" + data_inizio.strftime("%Y/%m/%d"))
    if data_fine is not None:
        filtri.append("before:" + data_fine.strftime("%Y/%m/%d"))
    query = " ".join(filtri)

    results = service.users().messages().list(
        userId="me", labelIds=["INBOX"], q=query, maxResults=max_results
    ).execute()
    messages = results.get("messages", [])
    if not messages:
        print("You have no New Messages.")

    blocchi = []
    links = []
    for message in messages:
        msg = service.users().messages().get(userId="me", id=message["id"]).execute()
        payload = msg["payload"]

        # Reset per message so a missing header never raises or reuses the
        # value of the previous message.
        from_name = ""
        received_date = ""
        subject = []
        for header in payload.get("headers", []):
            name = header["name"]
            if name == "From":
                from_name = header["value"]
            elif name == "Date":
                received_date = header["value"]
            elif name == "Subject":
                subject.append(header["value"])

        body = _estrai_testo_plain(payload)
        oggetto = ', '.join(subject)
        # The first five characters of the Date header are dropped, as in the
        # original output format.
        data_ricezione = received_date[5:]
        blocchi.append(
            'Data Ricezione: ' + data_ricezione
            + '\nMittente: ' + from_name
            + '\nOggetto: ' + oggetto
            + '\nTesto Email: ' + body
            + '\n---------------------------------------------------\n'
        )
        links.append((
            'Mittente: ' + from_name,
            'Data: ' + data_ricezione + '\n\nOggetto: ' + oggetto,
        ))

    testo_email = ''.join(blocchi)
    if testo_email == '':
        testo_email = 'Non sono presenti email nell intervallo di date specificate! Riprova con altre date'
    return testo_email, links
# ---------------------------------------------------------- WRITE EMAIL DRAFT ----------------------------------------------------------
def scrivi_bozza_gmail(
    testo,
    destinatario="gduser1@workspacesamples.dev",
    mittente="gduser2@workspacesamples.dev",
    oggetto="Automated draft",
):
    """Create a Gmail draft containing *testo* and return the drafts URL.

    Args:
        testo: Plain-text body of the draft.
        destinatario: Value of the ``To`` header.
        mittente: Value of the ``From`` header.
        oggetto: Value of the ``Subject`` header.

    Returns:
        The URL of the Gmail drafts folder. It is returned even when the API
        call fails; the failure is only printed.

    Uses the credentials stored in ``st.session_state.creds``.
    """
    draft_url = 'https://mail.google.com/mail/u/0/#drafts'
    try:
        service = build("gmail", "v1", credentials=st.session_state.creds)

        message = EmailMessage()
        message.set_content(testo)
        message["To"] = destinatario
        message["From"] = mittente
        message["Subject"] = oggetto

        # The API expects the whole RFC 822 message base64url-encoded in "raw".
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        create_message = {"message": {"raw": encoded_message}}
        draft = service.users().drafts().create(userId="me", body=create_message).execute()
        # The create response is printed directly; no second request is made
        # just to display the draft.
        print(f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}')
    except HttpError as error:
        print(f"An error occurred: {error}")
    return draft_url
# ---------------------------------------------------------- CREATE GOOGLE DOCUMENT ----------------------------------------------------------
def _lunghezza_utf16(testo):
    """Return the length of *testo* in UTF-16 code units."""
    return len(testo.encode("utf-16-le", errors="surrogatepass")) // 2


def _richiesta_inserimento(indice, testo):
    """Build a Docs ``insertText`` request placing *testo* at *indice*."""
    return {"insertText": {"location": {"index": indice}, "text": testo}}


def _richiesta_stile(inizio, fine, grassetto, dimensione_pt):
    """Build a Docs ``updateTextStyle`` request for the range [inizio, fine)."""
    return {
        "updateTextStyle": {
            "range": {"startIndex": inizio, "endIndex": fine},
            "textStyle": {
                "bold": grassetto,
                "fontSize": {"magnitude": dimensione_pt, "unit": "PT"},
            },
            "fields": "bold,fontSize",
        }
    }


def crea_documento_google(testo, titolo="Documento di Bonsi AI"):
    """Create a Google Doc with a bold title followed by *testo*.

    Args:
        testo: Body text, inserted after the title and two newlines.
        titolo: Document name and heading text (20 pt bold).

    Returns:
        The URL of the newly created document.

    Uses the credentials stored in ``st.session_state.creds``.
    """
    drive_service = build('drive', 'v3', credentials=st.session_state.creds)
    docs_service = build('docs', 'v1', credentials=st.session_state.creds)

    file_metadata = {'name': titolo, 'mimeType': 'application/vnd.google-apps.document'}
    new_document = drive_service.files().create(body=file_metadata).execute()
    document_id = new_document['id']

    corpo = "\n\n" + testo

    # Docs API indexes count UTF-16 code units, not Python code points, so
    # characters outside the BMP (e.g. emoji) must count as two.
    inizio_titolo = 1
    fine_titolo = inizio_titolo + _lunghezza_utf16(titolo)
    fine_corpo = fine_titolo + _lunghezza_utf16(corpo)

    requests = [_richiesta_inserimento(inizio_titolo, titolo)]
    if fine_titolo > inizio_titolo:
        # An empty title would produce an empty range, so it is not styled.
        requests.append(_richiesta_stile(inizio_titolo, fine_titolo, True, 20))
    # The body is inserted right after the title, then reset to normal style.
    requests.append(_richiesta_inserimento(fine_titolo, corpo))
    requests.append(_richiesta_stile(fine_titolo, fine_corpo, False, 12))

    docs_service.documents().batchUpdate(
        documentId=document_id, body={'requests': requests}
    ).execute()
    return 'https://docs.google.com/document/d/' + document_id
# ---------------------------------------------------------- READ GOOGLE CALENDAR ----------------------------------------------------------
def leggi_calendario_google(max_results=10, data_inizio=None, data_fine=None):
    """List the events of every calendar of the user in a date range.

    Args:
        max_results: Maximum number of events requested per calendar.
        data_inizio: Optional first day; events are searched from 00:00:00.
        data_fine: Optional last day; events are searched until 23:59:59.

    Returns:
        A ``(descrizione_eventi, links)`` tuple. ``descrizione_eventi`` has one
        line per event (or an Italian "no events" notice), and ``links`` is a
        list of ``(calendar_label, details_label)`` tuples.

    An ``HttpError`` from the API is printed, and whatever was collected
    before the failure is still returned. Uses ``st.session_state.creds``.
    """
    # Initialised before the try block so the code after it never hits an
    # unbound name when the very first API call fails.
    righe = []
    links = []

    # NOTE(review): the 'Z' suffix labels the chosen calendar days as UTC;
    # confirm this matches the time zone the user expects.
    intervallo = {}
    if data_inizio is not None:
        intervallo["timeMin"] = datetime.datetime.combine(data_inizio, datetime.time()).isoformat() + 'Z'
    if data_fine is not None:
        intervallo["timeMax"] = datetime.datetime.combine(data_fine, datetime.time(23, 59, 59)).isoformat() + 'Z'

    try:
        service = build("calendar", "v3", credentials=st.session_state.creds)
        calendars = service.calendarList().list().execute().get('items', [])
        for calendar in calendars:
            events_result = service.events().list(
                calendarId=calendar['id'],
                maxResults=max_results,
                singleEvents=True,
                orderBy="startTime",
                **intervallo,
            ).execute()
            for event in events_result.get("items", []):
                # Falls back from "dateTime" to "date" when the former is absent.
                start = event["start"].get("dateTime", event["start"].get("date")) or ''
                end = event["end"].get("dateTime", event["end"].get("date")) or ''
                inizio = start.replace("T", " ").replace("Z", " ")
                fine = end.replace("T", " ").replace("Z", " ")

                titolo = event.get("summary", "")
                descrizione = event.get('description', '').replace('\n', '. ')
                calendario = event.get('organizer', {}).get('displayName', 'Principale')

                riga = f'Calendario: {calendario} --- '
                riga += f'Data Inizio: {inizio} - Data Fine: {fine}: '
                descrizione_link = f'Data: {inizio} \n\n Titolo: {titolo}'
                if descrizione != '':
                    riga += f'{titolo} ({descrizione})'
                    descrizione_link += f'\n\nDescrizione: {titolo} ({descrizione})'
                else:
                    riga += f'{titolo}'
                righe.append(riga + '\n')
                links.append((f'Calendario: {calendario}', descrizione_link))
    except HttpError as error:
        print(f"An error occurred: {error}")

    descrizione_eventi = ''.join(righe)
    if descrizione_eventi == '':
        descrizione_eventi = 'Non sono presenti eventi nel calendario per le date specificate. Riprova con altre date!'
    return descrizione_eventi, links
# ---------------------------------------------------------- GOOGLE ACCOUNT CONNECTION ----------------------------------------------------------
def local_storage_get(key):
    """Return the result of reading *key* from the browser's localStorage.

    The read is performed by evaluating JavaScript through ``st_javascript``,
    whose return value is passed back unchanged.
    """
    import json

    # json.dumps yields a correctly escaped JavaScript string literal, so a
    # key containing quotes or backslashes cannot break out of the call.
    return st_javascript(f"localStorage.getItem({json.dumps(key)});")
def local_storage_set(key, value):
    """Store *value* under *key* in the browser's localStorage.

    The write is performed by evaluating JavaScript through ``st_javascript``,
    whose return value is passed back unchanged. *value* is stored as its
    ``str()`` representation.
    """
    import json

    # The value can originate from a URL query parameter, so both arguments
    # are emitted as escaped JavaScript string literals instead of being
    # spliced raw between quotes (which allowed script injection).
    return st_javascript(
        f"localStorage.setItem({json.dumps(key)}, {json.dumps(str(value))});"
    )
def connetti_google():
    """Run the browser-based Google login flow inside the Streamlit page.

    Behaviour depends on what is available on this run:

    * a code stored in localStorage under ``"token"``: it is exchanged for
      credentials, which are saved in ``st.session_state.creds`` and
      ``st.session_state.login_effettuato`` is set to ``True``;
    * a ``code`` query parameter: it is saved to localStorage and the page is
      redirected to ``URL_REDIRECT``;
    * neither: a "Login Google" link to the authorisation URL is rendered.

    Raises:
        RuntimeError: if the ``JSON_GOOGLE`` or ``URL_REDIRECT`` environment
            variable is not set.
    """
    load_dotenv()
    json_variable_str = os.getenv("JSON_GOOGLE")
    url_redirect = os.getenv('URL_REDIRECT')
    if not json_variable_str or not url_redirect:
        # Fail with a clear message instead of a TypeError from f.write(None).
        raise RuntimeError(
            "Environment variables JSON_GOOGLE and URL_REDIRECT must be set"
        )

    # The client configuration is materialised on disk because the flow is
    # built from a client-secrets file.
    with open("./credentials.json", "w") as f:
        f.write(json_variable_str)
    flow = InstalledAppFlow.from_client_secrets_file(
        "./credentials.json", SCOPES, redirect_uri=url_redirect
    )

    # Authorisation codes and credentials are deliberately never printed:
    # stdout ends up in the application logs.
    token = local_storage_get("token")
    if token and token != '':
        # NOTE(review): authorisation codes are normally single-use, so
        # exchanging the same stored code on a later run may fail - confirm
        # how stale codes should be handled.
        flow.fetch_token(code=token)
        st.session_state.creds = flow.credentials
        st.session_state.login_effettuato = True
        return

    auth_code = st.query_params.get("code")
    if auth_code:
        local_storage_set("token", auth_code)
        # Presumably gives the browser time to run the localStorage write
        # before the redirect below - TODO confirm.
        time.sleep(2)
        nav_script = """<meta http-equiv="refresh" content="0; url='%s'">""" % (url_redirect)
        st.write(nav_script, unsafe_allow_html=True)
        return

    # No code available yet: show the login link in the centre column.
    col1, col2, col3 = st.columns([1, 3, 1])
    with col2:
        authorization_url, state = flow.authorization_url(include_granted_scopes="true",)
        st.write('# Bonsi A.I.')
        st.markdown(f'<h3><a href="{authorization_url}" target="_self">Login Google</a></h3>', unsafe_allow_html=True)
        st.write("")
        st.write("")
def connetti_google_2():
    """Return Google credentials, using ``token.json`` as a local cache.

    Cached credentials are loaded from ``token.json`` when the file exists
    and refreshed when expired with a refresh token available. Otherwise a
    new console authorisation flow is run. Newly obtained or refreshed
    credentials are written back to ``token.json``.

    Returns:
        The credentials object.

    Raises:
        RuntimeError: if the ``JSON_GOOGLE`` environment variable is not set.
    """
    load_dotenv()
    json_variable_str = os.getenv("JSON_GOOGLE")
    if not json_variable_str:
        # Fail with a clear message instead of a TypeError from f.write(None).
        raise RuntimeError("Environment variable JSON_GOOGLE must be set")

    # The client secret is written to disk for the flow but never printed.
    with open("./credentials.json", "w") as f:
        f.write(json_variable_str)

    creds = None
    if os.path.exists("./token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # URL_REDIRECT is read here because it is not a module-level name;
            # referencing it bare raised NameError.
            flow = InstalledAppFlow.from_client_secrets_file(
                "./credentials.json", SCOPES, redirect_uri=os.getenv("URL_REDIRECT")
            )
            # NOTE(review): run_console() depends on the out-of-band flow and
            # may be missing from recent google-auth-oauthlib releases -
            # confirm the pinned version, or switch to
            # flow.run_local_server(port=8501, open_browser=False).
            creds = flow.run_console()
        with open("token.json", "w") as token:
            token.write(creds.to_json())
    return creds