from huggingface_hub import InferenceClient import gradio as gr import re import random import string import firebase_admin from firebase_admin import credentials, firestore from datetime import datetime # Initialiser Firebase cred = credentials.Certificate("./montaillou-bdd-5b8a59219187.json") firebase_admin.initialize_app(cred) # Obtenir une référence à la base de données Firestore db = firestore.client() def save_message(user_id, message): # Enregistrer le message dans Firestore doc_ref = db.collection("conversations").document(user_id) doc_ref.set( { "messages": firestore.ArrayUnion( [{"message": message, "timestamp": datetime.now()}] ) } ) def load_history(user_id): # Charger l'historique de conversation de l'utilisateur doc_ref = db.collection("conversations").document(user_id) doc = doc_ref.get() if doc.exists: messages = doc.to_dict()["messages"] return messages else: return [] client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") # Name of the chat chat_name = "Tavernier 🍺" # Variable globale pour compter les itérations global_iteration = 0 # Variable globale pour stocker le contexte global_context = { "static_context": "Tu es le gérant d'une taverne dans le village de Montaillou au 14ème siècle dans l'univers des cathares dans les Pyrénées. Tu ne parles qu'en français. Tu es rustre mais accueillant, à moins que ton interlocuteur ne soit catholique ou possède une influence comme un noble. Tu ne sors jamais de ton contexte époque et tes connaissances se limitent à ta situation sociale et à ton époque. Tu dois être immersif et cohérent pour la personne qui va te parler. Prends toujours en compte l'historique de la conversation. Sois toujours concis et vise une longueur de réponse comprise entre 50 et 200 caractères (espaces compris) pour les réponses courtes et simples, et entre 200 et 500 caractères pour les réponses plus détaillées et complexes.", "dynamic_context": "", "sensitive_info_filter": [ "contexte caché", "hidden_context", ], } # Créer une interface avec un champ de texte pour le contexte caché context_input = gr.Textbox( label="Contexte caché", value=global_context["static_context"] ) def update_dynamic_context(user_input): global global_context keywords = { "meutre": "Un meutre a été mentionné. Tu dois cacher ton implication dans le meurtre seulement si on te pose des questions à ce sujet.", } for keyword in keywords: if keyword in user_input.lower(): global_context["dynamic_context"] += f" {keywords[keyword]}" break def filter_sensitive_info(text): for word in global_context["sensitive_info_filter"]: if word in text.lower(): return text.replace(word, "[REDACTED]") return text def remove_length_info(response_text): pattern = r"\(\d+ caractères\)" return re.sub(pattern, "", response_text) def format_prompt(message, history, user_id=""): max_history_length = 10 # Limiter la longueur de l'historique history = history[-max_history_length:] prompt = "" for user_prompt, bot_response in history: prompt += f"[USER] {user_prompt} [/USER]" if ( history and isinstance(history[-1], dict) and user_prompt == history[-1].get("0") ): # N'ajouter les balises [BOT] qu'autour de la dernière réponse du bot prompt += f" [BOT]{filter_sensitive_info(bot_response)}[/BOT] " else: prompt += f" {filter_sensitive_info(bot_response)} " prompt += f"[INST][USER] {message} [/USER][/INST]" # Ajouter le contexte statique et dynamique à chaque fois que l'utilisateur pose une question prompt += f"[INST] {global_context['static_context']} {global_context['dynamic_context']} [/INST]" return prompt def get_random_string(length): # choose from all lowercase letter letters = string.ascii_lowercase return "".join(random.choice(letters) for i in range(length)) def generate( prompt, history, request: gr.Request, temperature=0.2, max_new_tokens=256, top_p=0.95, repetition_penalty=1.0, ): temperature = float(temperature) if temperature < 1e-2: temperature = 1e-2 top_p = float(top_p) generate_kwargs = dict( temperature=temperature, max_new_tokens=max_new_tokens, top_p=top_p, repetition_penalty=repetition_penalty, do_sample=True, seed=42, ) if request: # Récupérer l'identifiant unique de l'utilisateur à partir de la requête HTML et son historique de conversation user_id = dict(request.query_params).get("user_id") history = load_history(user_id) else: user_id = "" update_dynamic_context(prompt) formatted_prompt = format_prompt(prompt, history, user_id) global global_iteration global_iteration += 1 print(f"\n\nIteration {global_iteration}/{user_id}: {formatted_prompt}") stream = client.text_generation( formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False, ) output = "" bot_response = "" for response in stream: response_text = filter_sensitive_info(response.token.text) response_text = remove_length_info(response_text) output += response_text bot_response += response_text yield output if user_id != "": # Sauvegarder le message de l'utilisateur et la réponse complète du bot dans Firestore save_message(user_id, prompt) save_message(user_id, bot_response) return output mychatbot = gr.Chatbot( avatar_images=["./berger.jpg", "./tavernier.jpg"], bubble_full_width=False, show_label=False, ) def chatbot_interface(request: gr.Request): chatbot_interface = gr.ChatInterface( fn=generate, chatbot=mychatbot, title=chat_name, retry_btn=None, undo_btn=None, examples=[ [ "Bonjour", "Salutations, voyageur ! Bienvenue dans ma taverne. Que puis-je faire pour toi ?", ] ], cache_examples=True, ) return chatbot_interface def main(request: gr.Request): user_chatbot_interface = chatbot_interface(request) user_chatbot_interface.queue().launch() if __name__ == "__main__": main(None) # def reset_history(chatbot): # chatbot.history = [] # return [] # def update_context(context): # global global_context # global_context["static_context"] = context # global mychatbot # mychatbot.history = reset_history(mychatbot) # global global_iteration # global_iteration = 0 # return None