Spaces:
Sleeping
Sleeping
from dotenv import load_dotenv | |
import os | |
import json | |
from langchain_openai import ChatOpenAI | |
#from langchain.schema import AIMessage, HumanMessage, SystemMessage | |
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage | |
from langchain_core.tools import tool | |
import gradio as gr | |
load_dotenv() | |
credentials = {} | |
credentials[os.getenv("USERNAME")] = os.getenv("PASSWORD") | |
i = 1 | |
while os.getenv(f"USERNAME_{i}") and os.getenv(f"PASSWORD_{i}"): | |
credentials[os.getenv(f"USERNAME_{i}")] = os.getenv(f"PASSWORD_{i}") | |
i += 1 | |
#print(credentials) | |
MAX_HISTORY = 20 | |
system_message = """ | |
Sei l'assistente virtuale di Forfè, un software di fatturazione e gestione fiscale | |
specifico per il Regime Forfettario, la Gestione Separata INPS e la Cassa Artigiani e Commercianti. | |
Il tuo compito è aiutare gli utenti nella gestione della loro attività, fornendo supporto nella creazione | |
di fatture, gestione clienti e prodotti, e altre operazioni contabili. | |
🔹 **Le tue funzionalità principali includono:** | |
- Creazione di nuovi clienti, sia persone fisiche che società. | |
- Creazione di nuovi prodotti o servizi da fatturare. | |
- Generazione di fatture con i dati dei clienti e dei prodotti registrati. | |
📌 **Regole di interazione:** | |
- Chiedi all'utente solo i dati strettamente necessari per l'operazione richiesta. | |
- Mantieni sempre un linguaggio chiaro, professionale e amichevole. | |
- Se il numero di informazioni da richiedere all'utente sono troppe, dividi l'operazione in più passaggi, chiedendo dapprima i parametri obbligatori e poi quelli opzionali. | |
- Rispondi in italiano e guida l'utente passo dopo passo nel processo. | |
🚀 **Obiettivo:** Aiutare i professionisti e le piccole imprese a gestire la loro attività in modo semplice ed efficace con Forfè. | |
""" | |
def create_product( | |
name: str, | |
price: float | |
) -> str: | |
"""Crea un nuovo prodotto o servizio e restituisce una conferma. | |
Parametri: | |
- name (str): Nome del prodotto o del servizio offerto. | |
- price (float): Importo in euro (€) del prodotto o servizio. | |
Ritorna: | |
- str: Un messaggio di conferma con i dettagli del prodotto o servizio creato. | |
""" | |
print("\n--- Nuovo Prodotto/Servizio Creato ---") | |
print(f"Nome: {name}") | |
print(f"Prezzo: {price:.2f} EUR") | |
return f"Il nuovo prodotto o servizio '{name}' con un importo di {price:.2f}€ è stato generato con successo!" | |
def create_customer_type(customer_type: str) -> str: | |
"""Seleziona il tipo di cliente da creare. | |
Parametri: | |
- customer_type (str): Il tipo di cliente da creare. Deve essere 'individual' (persona fisica) o 'company' (società). | |
Ritorna: | |
- str: Un messaggio di conferma con il tipo di cliente selezionato. | |
""" | |
if customer_type not in ["individual", "company"]: | |
return "Errore: Il tipo di cliente deve essere 'individual' (persona fisica) o 'company' (società)." | |
return f"Hai selezionato {customer_type}. Ora puoi procedere con l'inserimento dei dati." | |
def create_individual( | |
first_name: str, | |
last_name: str, | |
tax_code: str, | |
address: str, | |
street_number: str, | |
postal_code: str, | |
city: str, | |
province: str, | |
country: str, | |
vat_number: str = None, # Opzionale | |
email: str = None, | |
pec: str = None, | |
phone: str = None, | |
recipient_code: str = None | |
) -> str: | |
"""Crea un cliente di tipo persona fisica. | |
Parametri: | |
- first_name (str): Nome del cliente. | |
- last_name (str): Cognome del cliente. | |
- tax_code (str): Codice fiscale. | |
- address (str): Indirizzo di residenza. | |
- street_number (str): Numero civico. | |
- postal_code (str): CAP. | |
- city (str): Città. | |
- province (str): Provincia. | |
- country (str): Nazione. | |
- vat_number (str, opzionale): Partita IVA, se presente. | |
- email (str, opzionale): Indirizzo email. | |
- pec (str, opzionale): Indirizzo PEC. | |
- phone (str, opzionale): Numero di telefono. | |
- recipient_code (str, opzionale): Codice destinatario. | |
Ritorna: | |
- str: Un messaggio di conferma con i dati della persona fisica creata. | |
""" | |
print("\n--- Nuova Persona Fisica Creata ---") | |
print(f"Nome: {first_name} {last_name}") | |
print(f"Codice Fiscale: {tax_code}") | |
print(f"Indirizzo: {address}, {street_number}, {postal_code}, {city}, {province}, {country}") | |
if vat_number: | |
print(f"Partita IVA: {vat_number}") | |
if email: | |
print(f"Email: {email}") | |
if pec: | |
print(f"PEC: {pec}") | |
if phone: | |
print(f"Telefono: {phone}") | |
if recipient_code: | |
print(f"Codice Destinatario: {recipient_code}") | |
return f"La persona fisica {first_name} {last_name} è stata creata con successo!" | |
def create_company_mode(company_type: str) -> str: | |
"""Permette di scegliere se inserire i dati societari tramite Partiva IVA o Codice Fiscale. | |
Parametri: | |
- company_type (str): Modalità di inserimento delle informazioni societarie. Deve essere 'vat_number' (Partiva IVA) o 'tax_code' (Codice Fiscale). | |
Ritorna: | |
- str: Un messaggio di conferma con il tipo di selezionato. | |
""" | |
if customer_type not in ["vat_number", "tax_code"]: | |
return "Errore: Il tipo di modalità deve essere 'vat_number' (Partiva IVA) o 'tax_code' (Codice Fiscale)." | |
return f"Hai selezionato {company_type}. Ora puoi procedere con l'inserimento dei dati." | |
def create_company_tax_code( | |
company_name: str, | |
tax_code: str, | |
address: str, | |
street_number: str, | |
postal_code: str, | |
city: str, | |
province: str, | |
country: str, | |
vat_number: str = None, | |
email: str = None, | |
pec: str = None, | |
phone: str = None, | |
recipient_code: str = None | |
) -> str: | |
"""Crea un cliente di tipo società tramite Codice Fiscale. | |
Parametri: | |
- company_name (str): Nome della società (ragione sociale). | |
- tax_code (str): Codice fiscale della società. | |
- address (str): Indirizzo della sede legale. | |
- street_number (str): Numero civico. | |
- postal_code (str): CAP. | |
- city (str): Città. | |
- province (str): Provincia. | |
- country (str): Nazione. | |
- vat_number (str, opzionale): Partita IVA. | |
- email (str, opzionale): Indirizzo email. | |
- pec (str, opzionale): Indirizzo PEC. | |
- phone (str, opzionale): Numero di telefono. | |
- recipient_code (str, opzionale): Codice destinatario. | |
Ritorna: | |
- str: Un messaggio di conferma con i dati della società creata. | |
""" | |
print("\n--- Nuova Società Creata ---") | |
print(f"Ragione Sociale: {company_name}") | |
print(f"Codice Fiscale: {tax_code}") | |
print(f"Indirizzo: {address}, {street_number}, {postal_code}, {city}, {province}, {country}") | |
if vat_number: | |
print(f"Partita IVA: {vat_number}") | |
if email: | |
print(f"Email: {email}") | |
if pec: | |
print(f"PEC: {pec}") | |
if phone: | |
print(f"Telefono: {phone}") | |
if recipient_code: | |
print(f"Codice Destinatario: {recipient_code}") | |
return f"La società {company_name} è stata creata con successo!" | |
def create_company_vat_number( | |
vat_number: str = None, | |
) -> str: | |
"""Crea un cliente di tipo società tramite Partita IVA, non vanno richiesti altri dati, solo confermare la Partita IVA. | |
Parametri: | |
- vat_number (str): Partita IVA. | |
Ritorna: | |
- str: Un messaggio di conferma con i dati della Partita IVA della società creata. | |
""" | |
print("\n--- Nuova Società Creata ---") | |
print(f"Partita IVA: {vat_number}") | |
return f"La società con Partita IVA {vat_number} è stata creata con successo!" | |
tools = [ | |
create_product, | |
create_customer_type, | |
create_individual, | |
create_company_mode, | |
create_company_tax_code, | |
create_company_vat_number | |
] | |
llm = ChatOpenAI( | |
model="gpt-4o-mini", | |
streaming=True, | |
) | |
llm_with_tools = llm.bind_tools(tools) | |
tool_name = "" | |
tool_args = "" | |
tool_mapping = { | |
#"create_customer_type": create_customer_type, | |
"create_individual": create_individual, | |
"create_company_tax_code": create_company_tax_code, | |
"create_company_vat_number": create_company_vat_number, | |
"create_product": create_product | |
} | |
label_buttons = ["Procedi", "Annulla"] | |
def _check_response(ai_msg): | |
if "persona fisica" in ai_msg.lower() and "società" in ai_msg.lower(): | |
context_analysis_prompt = f""" | |
Questa è la risposta dell'LLM a un utente che vuole creare un cliente: | |
\"{ai_msg}\" | |
Devi solo rispondere con "create_client" se la risposta chiede all'utente di selezionare tra "Persona Fisica" e "Società", | |
oppure "SPIEGAZIONE" se si tratta solo di una descrizione generale dei tipi di clienti senza richiedere un'azione. | |
""" | |
analysis_response = llm.invoke(context_analysis_prompt).content.strip() | |
print(analysis_response) | |
if analysis_response == "create_client": | |
return True, analysis_response | |
elif "partita iva" in ai_msg.lower() and "codice fiscale" in ai_msg.lower(): | |
context_analysis_prompt = f""" | |
Questa è la risposta dell'LLM a un utente che deve scegliere la modalità con cui vuole inserire i dati societari | |
\"{ai_msg}\" | |
Devi solo rispondere con "select_mode" se la risposta chiede all'utente di selezionare tra "Partita IVA" e "Codice Fiscale", | |
oppure "SPIEGAZIONE" se si tratta solo di una descrizione generale senza richiedere un'azione. | |
""" | |
analysis_response = llm.invoke(context_analysis_prompt).content.strip() | |
print(analysis_response) | |
if analysis_response == "select_mode": | |
return True, analysis_response | |
return False, "" | |
def _get_confirmation_message(tool_name: str, tool_args: dict) -> str: | |
"""Genera un messaggio descrittivo per confermare l'azione che sta per essere eseguita.""" | |
if tool_name == "create_individual": | |
return f"**ACTION** Stai per creare un cliente di tipo Persona Fisica: {tool_args.get('first_name', 'N/D')} {tool_args.get('last_name', 'N/D')}." | |
elif tool_name == "create_company_tax_code": | |
return f"**ACTION** Stai per creare una Società: {tool_args.get('company_name', 'N/D')}." | |
elif tool_name == "create_company_vat_number": | |
return f"**ACTION** Stai per creare una Società con Partita IVA: {tool_args.get('vat_number', 'N/D')}." | |
elif tool_name == "create_product": | |
return f"**ACTION** Stai per creare un nuovo prodotto o servizio: '{tool_args.get('name', 'N/D')}' al prezzo di {tool_args.get('price', 0):.2f}€." | |
return "**ACTION** Stai per eseguire un'operazione sconosciuta." | |
def _format_tool_args(tool_name: str, tool_args: dict) -> str: | |
"""Trasforma i parametri tecnici in una descrizione leggibile per l'utente, gestendo i valori opzionali.""" | |
translations = { | |
"first_name": "Nome", | |
"last_name": "Cognome", | |
"tax_code": "Codice Fiscale", | |
"address": "Indirizzo", | |
"street_number": "Numero Civico", | |
"postal_code": "CAP", | |
"city": "Città", | |
"province": "Provincia", | |
"country": "Nazione", | |
"vat_number": "Partita IVA", | |
"email": "Email", | |
"pec": "PEC", | |
"phone": "Telefono", | |
"recipient_code": "Codice Destinatario", | |
"company_name": "Ragione Sociale", | |
"name": "Nome del Prodotto", | |
"price": "Prezzo", | |
"amount": "Importo" | |
} | |
_formatted_args = "\n".join([ | |
f"- {translations.get(key, key)}: {value if value else 'Non specificato'}" | |
for key, value in tool_args.items() | |
]) | |
return _formatted_args | |
def _get_company_info(vat_number: str) -> dict: | |
print(f"Getting company info for vat_number: {vat_number}") | |
company_info = { | |
"company_name": "NewCo", | |
"tax_code": "abcdefghilmnopqr", | |
"vat_number": vat_number, | |
"address": "via Roma", | |
"street_number": "12", | |
"postal_code": "00100", | |
"city": "Roma", | |
"province": "RM", | |
"country": "IT", | |
"email": "aaa@bbb.ccc", | |
"pec": "aaa@bbb.ccc", | |
"phone": "1234567890", | |
"recipient_code": "abcabcabc" | |
} | |
return company_info | |
def chatbot_response(message, history): | |
global tool_name, tool_args | |
history = history or [] | |
if message.startswith("**OPERAZIONE ANNULLATA**"): | |
history.append({"role": "user", "content": "**OPERAZIONE ANNULLATA**"}) | |
else: | |
history.append({"role": "user", "content": message}) | |
messages = [SystemMessage(content=system_message)] | |
truncated_history = history[-MAX_HISTORY:] if len(history) > MAX_HISTORY else history | |
#print(f"Truncated history: {truncated_history}") | |
for entry in truncated_history: | |
if entry["role"] == "user": | |
messages.append(HumanMessage(content=entry["content"])) | |
elif entry["role"] == "assistant": | |
messages.append(AIMessage(content=entry["content"])) | |
if message is not None: | |
messages.append(HumanMessage(content=message)) | |
#_response=llm_with_tools.stream(history_langchain_format) | |
ai_msg=llm_with_tools.invoke(messages) | |
print(ai_msg) | |
choice, analysis_response = _check_response(ai_msg.content) | |
if choice and analysis_response == "create_client": | |
history.append({"role": "assistant", "content": f"**CREAZIONE CLIENTE**\n{ai_msg.content}"}) | |
elif choice and analysis_response == "select_mode": | |
history.append({"role": "assistant", "content": f"**MODALITA' INSERIMENTO DATI SOCIETARI**\n{ai_msg.content}"}) | |
elif hasattr(ai_msg, "tool_calls") and ai_msg.tool_calls: | |
for tool_call in ai_msg.tool_calls: | |
tool_name = tool_call["name"].lower() | |
tool_args = tool_call["args"] | |
selected_tool = tool_mapping.get(tool_name) | |
if selected_tool: | |
if selected_tool==create_company_vat_number: | |
tool_args=_get_company_info(tool_args.get('vat_number', 'N/D')) | |
if tool_args is None: | |
history.append({"role": "user", "content": "Errore: Dati societari non trovati."}) | |
return history | |
confirmation_message = _get_confirmation_message(tool_name, tool_args) | |
formatted_args = _format_tool_args(tool_name, tool_args) | |
ai_msg= f"{confirmation_message}\n\n**Ecco i dettagli inseriti:**\n{formatted_args}" | |
history.append({"role": "assistant", "content": ai_msg}) | |
else: | |
history.append({"role": "assistant", "content": ai_msg.content}) | |
return history | |
def reset_textbox(): | |
"""Clears the textbox after sending a message.""" | |
return gr.update(value="") | |
def show_buttons(history): | |
global label_buttons | |
if history and history[-1]["content"].startswith("**ACTION**"): | |
label_buttons = ["Procedi", "Annulla"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]) | |
) | |
elif history and history[-1]["content"].startswith("**CREAZIONE CLIENTE**"): | |
label_buttons = ["Persona Fisica", "Società"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]) | |
) | |
elif history and history[-1]["content"].startswith("**MODALITA' INSERIMENTO DATI SOCIETARI**"): | |
#print("SONO QUI") | |
label_buttons = ["Partita IVA", "Codice Fiscale"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]) | |
) | |
return ( | |
gr.update(visible=True), | |
gr.update(visible=False), | |
gr.update(visible=False) | |
) | |
def hide_buttons(): | |
"""Hides buttons and shows the textbox after clicking a button.""" | |
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
def show_or_hide_buttons(history): | |
global label_buttons | |
if history and history[-1]["content"].startswith("**ACTION**"): | |
label_buttons = ["Procedi", "Annulla"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]), | |
gr.update(visible=False) | |
) | |
elif history and history[-1]["content"].startswith("**CREAZIONE CLIENTE**"): | |
label_buttons = ["Persona Fisica", "Società"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]), | |
gr.update(visible=False) | |
) | |
elif history and history[-1]["content"].startswith("**MODALITA' INSERIMENTO DATI SOCIETARI**"): | |
#print("SONO QUI") | |
label_buttons = ["Partita IVA", "Codice Fiscale"] | |
return ( | |
gr.update(visible=False), | |
gr.update(visible=True, value=label_buttons[0]), | |
gr.update(visible=True, value=label_buttons[1]), | |
gr.update(visible=False) | |
) | |
return ( | |
gr.update(visible=True), | |
gr.update(visible=False), | |
gr.update(visible=False), | |
gr.update(visible=True) | |
) | |
def button_clicked(option, history): | |
"""Handles button clicks and updates chat history.""" | |
global tool_name, tool_args | |
print(f"Button clicked: {option}") | |
if (option == "Procedi"): | |
history.append({"role": "user", "content": option}) | |
if tool_name and tool_args: | |
selected_tool = tool_mapping.get(tool_name) | |
if selected_tool: | |
tool_output = selected_tool.invoke(tool_args) | |
history.append({"role": "assistant", "content": tool_output}) | |
else: | |
history.append({"role": "user", "content": "Operazione annullata"}) | |
elif option == "Annulla": | |
history.append({"role": "user", "content": option}) | |
llm_message = ( | |
"**OPERAZIONE ANNULLATA**\n" | |
"Ho annullato l'operazione corrente.\n" | |
"Dobbiamo modificare alcuni parametri oppure passare a una nuova operazione.\n" | |
) | |
history = chatbot_response(llm_message, history) | |
elif option == "Persona Fisica": | |
llm_message = "Persona Fisica" | |
history = chatbot_response(llm_message, history) | |
elif option == "Società": | |
llm_message = "Società" | |
history = chatbot_response(llm_message, history) | |
elif option == "Partita IVA": | |
llm_message = "Partita IVA" | |
history = chatbot_response(llm_message, history) | |
elif option == "Codice Fiscale": | |
llm_message = "Codice Fiscale" | |
history = chatbot_response(llm_message, history) | |
tool_name = "" | |
tool_args = "" | |
return history | |
# Authentication function | |
def authenticate(username, password): | |
if username in credentials and credentials[username] == password: | |
print("🔑 Login successful!") | |
return gr.update(visible=False), gr.update(visible=True), gr.update(value="", visible=False) # Hide login, show chatbot, clear error | |
else: | |
print("❌ Incorrect username or password") | |
return gr.update(visible=True), gr.update(visible=False), gr.update(value="❌ Incorrect username or password", visible=True) # Show error | |
with gr.Blocks() as demo: | |
# 🔒 Login Section (Initially Visible) | |
with gr.Column(visible=True) as login_section: | |
gr.Markdown("### 🔒 Login Required") | |
username_input = gr.Textbox(label="Username") | |
password_input = gr.Textbox(label="Password", type="password") | |
login_button = gr.Button("Login") | |
error_message = gr.Text("", visible=False) | |
# 🧠 Chatbot Section (Initially Hidden) | |
with gr.Column(visible=False) as chatbot_section: | |
chatbot = gr.Chatbot( | |
label="Assistente Forfè", | |
type="messages" | |
) | |
user_input = gr.Textbox(label="Utente",placeholder="Cosa vuoi chiedere al tuo assistente Forfè?") | |
with gr.Row(): | |
btn1 = gr.Button("", visible=False) | |
btn2 = gr.Button("", visible=False) | |
send_btn = gr.Button("Invia") | |
# When user submits text | |
user_input.submit(chatbot_response, [user_input, chatbot], chatbot) \ | |
.then(reset_textbox, None, user_input) \ | |
.then(show_or_hide_buttons, chatbot, [user_input, btn1, btn2, send_btn]) | |
#.then(show_buttons, chatbot, [user_input, btn1, btn2]) | |
# When user clicks send | |
send_btn.click(chatbot_response, [user_input, chatbot], chatbot) \ | |
.then(reset_textbox, None, user_input) \ | |
.then(show_or_hide_buttons, chatbot, [user_input, btn1, btn2, send_btn]) | |
#.then(show_buttons, chatbot, [user_input, btn1, btn2]) | |
# Button clicks: Show textbox, hide buttons | |
btn1.click(lambda h: button_clicked(label_buttons[0], h), chatbot, chatbot) \ | |
.then(show_or_hide_buttons, chatbot, [user_input, btn1, btn2, send_btn]) | |
#.then(hide_buttons, None, [user_input, btn1, btn2]) | |
btn2.click(lambda h: button_clicked(label_buttons[1], h), chatbot, chatbot) \ | |
.then(show_or_hide_buttons, chatbot, [user_input, btn1, btn2, send_btn]) | |
#.then(hide_buttons, None, [user_input, btn1, btn2]) | |
# 🔑 Login Button Action (Now updates visibility correctly) | |
login_button.click( | |
authenticate, | |
[username_input, password_input], | |
[login_section, chatbot_section, error_message] | |
) | |
demo.launch( | |
debug=True, | |
#share=True | |
) |