import streamlit as st from huggingface_hub import InferenceClient import time import re import edge_tts import asyncio from concurrent.futures import ThreadPoolExecutor import tempfile from pydub import AudioSegment # Initialize Hugging Face InferenceClient client_hf = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") # Define the async function for text-to-speech conversion using Edge TTS async def text_to_speech_edge(text, language_code): voice = {"fr": "fr-FR-RemyMultilingualNeural"}[language_code] communicate = edge_tts.Communicate(text, voice) with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: tmp_path = tmp_file.name await communicate.save(tmp_path) return tmp_path # Helper function to run async functions from within Streamlit (synchronous context) def run_in_threadpool(func, *args, **kwargs): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) future = asyncio.ensure_future(func(*args, **kwargs)) return loop.run_until_complete(future) def concatenate_audio(paths): combined = AudioSegment.empty() for path in paths: audio = AudioSegment.from_mp3(path) combined += audio combined_path = tempfile.mktemp(suffix=".mp3") combined.export(combined_path, format="mp3") return combined_path # Modified function to work with async Edge TTS def dictee_to_audio_segmented(dictee): sentences = segmenter_texte(dictee) audio_urls = [] with ThreadPoolExecutor() as executor: for sentence in sentences: processed_sentence = replace_punctuation(sentence) audio_path = executor.submit(run_in_threadpool, text_to_speech_edge, processed_sentence, "fr").result() audio_urls.append(audio_path) return audio_urls def generer_dictee(classe, longueur): prompt = f"Créer une dictée pour la classe {classe} d'une longueur d'environ {longueur} mots. Il est important de créer le texte uniquement de la dictée et de ne pas ajouter de consignes ou d'indications supplémentaires." generate_kwargs = { "temperature": 0.7, "max_new_tokens": 1000, "top_p": 0.95, "repetition_penalty": 1.2, "do_sample": True, } formatted_prompt = f"[INST] {prompt} [/INST]" stream = client_hf.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) dictee = "" for response in stream: dictee += response.token.text dictee = dictee.replace("", "").strip() return dictee def correction_dictee(dictee, dictee_utilisateur): prompt = f"Voici une dictée crée: {dictee} | Voici la dictée faite par l'utilisateur : {dictee_utilisateur} - Corrige la dictée en donnant les explications, utilise les syntax du markdown pour une meilleur comprehesion de la correction." generate_kwargs = { "temperature": 0.7, "max_new_tokens": 2000, # Ajustez selon la longueur attendue de la correction "top_p": 0.95, "repetition_penalty": 1.2, "do_sample": True, } formatted_prompt = f"[INST] {prompt} [/INST]" stream = client_hf.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) texte_ameliore = "" for response in stream: texte_ameliore += response.token.text texte_ameliore = texte_ameliore.replace("", "").strip() return correction def replace_punctuation(text): replacements = { ".": " point.", ",": " virgule,", ";": " point-virgule;", ":": " deux-points:", "!": " point d'exclamation!", "?": " point d'interrogation?", "-": " tiret-", "'": " apostrophe'", } for key, value in replacements.items(): text = text.replace(key, value) return text def segmenter_texte(texte): sentences = re.split(r'(?<=[.!?]) +', texte) return sentences # Streamlit App Interface st.set_page_config(layout="wide") st.title('Générateur de Dictée') with st.expander("Paramètres de la dictée", expanded=True): mode = st.radio("Mode:", ["S'entrainer: Vous aurez uniquement les audios suivi d'une correction par IA (Pour 1 seul personne)", "Entrainer: Vous aurez uniquement le texte de la dictée pour entrainer quelqu'un d'autre (Pour 2 ou + personnes)"]) classe = st.selectbox("Classe", ["CP", "CE1", "CE2", "CM1", "CM2", "6ème", "5ème", "4ème", "3ème", "Seconde", "Premiere", "Terminale"], index=2) longueur = st.slider("Longueur de la dictée (nombre de mots)", 50, 500, 200) if st.button('Générer la Dictée'): with st.spinner("Génération de la dictée en cours..."): dictee = generer_dictee(classe, longueur) if mode == "S'entrainer: Vous aurez uniquement les audios suivi d'une correction par IA (Pour 1 seul personne)": audio_urls = dictee_to_audio_segmented(dictee) concatenated_audio_path = concatenate_audio(audio_urls) col1, col2 = st.columns(2) with col1: st.audio(concatenated_audio_path, format='audio/wav', start_time=0) with st.expander("Phrases de la Dictée"): for idx, url in enumerate(audio_urls, start=1): st.markdown(f"**Phrase {idx}:**") st.audio(url, format='audio/wav') with col2: dictee_utilisateur = st.text_area("Écrivez la dictée ici:", height=300) if st.button('Correction'): st.write("Dictée originale:") correction = correction_dictee(dictee, dictee_utilisateur) st.text_area("Voici la correction :", correction, height=500) elif mode == "Entrainer: Vous aurez uniquement le texte de la dictée pour entrainer quelqu'un d'autre (Pour 2 ou + personnes)": st.text_area("Voici votre dictée :", dictee, height=300)