Spaces:
Runtime error
Runtime error
from huggingface_hub import InferenceClient | |
import gradio as gr | |
import re | |
import firebase_admin | |
from firebase_admin import credentials, firestore | |
from datetime import datetime | |
# Forcer le dark mode | |
js_func = """ | |
function refresh() { | |
const url = new URL(window.location); | |
if (url.searchParams.get('__theme') !== 'dark') { | |
url.searchParams.set('__theme', 'dark'); | |
window.location.href = url.href; | |
} | |
} | |
""" | |
# Initialiser Firebase | |
cred = credentials.Certificate("./montaillou-bdd-5b8a59219187.json") | |
firebase_admin.initialize_app(cred) | |
# Obtenir une référence à la base de données Firestore | |
db = firestore.client() | |
def save_message(user_id, message, role="user"): | |
# Enregistrer le message dans Firestore | |
doc_ref = db.collection("conversations-forgeron").document(user_id) | |
doc_ref.set( | |
{ | |
"messages": firestore.ArrayUnion( | |
[ | |
{ | |
"role": role, | |
"message": f"{message}", | |
"timestamp": datetime.now(), | |
} | |
] | |
) | |
}, | |
merge=True, | |
) | |
def load_history(user_id): | |
# Charger l'historique de conversation de l'utilisateur | |
doc_ref = db.collection("conversations-forgeron").document(user_id) | |
doc = doc_ref.get() | |
if doc.exists: | |
messages = doc.to_dict().get("messages", []) | |
return messages | |
else: | |
return [] | |
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") | |
# Name of the chat | |
chat_name = "Forgeron ⚒️" | |
# Variable globale pour compter les itérations | |
global_iteration = 0 | |
# Variable globale pour stocker le contexte | |
global_context = { | |
"static_context": "Tu es le forgeron du village de Montaillou, un homme fort et travailleur, vivant au 14ème siècle dans l'univers des cathares dans les Pyrénées. Tu ne parles qu'en français avec un langage direct et franc. Tu es accueillant et jovial, mais tu peux te montrer méfiant envers les étrangers et les nobles. Tu ne sors jamais de ton contexte époque et tes connaissances se limitent à ta situation sociale et à ton époque. Tu dois être immersif et cohérent pour la personne qui va te parler. Prends toujours en compte l'historique de la conversation. Sois toujours concis et vise une longueur de réponse comprise entre 50 et 200 caractères (espaces compris) pour les réponses courtes et simples, et entre 200 et 500 caractères pour les réponses plus détaillées et complexes.", | |
"sensitive_info_filter": ["contexte caché", "hidden_context", "[BOT]"], | |
} | |
def update_dynamic_context(user_id,user_input): | |
global global_context | |
keywords = { | |
"arme": "Tu as remarqué que le troubadour avait une dague cachée sous son manteau il y a quelques jours. Il pourrait être impliqué dans le meurtre, mais tu n'en es pas certain.", | |
"comportement": "Le troubadour s'est montré agité et nerveux ces derniers temps. Il pourrait cacher quelque chose.", | |
"lame": "Tu as trouvé une lame tachée de sang près de la forge récemment. Tu l'as cachée, mais tu pourrais donner des indices à son sujet.", | |
} | |
# Récupérer le contexte dynamique de l'utilisateur | |
doc_ref = db.collection("conversations-forgeron").document(user_id) | |
doc = doc_ref.get() | |
if doc.exists: | |
dynamic_context = doc.to_dict().get("dynamic_context", "") | |
else: | |
dynamic_context = "" | |
# Mettre à jour le contexte dynamique de l'utilisateur | |
for keyword in keywords: | |
if keyword in user_input.lower(): | |
dynamic_context += f" {keywords[keyword]}" | |
break | |
# Enregistrer le contexte dynamique mis à jour dans Firestore | |
doc_ref.set( | |
{ | |
"dynamic_context": dynamic_context, | |
}, | |
merge=True, | |
) | |
def filter_sensitive_info(text): | |
for word in global_context["sensitive_info_filter"]: | |
if word in text.lower(): | |
return text.replace(word, "[REDACTED]") | |
return text | |
def remove_length_info(response_text): | |
pattern = r"\(\d+ caractères\)" | |
return re.sub(pattern, "", response_text) | |
def format_prompt(message, history, user_id=""): | |
max_history_length = 10 # Limiter la longueur de l'historique | |
history = history[-max_history_length:] | |
prompt = "<s>" | |
for item in history: | |
role = item.get("role") | |
content = item.get("message") | |
if role == "user": | |
prompt += f"[USER] {content} [/USER]" | |
elif role == "bot": | |
if content == history[-1].get("message"): | |
content = f"[BOT] {content}" | |
prompt += f"{content} [/BOT]" | |
# Récupérer le contexte dynamique de l'utilisateur | |
doc_ref = db.collection("conversations-forgeron").document(user_id) | |
doc = doc_ref.get() | |
if doc.exists: | |
dynamic_context = doc.to_dict().get("dynamic_context", "") | |
else: | |
dynamic_context = "" | |
prompt += f"</s>[INST][USER] {message} [/USER][/INST]" | |
# Ajouter le contexte statique et dynamique à chaque fois que l'utilisateur pose une question | |
prompt += f"[INST] {global_context['static_context']} {dynamic_context} [/INST]" | |
return prompt | |
# Liste des noms des collections Firestore pour chaque espace | |
collections = [ | |
"conversations-tavernier", | |
"conversations-guerisseuse", | |
"conversations-troubadour", | |
"conversations-forgeron", | |
"conversations-pretre" | |
] | |
def delete_user_data(user_id): | |
for collection in collections: | |
doc_ref = db.collection(collection).document(user_id) | |
doc_ref.delete() | |
def generate( | |
prompt, | |
history, | |
request: gr.Request, | |
temperature=0.2, | |
max_new_tokens=256, | |
top_p=0.95, | |
repetition_penalty=1.0, | |
): | |
temperature = float(temperature) | |
if temperature < 1e-2: | |
temperature = 1e-2 | |
top_p = float(top_p) | |
generate_kwargs = dict( | |
temperature=temperature, | |
max_new_tokens=max_new_tokens, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
do_sample=True, | |
seed=42, | |
) | |
if request: | |
# Récupérer l'identifiant unique de l'utilisateur à partir de la requête HTML et son historique de conversation | |
user_id = dict(request.query_params).get("user_id") | |
if not user_id: | |
return "Vous devez vous connecter pour accéder au bot." | |
history = load_history(user_id) | |
else: | |
user_id = "" | |
update_dynamic_context(user_id,prompt) | |
formatted_prompt = format_prompt(prompt, history, user_id) | |
global global_iteration | |
global_iteration += 1 | |
print(f"\n\nIteration {global_iteration}/{user_id}: {formatted_prompt}") | |
stream = client.text_generation( | |
formatted_prompt, | |
**generate_kwargs, | |
stream=True, | |
details=True, | |
return_full_text=False, | |
) | |
output = "" | |
for response in stream: | |
response_text = filter_sensitive_info(response.token.text) | |
response_text = remove_length_info(response_text) | |
output += response_text | |
yield output | |
if user_id != "": | |
# Sauvegarder le message de l'utilisateur et la réponse complète du bot dans Firestore | |
save_message(user_id, prompt, role="user") | |
save_message(user_id, output, role="bot") | |
return output | |
mychatbot = gr.Chatbot( | |
avatar_images=["./berger.jpg", "./pretre.jpeg"], | |
bubble_full_width=False, | |
show_label=False, | |
) | |
def handle_accuse_click(request: gr.Request, accuse_button): | |
if request: | |
# Récupérer l'identifiant unique de l'utilisateur à partir de la requête HTML et son historique de conversation | |
user_id = dict(request.query_params).get("user_id") | |
if not user_id: | |
print ("Vous devez vous connecter pour accéder au bot.") | |
return | |
history = load_history(user_id) | |
else: | |
user_id = "" | |
return | |
if user_id == "": | |
# Handle the case where request is None | |
print ("User Id is Empty") | |
return | |
accused_character = "troubadour" # Le personnage accusé (le troubadour dans ce cas) | |
# Supprimer l'historique et le contexte dynamique de tous les personnages dans tous les espaces | |
delete_user_data(user_id) | |
if accused_character == "troubadour": # Vérifier si le personnage accusé est le coupable | |
accuse_button.value = "Félicitations, tu as trouvé le coupable ! Clique pour recommencer🔁" | |
print("win") | |
else: | |
accuse_button.value = "Désolé, tu as accusé le mauvais personnage. Tu as perdu. Clique pour recommencer🔁" | |
print("loose") | |
accuse_button.click(fn=delete_user_data, inputs=[user_id], js=js_func) | |
def chatbot_interface(request: gr.Request,accuse_button): | |
chatbot_interface = gr.ChatInterface( | |
fn=generate, | |
chatbot=mychatbot, | |
title=chat_name, | |
retry_btn=None, | |
undo_btn=None, | |
clear_btn=None, | |
submit_btn="Parler", | |
css="footer {visibility: hidden !important} .gradio-container {background-color: #2D4059 !important; color: #FFD460 !important;}", | |
js=js_func | |
) | |
accuse_button.click(fn=handle_accuse_click, inputs=[request, accuse_button], js=js_func) | |
return chatbot_interface | |
def main(request: gr.Request): | |
with gr.Blocks(css="footer {visibility: hidden !important} .gradio-container {background-color: #2D4059 !important; color: #FFD460 !important;}", | |
js=js_func) as demo: | |
accuse_button = gr.Button("Accuser") | |
chat_interface = chatbot_interface(request,accuse_button) | |
demo.launch(show_api=False, inline=True) | |
if __name__ == "__main__": | |
main(None) |