Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from chat_client import chat | |
| import time | |
| import os | |
| from dotenv import load_dotenv | |
| from sentence_transformers import SentenceTransformer | |
| import requests | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| import json | |
| load_dotenv() | |
| URL_APP_SCRIPT = os.getenv('URL_APP_SCRIPT') | |
| URL_PROMPT = URL_APP_SCRIPT + '?IdFoglio=1cLw9q70BsPmxMBj9PIzgXtq6sm3X-GVBVnOB5wE8jr8' | |
| URL_DOCUMENTI = URL_APP_SCRIPT + '?IdSecondoFoglio=1cLw9q70BsPmxMBj9PIzgXtq6sm3X-GVBVnOB5wE8jr8' | |
| SYSTEM_PROMPT = ["Sei BonsiAI e mi aiuterai nelle mie richieste (Parla in ITALIANO)", "Esatto, sono BonsiAI. Di cosa hai bisogno?"] | |
| CHAT_BOTS = {"Mixtral 8x7B v0.1" :"mistralai/Mixtral-8x7B-Instruct-v0.1"} | |
| option_personalizzata = {'Personalizzata': {'systemRole': 'Tu sei BONSI AI, il mio assistente personale della scuola superiore del Bonsignori. Aiutami in base alle mie esigenze', | |
| 'systemStyle': 'Firmati sempre come BONSI AI. (scrivi in italiano)', | |
| 'instruction': '', | |
| 'tipo': '', | |
| 'RAG': False} | |
| } | |
| # ----------------------------------------------------------- Interfaccia -------------------------------------------------------------------- | |
| st.set_page_config(page_title="Bonsi AI", page_icon="🏫") | |
| def init_state() : | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "temp" not in st.session_state: | |
| st.session_state.temp = 0.8 | |
| if "history" not in st.session_state: | |
| st.session_state.history = [SYSTEM_PROMPT] | |
| if "top_k" not in st.session_state: | |
| st.session_state.top_k = 5 | |
| if "repetion_penalty" not in st.session_state : | |
| st.session_state.repetion_penalty = 1 | |
| if "chat_bot" not in st.session_state : | |
| st.session_state.chat_bot = "Mixtral 8x7B v0.1" | |
| if 'loaded_data' not in st.session_state: | |
| st.session_state.loaded_data = False | |
| if "split" not in st.session_state: | |
| st.session_state.split = 30 | |
| if "enable_history" not in st.session_state: | |
| st.session_state.enable_history = False | |
| if "numero_generazioni" not in st.session_state: | |
| st.session_state.numero_generazioni = 1 | |
| if not st.session_state.loaded_data: | |
| with st.status("Caricamento in corso...", expanded=True) as status: | |
| st.write("Inizializzazione Ambiente") | |
| time.sleep(1) | |
| st.write("Inizializzazione Prompt") | |
| options = requests.get(URL_PROMPT).json() | |
| st.write("Inizializzazione Documenti") | |
| documenti = requests.get(URL_DOCUMENTI).json() | |
| st.session_state.options = {**option_personalizzata, **options} | |
| st.session_state.documenti = documenti | |
| st.session_state.loaded_data = True | |
| status.update(label="Caricamento Completato", state="complete", expanded=False) | |
| def sidebar(): | |
| def retrieval_settings() : | |
| st.markdown("# Impostazioni Prompt") | |
| st.session_state.selected_option_key = st.selectbox('Azione', list(st.session_state.options.keys())) | |
| st.session_state.selected_option = st.session_state.options.get(st.session_state.selected_option_key, {}) | |
| if st.session_state.options.get(st.session_state.selected_option_key, {})["tipo"]=='DOCUMENTO': | |
| st.session_state.selected_documento_key = st.selectbox('Documento', list(st.session_state.documenti.keys())) | |
| st.session_state.selected_documento = st.session_state.documenti.get(st.session_state.selected_documento_key, {}) | |
| st.session_state.instruction = st.session_state.selected_documento.get('instruction', '')['Testo'] | |
| st.session_state.split = st.slider(label="Pagine Suddivisione", min_value=1, max_value=30, value=30, help='Se il documento ha 100 pagine e suddivido per 20 pagine elaborerà la risposta 5 volte. Più alto è il numero e meno volte elaborerà ma la risposta sarà più imprecisa') | |
| else: | |
| st.session_state.instruction = st.session_state.selected_option.get('instruction', '') | |
| st.session_state.systemRole = st.session_state.selected_option.get('systemRole', '') | |
| st.session_state.systemRole = st.text_area("Descrizione", st.session_state.systemRole, help='Ruolo del chatbot e descrizione dell\'azione che deve svolgere') | |
| st.session_state.systemStyle = st.session_state.selected_option.get('systemStyle', '') | |
| st.session_state.systemStyle = st.text_area("Stile", st.session_state.systemStyle, help='Descrizione dello stile utilizzato per generare il testo') | |
| st.session_state.rag_enabled = st.session_state.selected_option.get('tipo', '')=='RAG' | |
| if st.session_state.selected_option_key == 'Decreti': | |
| st.session_state.top_k = st.slider(label="Documenti da ricercare", min_value=1, max_value=20, value=4, disabled=not st.session_state.rag_enabled) | |
| st.session_state.decreti_escludere = st.multiselect( | |
| 'Decreti da escludere', | |
| ['23.10.2 destinazione risorse residue pnrr DGR 1051-2023_Destinazione risorse PNRR Duale.pdf', '23.10.25 accompagnatoria Circolare Inail assicurazione.pdf', '23.10.26 circolare Inail assicurazione.pdf', '23.10.3 FAQ in attesa di avviso_.pdf', '23.11.2 avviso 24_24 Decreto 17106-2023 Approvazione Avviso IeFP 2023-2024.pdf', '23.5.15 decreto linee inclusione x enti locali.pdf', '23.6.21 Circolare+esplicativa+DGR+312-2023.pdf', '23.7.3 1° Decreto R.L. 23_24 .pdf', '23.9 Regolamento_prevenzione_bullismo_e_cyberbullismo__Centro_Bonsignori.pdf', '23.9.1 FAQ inizio anno formativo.pdf', '23.9.15 DECRETO VERIFICHE AMMINISTR 15-09-23.pdf', '23.9.4 modifica decreto GRS.pdf', '23.9.8 Budget 23_24.pdf', '24.10.2022 DECRETO loghi N.15176.pdf', 'ALLEGATO C_Scheda Supporti al funzionamento.pdf', 'ALLEGATO_ B_ Linee Guida.pdf', 'ALLEGATO_A1_PEI_INFANZIA.pdf', 'ALLEGATO_A2_PEI_PRIMARIA.pdf', 'ALLEGATO_A3_PEI_SEC_1_GRADO.pdf', 'ALLEGATO_A4_PEI_SEC_2_GRADO.pdf', 'ALLEGATO_C_1_Tabella_Fabbisogni.pdf', 'Brand+Guidelines+FSE+.pdf', 'Decreto 20797 del 22-12-2023_Aggiornamento budget PNRR.pdf', 'Decreto 20874 del 29-12-2023 Avviso IeFP PNRR 2023-2024_file unico.pdf'], | |
| []) | |
| st.markdown("---") | |
| def model_settings() : | |
| st.markdown("# Impostazioni Modello") | |
| st.session_state.chat_bot = st.sidebar.radio('Modello:', [key for key, value in CHAT_BOTS.items() ]) | |
| st.session_state.numero_generazioni = st.slider(label="Generazioni", min_value = 1, max_value=10, value=1) | |
| st.session_state.temp = st.slider(label="Creatività", min_value=0.0, max_value=1.0, step=0.1, value=0.9) | |
| st.session_state.max_tokens = st.slider(label="Lunghezza Output", min_value = 64, max_value=2048, step= 32, value=1024) | |
| st.session_state.enable_history = st.toggle("Storico Messaggi", value=False) | |
| with st.sidebar: | |
| retrieval_settings() | |
| model_settings() | |
| st.markdown("""> **Creato da [Matteo Bergamelli] 🔗**""") | |
| def header() : | |
| st.title("Bonsi AI") | |
| with st.expander("Cos'è BonsiAI?"): | |
| st.info("""BonsiAI Chat è un ChatBot personalizzato basato su un database vettoriale, funziona secondo il principio della Generazione potenziata da Recupero (RAG). | |
| La sua funzione principale ruota attorno alla gestione di un ampio repository di documenti BonsiAI e fornisce agli utenti risposte in linea con le loro domande. | |
| Questo approccio garantisce una risposta più precisa sulla base della richiesta degli utenti.""") | |
| def chat_box() : | |
| for message in st.session_state.messages: | |
| with st.chat_message(message["role"]): | |
| st.markdown(message["content"]) | |
| init_state() | |
| sidebar() | |
| header() | |
| chat_box() | |
| # ----------------------------------------------------------- Funzioni Varie -------------------------------------------------------------------- | |
| def formattaPrompt(prompt, systemRole, systemStyle, instruction): | |
| input_text = f''' | |
| {{ | |
| "input": {{ | |
| "role": "system", | |
| "content": "{systemRole}", | |
| "style": "{systemStyle}" | |
| }}, | |
| "messages": [ | |
| {{ | |
| "role": "instructions", | |
| "content": "{instruction} ({systemStyle})" | |
| }}, | |
| {{ | |
| "role": "user", | |
| "content": "{prompt}" | |
| }} | |
| ] | |
| }} | |
| ''' | |
| return input_text | |
| def gen_augmented_prompt(prompt, top_k) : | |
| links = "" | |
| embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") | |
| db = Chroma(persist_directory='./DB_Decreti', embedding_function=embedding) | |
| docs = db.similarity_search(prompt, k=top_k) | |
| links = [] | |
| context = '' | |
| NomeCartellaOriginariaDB = 'Documenti_2\\' | |
| for doc in docs: | |
| testo = doc.page_content.replace('\n', ' ') | |
| context += testo + '\n\n\n' | |
| reference = doc.metadata["source"].replace(NomeCartellaOriginariaDB, '') + ' (Pag. ' + str(doc.metadata["page"]) + ')' | |
| links.append((reference, testo)) | |
| generated_prompt = f""" | |
| A PARTIRE DAL SEGUENTE CONTESTO: {docs}, | |
| ---- | |
| RISPONDI ALLA SEGUENTE RICHIESTA: {prompt} | |
| """ | |
| return context, links | |
| def generate_chat_stream(prompt, prompt_originale, inst) : | |
| links = [] | |
| if st.session_state.rag_enabled : | |
| with st.spinner("Ricerca nei Decreti...."): | |
| time.sleep(1) | |
| st.session_state.instruction, links = gen_augmented_prompt(prompt=prompt_originale, top_k=st.session_state.top_k) | |
| with st.spinner("Generazione in corso...") : | |
| time.sleep(1) | |
| chat_stream = chat(prompt, st.session_state.history,chat_client=CHAT_BOTS[st.session_state.chat_bot] , | |
| temperature=st.session_state.temp, max_new_tokens=st.session_state.max_tokens) | |
| return chat_stream, links, inst | |
| def stream_handler(chat_stream, placeholder) : | |
| full_response = '' | |
| for chunk in chat_stream : | |
| if chunk.token.text!='</s>' : | |
| full_response += chunk.token.text | |
| placeholder.markdown(full_response + "▌") | |
| placeholder.markdown(full_response) | |
| return full_response | |
| def show_source(links) : | |
| with st.expander("Mostra fonti") : | |
| for link in links: | |
| reference, testo = link | |
| st.info('##### ' + reference.replace('_', ' ') + '\n\n'+ testo) | |
| def split_text(text, chunk_size): | |
| testo_suddiviso = [] | |
| if text == '': | |
| text = ' ' | |
| if chunk_size < 100: | |
| chunk_size = 60000 | |
| for i in range(0, len(text), chunk_size): | |
| testo_suddiviso.append(text[i:i+chunk_size]) | |
| return testo_suddiviso | |
| # -------------------------------------------------------------- Gestione Chat ----------------------------------------------------------------------- | |
| if prompt := st.chat_input("Chatta con BonsiAI..."): | |
| instruction_suddivise = split_text(st.session_state.instruction, st.session_state.split*2000) | |
| prompt_originale = prompt | |
| ruolo_originale = st.session_state.systemRole | |
| ruoli_divisi = ruolo_originale.split("&&") | |
| parte = 1 | |
| i = 1 | |
| risposta_completa = '' | |
| for ruolo_singolo in ruoli_divisi: | |
| for instruction_singola in instruction_suddivise: | |
| for numgen in range(1, st.session_state.numero_generazioni+1): | |
| prompt = formattaPrompt(prompt_originale, ruolo_singolo, st.session_state.systemStyle, instruction_singola) | |
| if i==1: | |
| st.chat_message("user").markdown(prompt_originale + (': Parte ' + str(parte) if i > 1 else '')) | |
| i+=1 | |
| st.session_state.messages.append({"role": "user", "content": prompt_originale}) | |
| chat_stream, links, inst = generate_chat_stream(prompt, prompt_originale, instruction_singola) | |
| with st.chat_message("assistant"): | |
| placeholder = st.empty() | |
| full_response = stream_handler(chat_stream, placeholder) | |
| if st.session_state.rag_enabled: | |
| show_source(links) | |
| if st.session_state.options.get(st.session_state.selected_option_key, {})["tipo"]=='DOCUMENTO': | |
| with st.expander("Mostra Documento") : | |
| st.info('##### ' + st.session_state.selected_documento_key + ' (Parte ' + str(parte) +')'+ '\n\n\n' + inst) | |
| parte+=1 | |
| st.session_state.messages.append({"role": "assistant", "content": full_response}) | |
| risposta_completa = risposta_completa + '\n' + full_response | |
| if st.session_state.enable_history: | |
| st.session_state.history.append([prompt, full_response]) | |
| st.success('Generazione Completata') | |
| payload = {"domanda": prompt_originale, "risposta": risposta_completa} | |
| json_payload = json.dumps(payload) | |
| response = requests.post(URL_APP_SCRIPT, data=json_payload) |