demorrha / app.py
rick
mise a jours UI/UX
00f6004 unverified
raw
history blame
27 kB
# Standard libraries
import base64
import io
import json
import os
import re
import tempfile
import time
from os import getenv
from typing import Any
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
# Third-party libraries
import requests
import streamlit as st
from audiorecorder import audiorecorder
from openai import OpenAI
from pydub import AudioSegment
def load_ui_language(file_path: Optional[str] = "ui_lang_support.json") -> Dict[str, Any]:
"""
Charge les traductions de l'interface utilisateur à partir d'un fichier JSON.
Args:
file_path (Optional[str]): Chemin vers le fichier JSON contenant les traductions.
Returns:
Dict[str, Any]: Un dictionnaire contenant les traductions de l'interface utilisateur.
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
except FileNotFoundError:
print(f"{get_translation('erreur_fichier_non_trouve')} {file_path}")
return {}
except json.JSONDecodeError:
print(f"{get_translation('erreur_lecture_fichier')} JSON decoding error")
return {}
except IOError as e:
print(f"{get_translation('erreur_lecture_fichier')} {e}")
return {}
# Dictionary to store translations
translations = load_ui_language()
def get_translation(key: str) -> str:
"""
Obtient la traduction pour une clé donnée basée sur la langue d'interface sélectionnée.
Args:
key (str): La clé de traduction.
Returns:
str: Le texte traduit.
"""
return translations[st.session_state.interface_language][key]
# OpenAI client configuration with API key
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
def read_file(file_name: str) -> str:
"""
Lit et retourne le contenu des fichiers texte.
Args:
file_name (str): Le nom du fichier à lire.
Returns:
str: Le contenu du fichier ou un message d'erreur.
"""
try:
with open(file_name, 'r', encoding='utf-8') as file:
content = file.read()
return content
except FileNotFoundError:
return f"{get_translation('erreur_fichier_non_trouve')} {file_name}"
except IOError as e:
return f"{get_translation('erreur_lecture_fichier')} {str(e)}"
def split_audio(audio_file: str, max_size_mb: int = 25) -> List[str]:
"""
Divise un fichier audio en segments de 25 Mo ou moins.
Args:
audio_file (str): Chemin vers le fichier audio.
max_size_mb (int): Taille maximale de chaque segment en Mo.
Returns:
List[str]: Liste des chemins vers les segments audio divisés.
"""
try:
audio = AudioSegment.from_wav(audio_file)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
segments = []
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_segment:
segment.export(temp_segment.name, format="wav")
segments.append(temp_segment.name)
return segments
except IOError as e:
print(f"Erreur lors de la lecture ou de l'écriture du fichier audio : {e}")
return []
except ValueError as e:
print(f"Erreur de valeur lors du traitement de l'audio : {e}")
return []
# Fonction modifiée pour transcrire l'audio en texte
def transcribe_audio(audio_file: IO, language: Optional[str] = None) -> str:
"""
Transcrit un fichier audio en texte.
Args:
audio_file (IO): Le fichier audio à transcrire.
language (Optional[str]): La langue de l'audio. Par défaut None.
Returns:
str: Le texte transcrit.
"""
max_size_mb = 25
file_size_mb = os.path.getsize(audio_file.name) / (1024 * 1024)
try:
if file_size_mb > max_size_mb:
segments = split_audio(audio_file.name, max_size_mb)
full_transcript = ""
for segment in segments:
with open(segment, "rb") as audio_segment:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_segment,
language=language
)
full_transcript += f"{transcript.text} "
os.unlink(segment) # Supprime le fichier temporaire
return full_transcript.strip()
else:
with open(audio_file.name, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language=language
)
return transcript.text
except IOError as e:
print(f"Erreur d'entrée/sortie lors de la transcription : {e}")
return ""
except client.APIError as e:
print(f"Erreur API lors de la transcription : {e}")
return ""
# Fonction pour détecter la langue d'un texte donné
def detect_language(input_text: str, temperature: float = 0.01) -> str:
"""
Détecte la langue d'un texte donné.
Args:
input_text (str): Le texte dont il faut détecter la langue.
temperature (float): La température pour le modèle de langage. Par défaut à 0.01.
Returns:
str: La langue détectée au format ISO-639-1.
Raises:
ValueError: Si la réponse de l'API est invalide.
requests.RequestException: En cas d'erreur de communication avec l'API.
"""
system_prompt = (
"Agissez comme une fonction de détection de langue. "
"Je fournirai du texte dans n'importe quelle langue, et vous détecterez sa langue. "
"Fournissez le résultat de votre détection au format ISO-639-1. "
"Votre réponse doit représenter l'argument `language` et ne contenir "
"que sa valeur sous forme de chaîne. "
"Fournir la langue d'entrée au format ISO-639-1 améliorera la précision et la latence."
)
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=temperature,
messages=[
{
"role": "system",
"content": system_prompt
},
{
"role": "user",
"content": input_text
}
]
)
detected_language = response.choices[0].message.content
if not detected_language:
raise ValueError("La réponse de l'API est vide")
return detected_language
except requests.RequestException as e:
raise requests.RequestException(f"Erreur de communication avec l'API : {str(e)}")
except Exception as e:
raise ValueError(f"Erreur inattendue lors de la détection de la langue : {str(e)}")
def get_duration_pydub(audio_file: str) -> float:
"""
Obtient la durée d'un fichier audio en utilisant pydub.
Args:
audio_file (str): Chemin vers le fichier audio.
Returns:
float: Durée du fichier audio en secondes.
"""
try:
audio = AudioSegment.from_file(audio_file)
return audio.duration_seconds
except FileNotFoundError:
print(f"Erreur : Le fichier audio '{audio_file}' n'a pas été trouvé.")
return 0.0
except Exception as e:
print(f"Erreur lors de la lecture du fichier audio : {str(e)}")
return 0.0
def text_to_speech(text: str) -> Tuple[Optional[bytes], float]:
"""
Convertit du texte en parole en utilisant l'API OpenAI.
Args:
text (str): Le texte à convertir en parole.
Returns:
Tuple[Optional[bytes], float]: Un tuple contenant les octets audio et la durée de l'audio en secondes.
"""
try:
response = client.audio.speech.create(
model="tts-1",
voice=st.session_state.tts_voice,
input=text
)
# Sauvegarde l'audio dans un fichier temporaire
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
response.stream_to_file(temp_audio.name)
# Lit le contenu du fichier audio
with open(temp_audio.name, "rb") as audio_file:
audio_bytes = audio_file.read()
# Obtient la durée de l'audio en secondes
audio_duration = get_duration_pydub(temp_audio.name)
return audio_bytes, audio_duration
except Exception as e:
print(f"Erreur lors de la conversion texte-parole : {str(e)}")
return None, 0.0
def concatenate_audio_files(audio_list: List[Tuple[bytes, float]]) -> Optional[bytes]:
"""
Concatène plusieurs fichiers audio avec des effets sonores.
Args:
audio_list (List[Tuple[bytes, float]]): Une liste de tuples, chacun contenant
des octets audio et la durée.
Returns:
Optional[bytes]: L'audio concaténé sous forme d'octets, ou None en cas d'erreur.
"""
# Créer un segment audio vide
final_audio = AudioSegment.empty()
try:
# Charger les effets sonores
begin_sound = AudioSegment.from_mp3(
"sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3"
)
end_sound = AudioSegment.from_mp3(
"sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3"
)
# 5 secondes de silence
silence = AudioSegment.silent(duration=1500) # 1500 ms = 1.5 secondes
for audio_bytes, _ in audio_list:
# Convertir les octets en un segment audio
segment = AudioSegment.from_mp3(io.BytesIO(audio_bytes))
# Ajouter le son de début, le segment TTS, le son de fin et le silence
final_audio += begin_sound + segment + end_sound + silence
# Convertir le segment audio final en octets
buffer = io.BytesIO()
final_audio.export(buffer, format="mp3")
return buffer.getvalue()
except IOError as e:
print(f"Erreur lors de la lecture ou de l'écriture des fichiers audio : {e}")
return None
except Exception as e:
print(f"Une erreur inattendue s'est produite : {e}")
return None
def process_message(
message: str,
operation_prompt: str = "",
tts_enabled: bool = False
) -> Tuple[Optional[bytes], Optional[float]]:
"""
Traite les messages des utilisateurs et génère une réponse.
Args:
message (str): Le message d'entrée de l'utilisateur.
operation_prompt (str, optional): Prompt supplémentaire pour l'opération. Par défaut "".
tts_enabled (bool, optional): Si la synthèse vocale est activée. Par défaut False.
Returns:
Tuple[Optional[bytes], Optional[float]]: Un tuple contenant l'audio TTS et sa durée,
ou (None, None) si TTS est désactivé ou en cas d'erreur.
"""
payload_content = f'{operation_prompt} :\n"""\n{message}\n"""'
st.session_state.messages.append({"role": "user", "content": payload_content})
with st.chat_message("user"):
st.markdown(message)
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
try:
for response in client.chat.completions.create(
model="gpt-4o-mini",
messages=st.session_state.messages,
stream=True,
temperature=0.1):
full_response += (response.choices[0].delta.content or "")
message_placeholder.markdown(full_response + "▌")
# Utiliser regex pour supprimer les trois premiers et derniers guillemets doubles
full_response = re.sub(r'^"{3}|"{3}$', '', full_response.strip())
message_placeholder.markdown(full_response)
except Exception as e:
st.error(f"Une erreur s'est produite lors de la génération de la réponse : {e}")
return None, None
st.session_state.messages.append(
{"role": "assistant", "content": full_response}
)
if tts_enabled:
try:
tts_audio, tts_duration = text_to_speech(full_response)
return tts_audio, tts_duration
except Exception as e:
st.error(f"Une erreur s'est produite lors de la conversion texte-parole : {e}")
return None, None
return None, None
class GlobalSystemPrompts:
"""Class to store global system prompts."""
@staticmethod
def linguascribe():
"""
Retrieve the system prompt for the Linguascribe feature.
Returns:
str: The system prompt for Linguascribe.
"""
try:
system_prompt = read_file('linguascribe.prompt')
return system_prompt
except FileNotFoundError:
print("Le fichier 'linguascribe.prompt' n'a pas été trouvé.")
return ""
except IOError as e:
print(f"Erreur lors de la lecture du fichier 'linguascribe.prompt': {e}")
return ""
# Function to configure the translation mode
def set_translation_mode(from_lang: str, dest_lang: str) -> Tuple[str, str]:
"""
Configure les prompts globaux pour le mode de traduction.
Args:
from_lang (str): La langue source.
dest_lang (str): La langue de destination.
Returns:
Tuple[str, str]: Un tuple contenant le prompt système et le prompt d'opération.
"""
system_prompt = GlobalSystemPrompts.linguascribe()
operation_prompt = f"Translate({from_lang} to {dest_lang})"
return system_prompt, operation_prompt
# List of languages supported by the application
SUPPORTED_LANGUAGES = [
"Afrikaans", "Arabic", "Armenian", "Azerbaijani", "Belarusian", "Bosnian",
"Bulgarian", "Catalan", "Chinese", "Croatian", "Czech", "Danish", "Dutch",
"English", "Estonian", "Finnish", "French", "Galician", "German", "Greek",
"Hebrew", "Hindi", "Hungarian", "Icelandic", "Indonesian", "Italian",
"Japanese", "Kannada", "Kazakh", "Korean", "Latvian", "Lithuanian",
"Macedonian", "Malay", "Marathi", "Maori", "Nepali", "Norwegian", "Persian",
"Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Slovak",
"Slovenian", "Spanish", "Swahili", "Swedish", "Tagalog", "Tamil", "Thai",
"Turkish", "Ukrainian", "Urdu", "Vietnamese", "Welsh"
]
def convert_language_name_to_iso6391(language_data: Union[str, Dict[str, str]]) -> str:
"""
Convertit un nom de langue en son code ISO 639-1.
Args:
language_data (Union[str, Dict[str, str]]): Le nom de la langue ou un dictionnaire
contenant le nom de la langue.
Returns:
str: Le code ISO 639-1 pour la langue donnée, ou 'en' si non trouvé.
"""
# Dictionnaire associant les noms de langues aux codes ISO 639-1
language_to_iso: Dict[str, str] = {
"Afrikaans": "af", "Arabic": "ar", "Armenian": "hy", "Azerbaijani": "az",
"Belarusian": "be", "Bosnian": "bs", "Bulgarian": "bg", "Catalan": "ca",
"Chinese": "zh", "Croatian": "hr", "Czech": "cs", "Danish": "da",
"Dutch": "nl", "English": "en", "Estonian": "et", "Finnish": "fi",
"French": "fr", "Galician": "gl", "German": "de", "Greek": "el",
"Hebrew": "he", "Hindi": "hi", "Hungarian": "hu", "Icelandic": "is",
"Indonesian": "id", "Italian": "it", "Japanese": "ja", "Kannada": "kn",
"Kazakh": "kk", "Korean": "ko", "Latvian": "lv", "Lithuanian": "lt",
"Macedonian": "mk", "Malay": "ms", "Marathi": "mr", "Maori": "mi",
"Nepali": "ne", "Norwegian": "no", "Persian": "fa", "Polish": "pl",
"Portuguese": "pt", "Romanian": "ro", "Russian": "ru", "Serbian": "sr",
"Slovak": "sk", "Slovenian": "sl", "Spanish": "es", "Swahili": "sw",
"Swedish": "sv", "Tagalog": "tl", "Tamil": "ta", "Thai": "th",
"Turkish": "tr", "Ukrainian": "uk", "Urdu": "ur", "Vietnamese": "vi",
"Welsh": "cy"
}
# Vérifier si language_data est un dictionnaire
if isinstance(language_data, dict):
language_name = language_data.get('language', '')
else:
language_name = language_data
try:
# Retourner le code ISO 639-1 correspondant au nom de la langue
return language_to_iso[language_name]
except KeyError:
# Gérer spécifiquement l'exception KeyError
print(f"Langue non trouvée : {language_name}")
return "en" # Par défaut 'en' si la langue n'est pas trouvée
def on_languages_change() -> None:
"""Fonction de rappel pour le changement de langue(s) de destination."""
selected_language_names: List[str] = st.session_state.language_selector
st.session_state.selected_languages = [
{"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)}
for lang in selected_language_names
]
def init_process_mode() -> Tuple[str, str]:
"""
Initialise le mode de traitement pour la traduction si nécessaire.
Returns:
Tuple[str, str]: Un tuple contenant le prompt système et le prompt d'opération.
"""
if st.session_state["process_mode"] == "translation":
system_prompt, operation_prompt = set_translation_mode(
from_lang=st.session_state.language_detected,
dest_lang=st.session_state.target_language
)
return system_prompt, operation_prompt
return "", ""
# Fonction principale de l'application
def main():
"""Fonction principale qui configure et exécute l'application Streamlit."""
st.title("------- DEMORRHA -------")
# Initialisation des variables d'état de session
if "language_detected" not in st.session_state:
st.session_state["language_detected"] = None
if "process_mode" not in st.session_state:
st.session_state["process_mode"] = "translation"
if "target_language" not in st.session_state:
st.session_state.target_language = "en"
if "selected_languages" not in st.session_state:
st.session_state.selected_languages = [
{"language": "English", "iso-639-1": "en"}
]
if "enable_tts_for_input_from_text_field" not in st.session_state:
st.session_state["enable_tts_for_input_from_text_field"] = True
if "enable_tts_for_input_from_audio_record" not in st.session_state:
st.session_state["enable_tts_for_input_from_audio_record"] = True
if "interface_language" not in st.session_state:
st.session_state.interface_language = "French" # Langue par défaut
system_prompt, operation_prompt = init_process_mode()
# Initialisation de l'historique des messages avec le prompt système
if "messages" not in st.session_state:
st.session_state.messages = []
# Vérification de l'existence d'un message système dans st.session_state.messages
if not any(message["role"] == "system" for message in st.session_state.messages):
st.session_state.messages.insert(0, {"role": "system", "content": system_prompt})
with st.container(border=True):
# Interface utilisateur pour le chat textuel
if user_input := st.chat_input(get_translation("entrez_message")):
# Traitement du message texte de l'utilisateur
if st.session_state.language_detected is None:
st.session_state.language_detected = detect_language(
input_text=user_input, temperature=0.01
)
audio_list = []
for cursor_selected_lang in st.session_state.selected_languages:
st.session_state.target_language = cursor_selected_lang["iso-639-1"]
# Initialisation du mode de traitement pour la langue cible actuelle
system_prompt, operation_prompt = init_process_mode()
# Traitement du message utilisateur pour la langue cible actuelle
try:
tts_audio, tts_duration = process_message(
user_input,
operation_prompt=f"{operation_prompt}",
tts_enabled=st.session_state.enable_tts_for_input_from_text_field
)
if tts_audio is not None:
audio_list.append((tts_audio, tts_duration))
except Exception as e:
st.error(f"Erreur lors du traitement du message : {str(e)}")
if audio_list:
try:
final_audio = concatenate_audio_files(audio_list)
with st.container(border=True):
st.audio(final_audio, format="audio/mp3", autoplay=True)
st.download_button(
label=get_translation("telecharger_audio"),
data=final_audio,
file_name="audio_reponse.mp3",
mime="audio/mp3"
)
except Exception as e:
st.error(f"Erreur lors de la concaténation des fichiers audio : {str(e)}")
with st.container(border=True):
# Interface utilisateur pour l'enregistrement audio
st.write(get_translation("enregistrez_message"))
audio = audiorecorder(
start_prompt=get_translation("cliquez_enregistrer"),
stop_prompt=get_translation("cliquez_arreter"),
pause_prompt=get_translation("cliquez_pause"),
show_visualizer=True,
key="vocal_chat_input"
)
# Traitement de l'entrée audio de l'utilisateur
if len(audio) > 0:
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
audio.export(temp_audio.name, format="wav")
transcription = transcribe_audio(temp_audio, language=st.session_state.language_detected)
os.unlink(temp_audio.name) # Suppression du fichier temporaire
if st.session_state.language_detected is None:
st.session_state.language_detected = detect_language(
input_text=transcription, temperature=0.01
)
st.write(get_translation("langue_detectee").format(st.session_state.language_detected))
st.write(get_translation("transcription").format(transcription))
audio_list = []
for cursor_selected_lang in st.session_state.selected_languages:
st.session_state.target_language = cursor_selected_lang["iso-639-1"]
# Initialisation du mode de traitement pour la langue cible actuelle
system_prompt, operation_prompt = init_process_mode()
# Traitement du message utilisateur pour la langue cible actuelle
try:
tts_audio, tts_duration = process_message(
transcription,
operation_prompt=f"{operation_prompt}",
tts_enabled=st.session_state.enable_tts_for_input_from_audio_record
)
if tts_audio is not None:
audio_list.append((tts_audio, tts_duration))
except Exception as e:
st.error(f"Erreur lors du traitement du message audio : {str(e)}")
if audio_list:
try:
final_audio = concatenate_audio_files(audio_list)
with st.container(border=True):
st.audio(final_audio, format="audio/mp3", autoplay=True)
# Ajout d'un bouton de téléchargement pour l'audio final
st.download_button(
label=get_translation("telecharger_audio"),
data=final_audio,
file_name="audio_concatene.mp3",
mime="audio/mp3"
)
except Exception as e:
st.error(f"Erreur lors de la concaténation des fichiers audio : {str(e)}")
except Exception as e:
st.error(f"Erreur lors du traitement de l'audio : {str(e)}")
# Configuration de la barre latérale
with st.sidebar:
st.logo("img/logo_2.png", icon_image="img/logo_2.png")
st.header(get_translation("sidebar_titre"))
st.markdown(f"## {get_translation('a_propos')}")
st.info(get_translation("info_app"))
with st.container(border=True):
st.subheader(get_translation("langue_interface"))
# Sélection de la langue de l'interface
st.selectbox(
label=get_translation("choix_langue_interface"),
options=list(translations.keys()),
key="interface_language",
index=(
list(translations.keys()).index("French")
if "interface_language" not in st.session_state
else list(translations.keys()).index(st.session_state.interface_language)
)
)
with st.container(border=True):
# Conteneur pour la sélection de langue
st.subheader(get_translation("selection_langue"))
# Sélection multiple des langues de destination
st.multiselect(
label=get_translation("langues_destination"),
placeholder=get_translation("placeholder_langues"),
options=SUPPORTED_LANGUAGES,
default=["English"],
key="language_selector",
max_selections=4,
on_change=on_languages_change
)
with st.container(border=True):
st.subheader(get_translation("parametres_tts"))
st.selectbox(
get_translation("choix_voix_tts"),
options=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
index=3, # "onyx" est à l'index 3
key="tts_voice"
)
st.checkbox(
get_translation("activer_tts_texte"),
key="enable_tts_for_input_from_text_field"
)
st.checkbox(
get_translation("activer_tts_audio"),
key="enable_tts_for_input_from_audio_record"
)
# Point d'entrée de l'application
if __name__ == "__main__":
main()