import streamlit as st
from chat_client import chat
import time
import os
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
import requests
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
import json
from audio_recorder_streamlit import audio_recorder
import speech_recognition as sr
from googlesearch import search
from bs4 import BeautifulSoup
load_dotenv()
URL_APP_SCRIPT = os.getenv('URL_APP_SCRIPT')
URL_PROMPT = URL_APP_SCRIPT + '?IdFoglio=1cLw9q70BsPmxMBj9PIzgXtq6sm3X-GVBVnOB5wE8jr8'
URL_DOCUMENTI = URL_APP_SCRIPT + '?IdSecondoFoglio=1cLw9q70BsPmxMBj9PIzgXtq6sm3X-GVBVnOB5wE8jr8'
SYSTEM_PROMPT = ["Sei BonsiAI e mi aiuterai nelle mie richieste (Parla in ITALIANO)", "Esatto, sono BonsiAI. Di cosa hai bisogno?"]
CHAT_BOTS = {"Mixtral 8x7B v0.1": "mistralai/Mixtral-8x7B-Instruct-v0.1"}
option_personalizzata = {'Personalizzata': {'systemRole': 'Tu sei BONSI AI, il mio assistente personale della scuola superiore del Bonsignori. Aiutami in base alle mie esigenze',
                                            'systemStyle': 'Firmati sempre come BONSI AI. (scrivi in italiano)',
                                            'instruction': '',
                                            'tipo': '',
                                            'RAG': False}
                         }
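# The options fetched from URL_PROMPT are expected to share the shape of 'Personalizzata'
# above (systemRole, systemStyle, instruction, tipo, RAG); 'tipo' drives the behaviour:
# 'RAG' enables retrieval from the Decreti vector store, 'DOCUMENTO' enables the document
# picker. (Assumption inferred from how the keys are read in sidebar() below; the Apps
# Script payload itself is not shown here.)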
# ----------------------------------------------------------- Interface --------------------------------------------------------------------
st.set_page_config(page_title="Bonsi A.I.", page_icon="🏫")
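# init_state() seeds st.session_state with defaults and, once per session (guarded by the
# loaded_data flag), downloads the prompt and document configuration from the Apps Script
# endpoints.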
def init_state():
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "temp" not in st.session_state:
        st.session_state.temp = 0.8
    if "history" not in st.session_state:
        st.session_state.history = [SYSTEM_PROMPT]
    if "top_k" not in st.session_state:
        st.session_state.top_k = 5
    if "repetition_penalty" not in st.session_state:
        st.session_state.repetition_penalty = 1
    if "chat_bot" not in st.session_state:
        st.session_state.chat_bot = "Mixtral 8x7B v0.1"
    if 'loaded_data' not in st.session_state:
        st.session_state.loaded_data = False
    if "split" not in st.session_state:
        st.session_state.split = 30
    if "enable_history" not in st.session_state:
        st.session_state.enable_history = True
    if "audio_bytes" not in st.session_state:
        st.session_state.audio_bytes = False
    if "cerca_online" not in st.session_state:
        st.session_state.cerca_online = False
    if "numero_siti" not in st.session_state:
        st.session_state.numero_siti = 3
    if "numero_generazioni" not in st.session_state:
        st.session_state.numero_generazioni = 1
    if "tbs_options" not in st.session_state:
        st.session_state.tbs_options = {
            "Sempre": "0",
            "Ultimo anno": "qdr:y",
            "Ultimo mese": "qdr:m",
            "Ultima settimana": "qdr:w",
            "Ultimo giorno": "qdr:d"
        }
    if not st.session_state.loaded_data:
        with st.status("Caricamento in corso...", expanded=True) as status:
            st.write("Inizializzazione Ambiente")
            time.sleep(1)
            st.write("Inizializzazione Prompt")
            options = requests.get(URL_PROMPT).json()
            st.write("Inizializzazione Documenti")
            documenti = requests.get(URL_DOCUMENTI).json()
            st.session_state.options = {**option_personalizzata, **options}
            st.session_state.documenti = documenti
            st.session_state.loaded_data = True
            status.update(label="Caricamento Completato", state="complete", expanded=False)

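# sidebar() renders the two settings panels: "Impostazioni Prompt" (action/document selection,
# Decreti retrieval options, online search) and "Impostazioni Modello" (model choice, number
# of generations, message history, temperature, output length).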
def sidebar():
    def retrieval_settings():
        st.markdown("# Impostazioni Prompt")
        st.session_state.selected_option_key = st.selectbox('Azione', list(st.session_state.options.keys()))
        st.session_state.selected_option = st.session_state.options.get(st.session_state.selected_option_key, {})
        if st.session_state.selected_option.get("tipo", "") == 'DOCUMENTO':
            st.session_state.selected_documento_key = st.selectbox('Documento', list(st.session_state.documenti.keys()))
            st.session_state.selected_documento = st.session_state.documenti.get(st.session_state.selected_documento_key, {})
            st.session_state.instruction = st.session_state.selected_documento.get('instruction', '')['Testo']
            st.session_state.split = st.slider(label="Pagine Suddivisione", min_value=1, max_value=30, value=30, help='Se il documento ha 100 pagine e suddivido per 20 pagine elaborerà la risposta 5 volte. Più alto è il numero e meno volte elaborerà ma la risposta sarà più imprecisa')
        else:
            st.session_state.instruction = st.session_state.selected_option.get('instruction', '')
        st.session_state.systemRole = st.session_state.selected_option.get('systemRole', '')
        st.session_state.systemRole = st.text_area("Descrizione", st.session_state.systemRole, help='Ruolo del chatbot e descrizione dell\'azione che deve svolgere')
        st.session_state.systemStyle = st.session_state.selected_option.get('systemStyle', '')
        st.session_state.systemStyle = st.text_area("Stile", st.session_state.systemStyle, help='Descrizione dello stile utilizzato per generare il testo')
        st.session_state.rag_enabled = st.session_state.selected_option.get('tipo', '') == 'RAG'
        if st.session_state.selected_option_key == 'Decreti':
            st.session_state.top_k = st.slider(label="Documenti da ricercare", min_value=1, max_value=20, value=4, disabled=not st.session_state.rag_enabled)
            st.session_state.decreti_escludere = st.multiselect(
                'Decreti da escludere',
                ['23.10.2 destinazione risorse residue pnrr DGR 1051-2023_Destinazione risorse PNRR Duale.pdf', '23.10.25 accompagnatoria Circolare Inail assicurazione.pdf', '23.10.26 circolare Inail assicurazione.pdf', '23.10.3 FAQ in attesa di avviso_.pdf', '23.11.2 avviso 24_24 Decreto 17106-2023 Approvazione Avviso IeFP 2023-2024.pdf', '23.5.15 decreto linee inclusione x enti locali.pdf', '23.6.21 Circolare+esplicativa+DGR+312-2023.pdf', '23.7.3 1° Decreto R.L. 23_24 .pdf', '23.9 Regolamento_prevenzione_bullismo_e_cyberbullismo__Centro_Bonsignori.pdf', '23.9.1 FAQ inizio anno formativo.pdf', '23.9.15 DECRETO VERIFICHE AMMINISTR 15-09-23.pdf', '23.9.4 modifica decreto GRS.pdf', '23.9.8 Budget 23_24.pdf', '24.10.2022 DECRETO loghi N.15176.pdf', 'ALLEGATO C_Scheda Supporti al funzionamento.pdf', 'ALLEGATO_ B_ Linee Guida.pdf', 'ALLEGATO_A1_PEI_INFANZIA.pdf', 'ALLEGATO_A2_PEI_PRIMARIA.pdf', 'ALLEGATO_A3_PEI_SEC_1_GRADO.pdf', 'ALLEGATO_A4_PEI_SEC_2_GRADO.pdf', 'ALLEGATO_C_1_Tabella_Fabbisogni.pdf', 'Brand+Guidelines+FSE+.pdf', 'Decreto 20797 del 22-12-2023_Aggiornamento budget PNRR.pdf', 'Decreto 20874 del 29-12-2023 Avviso IeFP PNRR 2023-2024_file unico.pdf'],
                [])
        st.markdown("---")
        st.markdown("# Ricerca Online")
        st.session_state.cerca_online = st.toggle("Attivata", value=False)
        st.session_state.selected_tbs = st.selectbox("Periodo:", list(st.session_state.tbs_options.keys()), disabled=not st.session_state.cerca_online)
        st.session_state.tbs_value = st.session_state.tbs_options[st.session_state.selected_tbs]
        st.session_state.numero_siti = st.slider(label="Risultati", min_value=1, max_value=20, value=3, disabled=not st.session_state.cerca_online)
        #st.session_state.suddividi_ricerca = st.toggle("Attivata", value=False)
        st.markdown("---")

    def model_settings():
        st.markdown("# Impostazioni Modello")
        st.session_state.chat_bot = st.sidebar.radio('Modello:', [key for key, value in CHAT_BOTS.items()])
        st.session_state.numero_generazioni = st.slider(label="Generazioni", min_value=1, max_value=10, value=1)
        st.session_state.enable_history = st.toggle("Storico Messaggi", value=True)
        st.session_state.temp = st.slider(label="Creatività", min_value=0.0, max_value=1.0, step=0.1, value=0.9)
        st.session_state.max_tokens = st.slider(label="Lunghezza Output", min_value=2, max_value=2048, step=32, value=1024)

    with st.sidebar:
        retrieval_settings()
        model_settings()
        st.markdown("""> **Creato da Matteo Bergamelli**""")

def audioRec():
    st.session_state.audio_bytes = audio_recorder(text='', icon_size="3x")
    if st.session_state.audio_bytes:
        with open("./AUDIO.wav", "wb") as file:
            file.write(st.session_state.audio_bytes)
        wav = sr.AudioFile("./AUDIO.wav")
        with wav as source:
            recognizer_instance = sr.Recognizer()
            recognizer_instance.pause_threshold = 3.0
            audio = recognizer_instance.listen(source)
            print("Ok! sto ora elaborando il messaggio!")
        try:
            text = recognizer_instance.recognize_google(audio, language="it-IT")
            print(text)
            js = f"""
            <script>
                var chatInput = parent.document.querySelector('textarea[data-testid="stChatInput"]');
                var nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLTextAreaElement.prototype, "value").set;
                nativeInputValueSetter.call(chatInput, "{text}");
                var event = new Event('input', {{ bubbles: true}});
                chatInput.dispatchEvent(event);
                var sendChat = parent.document.querySelector('[data-testid="stChatInputSubmitButton"]');
                sendChat.click();
                var x = parent.document.querySelector('[data-testid="stIFrame"]');
                x.style.display = "none";
            </script>
            """
            st.components.v1.html(js)
        except Exception as e:
            print(e)

def header():
    col1, colx, col2 = st.columns([2, 1, 25])
    with col2:
        st.title("Bonsi A.I.", anchor=False)
    with col1:
        audioRec()
    with st.expander("Cos'è BonsiAI?"):
        st.info("""BonsiAI Chat è un ChatBot personalizzato basato su un database vettoriale, funziona secondo il principio della Generazione potenziata da Recupero (RAG).
                   La sua funzione principale ruota attorno alla gestione di un ampio repository di documenti BonsiAI e fornisce agli utenti risposte in linea con le loro domande.
                   Questo approccio garantisce una risposta più precisa sulla base della richiesta degli utenti.""")

def chat_box():
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

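# formattaPrompt() packs the system role/style, the current instructions and the user message
# into the JSON-like template that is passed to chat() as the final prompt.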
def formattaPrompt(prompt, systemRole, systemStyle, instruction):
    if st.session_state.cerca_online:
        systemRole += '. Ti ho fornito una lista di materiali nelle instruction. Devi rispondere sulla base delle informazioni fornite!'
    input_text = f'''
    {{
        "input": {{
            "role": "system",
            "content": "{systemRole}",
            "style": "{systemStyle} "
        }},
        "messages": [
            {{
                "role": "instructions",
                "content": "{instruction} ({systemStyle})"
            }},
            {{
                "role": "user",
                "content": "{prompt}"
            }}
        ]
    }}
    '''
    return input_text

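# gen_augmented_prompt() retrieves the top_k most similar chunks from the persisted Chroma
# store in ./DB_Decreti (multilingual MiniLM embeddings) and returns the concatenated context
# plus (source, text) references for the "Mostra fonti" expander. The store is assumed to have
# been built offline with the same embedding model, roughly along these lines (hypothetical
# build script, not part of this app):
#
#   from langchain_community.document_loaders import PyPDFDirectoryLoader
#   docs = PyPDFDirectoryLoader('Documenti_2').load_and_split()
#   Chroma.from_documents(docs,
#                         HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"),
#                         persist_directory='./DB_Decreti')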
def gen_augmented_prompt(prompt, top_k):
    embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    db = Chroma(persist_directory='./DB_Decreti', embedding_function=embedding)
    docs = db.similarity_search(prompt, k=top_k)
    links = []
    context = ''
    NomeCartellaOriginariaDB = 'Documenti_2\\'
    for doc in docs:
        testo = doc.page_content.replace('\n', ' ')
        context += testo + '\n\n\n'
        reference = doc.metadata["source"].replace(NomeCartellaOriginariaDB, '') + ' (Pag. ' + str(doc.metadata["page"]) + ')'
        links.append((reference, testo))
    return context, links

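# Online search pipeline: get_search_results() queries Google (googlesearch) with the selected
# time filter (tbs) and scrapes title, meta description and body text of each result with
# BeautifulSoup; gen_online_prompt() turns those results into a context block plus numbered
# (title, description + url) references.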
def get_search_results(query, top_k):
    results = []
    for url in search(query, num=top_k, stop=top_k, tbs=st.session_state.tbs_value):
        try:
            response = requests.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title else "N/A"
            description = soup.find('meta', attrs={'name': 'description'})['content'] if soup.find('meta', attrs={'name': 'description'}) else "N/A"
            body_content = soup.find('body').get_text() if soup.find('body') else "N/A"
            results.append({'title': title, 'description': description, 'url': url, 'body': body_content})
        except Exception as e:
            print(f"Error fetching data from {url}: {e}")
            continue
    return results

def gen_online_prompt(prompt, top_k):
    links = []
    context = ''
    results = get_search_results(prompt, top_k)
    for i, result in enumerate(results, start=1):
        context += result['title'] + '\n' + result['description'] + '\n' + '\n\n' + result['body'].replace('\n', '.') + '\n\n------------------------------------------------------------'
        links.append((str(i) + '. ' + result['title'], result['description'] + '\n\n' + result['url']))
    return context, links

def generate_chat_stream(prompt):
    chat_stream = chat(prompt, st.session_state.history, chat_client=CHAT_BOTS[st.session_state.chat_bot],
                       temperature=st.session_state.temp, max_new_tokens=st.session_state.max_tokens)
    return chat_stream

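# inserisci_istruzioni() fills st.session_state.instruction from the online search and/or the
# Decreti vector store (when RAG is enabled) and returns the source links to show under the
# answer; with both options off it leaves the instruction chosen in the sidebar untouched.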
def inserisci_istruzioni(prompt_originale):
    links = []
    if st.session_state.cerca_online:
        with st.spinner("Ricerca Online...."):
            time.sleep(1)
            st.session_state.instruction, links = gen_online_prompt(prompt=prompt_originale, top_k=st.session_state.numero_siti)
    if st.session_state.rag_enabled:
        with st.spinner("Ricerca nei Decreti...."):
            time.sleep(1)
            st.session_state.instruction, links = gen_augmented_prompt(prompt=prompt_originale, top_k=st.session_state.top_k)
    with st.spinner("Generazione in corso..."):
        time.sleep(1)
        #st.session_state.instruction = instruction_originale + '\n----------------------------------------------\n' + st.session_state.instruction
    return links

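# stream_handler() accumulates the streamed tokens (skipping the </s> end-of-sequence token)
# into the placeholder, showing a ▌ cursor while generation is still in progress.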
def stream_handler(chat_stream, placeholder):
    full_response = ''
    for chunk in chat_stream:
        if chunk.token.text != '</s>':
            full_response += chunk.token.text
            placeholder.markdown(full_response + "▌")
    placeholder.markdown(full_response)
    return full_response

def show_source(links):
    with st.expander("Mostra fonti"):
        for link in links:
            reference, testo = link
            st.info('##### ' + reference.replace('_', ' ') + '\n\n' + testo)

init_state()
sidebar()
header()
chat_box()
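# split_text() slices the instruction text into chunks of chunk_size characters; the caller
# below passes st.session_state.split (pages) * 2000, and any chunk_size below 100 is reset
# to 60000 so the text is effectively not split.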
def split_text(text, chunk_size):
    testo_suddiviso = []
    if text == '':
        text = ' '
    if chunk_size < 100:
        chunk_size = 60000
    for i in range(0, len(text), chunk_size):
        testo_suddiviso.append(text[i:i+chunk_size])
    return testo_suddiviso

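# Main chat loop: the system role can contain several roles separated by "&&"; for every role,
# every instruction chunk and every requested generation the app formats a prompt, streams the
# answer, optionally shows the sources or the current document part, and finally logs the
# question and the full answer back to the Apps Script endpoint.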
if prompt := st.chat_input("Chatta con BonsiAI..."):
    prompt_originale = prompt
    links = inserisci_istruzioni(prompt_originale)
    instruction_suddivise = split_text(st.session_state.instruction, st.session_state.split * 2000)
    ruolo_originale = st.session_state.systemRole
    ruoli_divisi = ruolo_originale.split("&&")
    parte = 1
    i = 1
    risposta_completa = ''
    for ruolo_singolo in ruoli_divisi:
        for instruction_singola in instruction_suddivise:
            for numgen in range(1, st.session_state.numero_generazioni + 1):
                if i == 1:
                    # show the user's message only once, before the first generation
                    st.chat_message("user").markdown(prompt_originale)
                    i += 1
                prompt = formattaPrompt(prompt_originale, ruolo_singolo, st.session_state.systemStyle, instruction_singola)
                st.session_state.messages.append({"role": "user", "content": prompt_originale})
                chat_stream = generate_chat_stream(prompt)
                with st.chat_message("assistant"):
                    placeholder = st.empty()
                    full_response = stream_handler(chat_stream, placeholder)
                    if st.session_state.rag_enabled or st.session_state.cerca_online:
                        show_source(links)
                    if st.session_state.options.get(st.session_state.selected_option_key, {})["tipo"] == 'DOCUMENTO':
                        with st.expander("Mostra Documento"):
                            st.info('##### ' + st.session_state.selected_documento_key + ' (Parte ' + str(parte) + ')' + '\n\n\n' + instruction_singola)
                parte += 1
                st.session_state.messages.append({"role": "assistant", "content": full_response})
                risposta_completa = risposta_completa + '\n' + full_response
    if st.session_state.enable_history:
        st.session_state.history.append([prompt, full_response])
    else:
        st.session_state.history.append(['', ''])
    st.success('Generazione Completata')
    payload = {"domanda": prompt_originale, "risposta": risposta_completa}
    json_payload = json.dumps(payload)
    response = requests.post(URL_APP_SCRIPT, data=json_payload)