# NOTE: removed page-scrape residue ("Spaces:" / "Runtime error" header lines)
# that was not part of the program source.
import os | |
import re | |
import gradio as gr | |
import tempfile | |
from pydub import AudioSegment | |
from pydub.utils import which | |
import edge_tts | |
import asyncio | |
import nest_asyncio | |
import requests | |
nest_asyncio.apply() | |
from openai import OpenAI | |
# --- Runtime configuration, injected via environment variables (Space secrets) ---
secret=os.getenv("SECRET")        # API key; assigned to OPENAI_API_KEY below -- TODO confirm it is still used
url=os.getenv("SRVC")             # base URL of the OpenAI-compatible chat service
url_audio=os.getenv("TRANSCRIPTION")  # speech-to-text endpoint URL
key=os.getenv("KEY")              # API key for the chat service client
# HTML banner shown at the top of the Gradio interface.
description = """
<center>
<img src="https://huggingface.co/spaces/rodrigomasini/audio-to-text/resolve/main/chagas.png" width=200px>
<strong>Primeiro assistente de IA de voz do Brasil</strong>
</center>
"""
OPENAI_API_KEY = secret  # NOTE(review): not referenced elsewhere in this file — verify before removing
# Synchronous OpenAI-compatible client pointed at the custom service endpoint.
sync_client = OpenAI(
    base_url=url,
    api_key=key
)
# Ensuring pydub can locate ffmpeg
AudioSegment.converter = which("ffmpeg")
# TELA endpoint for speech-to-text generation
TELA_TRANSCRIPT_AUDIO_URL = url_audio
# System prompt (Portuguese) defining the assistant persona "Chagas".
# It forbids Markdown in replies so the text can be fed directly to TTS.
# Runtime string — do not translate or reformat.
system_instruction = """
A partir de agora, o seu nome é Chagas, um assistente virtual de saúde que fala português.
Durante a interação com o usuário, você deve responder e manter a conversa de forma amigável, concisa, clara e aberta.
Evite qualquer introdução desnecessária.
Responda em um tom amigável de conversação e sempre empático e suportivo.
Nunca retorne a sua resposta em formato Markdown. E sempre, sempre retorne na forma de frases, mesmo se a sua resposta for uma lista.
Novamente, apenas frases, mesmo que você queira realçar várias etapas como uma lista numerada e colocando markdown com asteriscos, não o faça! Apenas frases.
"""
def convert_to_mp3(audio_file_path):
    """Convert an arbitrary audio file to mp3 via pydub/ffmpeg.

    Args:
        audio_file_path: Path to the input audio file (any format ffmpeg reads).

    Returns:
        Path of a temporary .mp3 file on success, or None on failure.
        The caller owns (and should delete) the returned file.
    """
    print("[DEBUG] Starting audio conversion to mp3.")
    # delete=False so the path survives past the handle; close the handle
    # immediately so ffmpeg can write to the path (required on Windows, and
    # avoids leaking a file descriptor on every call).
    temp_mp3 = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    temp_mp3.close()
    try:
        audio = AudioSegment.from_file(audio_file_path)
        audio.export(temp_mp3.name, format="mp3")
        print(f"[DEBUG] Successfully converted to mp3: {temp_mp3.name}")
        return temp_mp3.name
    except Exception as e:
        print(f"[ERROR] Error converting audio: {e}")
        # Don't leak the temp file when conversion fails.
        try:
            os.remove(temp_mp3.name)
        except OSError:
            pass
        return None
def transcript(audio_file_path):
    """Send the recorded audio to the TELA speech-to-text endpoint.

    The input is first converted to mp3; the temporary mp3 is always removed
    afterwards.

    Args:
        audio_file_path: Path to the recorded audio, or None.

    Returns:
        The endpoint's parsed JSON on success, otherwise a dict of the form
        {"data": "failed", "error": <message>}.
    """
    print("[DEBUG] Starting transcription process.")

    # Guard: nothing recorded.
    if audio_file_path is None:
        print("[ERROR] No audio file provided.")
        return {"data": "failed", "error": "No audio file provided."}

    mp3_file_path = convert_to_mp3(audio_file_path)
    if not mp3_file_path:
        print("[ERROR] Failed to convert audio to mp3.")
        return {"data": "failed", "error": "Failed to convert audio to mp3."}

    try:
        print("[DEBUG] Sending mp3 to transcription endpoint.")
        print(f"[DEBUG] Transcription API URL: {TELA_TRANSCRIPT_AUDIO_URL}")
        with open(mp3_file_path, 'rb') as audio_handle:
            response = requests.post(
                TELA_TRANSCRIPT_AUDIO_URL, files={'file': audio_handle}
            )
        print(f"[DEBUG] Response Status Code: {response.status_code}")
        print(f"[DEBUG] Response Text: {response.text}")
        if response.status_code != 200:
            print(f"[ERROR] Unexpected status code {response.status_code}: {response.text}")
            return {"data": "failed", "error": f"Error {response.status_code}: {response.text}"}
        print("[DEBUG] Successfully received transcription.")
        return response.json()
    except Exception as e:
        print(f"[ERROR] Exception during transcription: {e}")
        return {"data": "failed", "error": str(e)}
    finally:
        # Always clean up the temporary mp3 produced by convert_to_mp3.
        if mp3_file_path and os.path.exists(mp3_file_path):
            try:
                os.remove(mp3_file_path)
                print("[DEBUG] Temporary mp3 file deleted.")
            except OSError as e:
                print(f"[ERROR] Error deleting temporary file: {e}")
def extract_user_input(transcription_response):
    """Concatenate the text of all transcript segments into one string.

    Args:
        transcription_response: Dict returned by transcript(); expected to
            carry a 'result' list of segments, each with a 'text' field.
            Error payloads ({"data": "failed", ...}) yield "".

    Returns:
        The stripped concatenation of segment texts, or "" when the response
        is missing, malformed, or contains no usable segments.
    """
    print("[DEBUG] Extracting user input from transcription response.")
    try:
        transcript_segments = transcription_response.get('result', [])
        # .get('text', '') tolerates segments missing the key; the except
        # below additionally covers non-dict segments or a non-list 'result',
        # which the original bare `except KeyError` let escape as TypeError.
        user_input = "".join(segment.get('text', '') for segment in transcript_segments)
        print(f"[DEBUG] Extracted user input: {user_input.strip()}")
        return user_input.strip()
    except (KeyError, TypeError, AttributeError) as e:
        print(f"[ERROR] Malformed transcription response: {e}")
        return ""
def generate_speech(text):
    """Synthesize Brazilian-Portuguese speech for `text` using edge-tts.

    Args:
        text: Plain text to synthesize.

    Returns:
        Path to a temporary audio file with the synthesized speech, or None
        on failure. The caller owns (and should delete) the file.
    """
    print("[DEBUG] Generating speech from text.")
    # delete=False keeps the path alive after the handle; close the handle
    # right away because edge-tts writes to the path itself (and an open
    # handle would leak on every call / block the write on Windows).
    tts_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    tts_file.close()

    async def generate_tts():
        tts = edge_tts.Communicate(text, voice="pt-BR-AntonioNeural")
        await tts.save(tts_file.name)

    try:
        # nest_asyncio.apply() at import time makes asyncio.run() safe even
        # when an event loop is already running (as it is under Gradio).
        asyncio.run(generate_tts())
        print(f"[DEBUG] TTS audio saved to: {tts_file.name}")
        return tts_file.name
    except Exception as e:
        print(f"[ERROR] Error generating TTS: {e}")
        # Don't leak the temp file when synthesis fails.
        try:
            os.remove(tts_file.name)
        except OSError:
            pass
        return None
def chatbot_conversation(audio_file_path, history):
    """Voice-chat pipeline: transcribe -> stream LLM reply -> synthesize TTS.

    Generator consumed by the Gradio interface. Yields
    (text, audio_path, history) tuples: text-only updates while the LLM
    streams, then one final tuple carrying the TTS audio file and the
    updated conversation history.

    Args:
        audio_file_path: Filepath of the recorded user audio (from gr.Audio).
        history: List of [user_msg_dict, assistant_msg_dict] turns, or None
            on the first call (the gr.State initial value).
    """
    print("[DEBUG] Starting chatbot conversation.")
    try:
        transcription = transcript(audio_file_path)
        user_input = extract_user_input(transcription)
        if not user_input:
            # Transcription failed or was empty — bail out with a message.
            print("[ERROR] No user input extracted from transcription.")
            yield "I could not generate the text. Please try again.", None, history
            return
        # Ensure we have a system_message
        system_message = system_instruction
        if history is None:
            history = []
        # Reconstruct messages from history
        messages = [{"role": "system", "content": system_message}]
        for turn in history:
            # Each stored turn is [user_dict, assistant_dict]; skip empty halves.
            user_msg = turn[0].get("content") if turn[0] else ""
            assistant_msg = turn[1].get("content") if turn[1] else ""
            if user_msg:
                messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})
        # Add the current user input
        messages.append({"role": "user", "content": user_input})
        print("[DEBUG] Sending request to sync_client for chat completion.")
        print(f"[DEBUG] Messages: {messages}")
        response = ""
        # Stream partial responses
        try:
            for message in sync_client.chat.completions.create(
                model="marco-o1",
                messages=messages,
                stream=True,
                max_tokens=1024,
                temperature=0,  # deterministic replies
                response_format={"type": "text"}
            ):
                token = message.choices[0].delta.content
                if token:
                    # Strip chat-template sentinels the model may leak into output.
                    token = token.replace("<|im_start|>", "").replace("<|im_end|>", "")
                    print(token, end="")
                    response += token
                    # Yield partial text updates, no audio yet, history unchanged yet
                    yield (response, None, history)
        except Exception as e:
            print(f"[ERROR] Error during streaming response: {e}")
            yield ("I could not understand you. Please try again.", None, history)
            return
        # Now that we have the full response, update history
        history.append([
            {"role": "user", "content": user_input},
            {"role": "assistant", "content": response}
        ])
        # Generate TTS now
        print("[DEBUG] Generating TTS for full response.")
        tts_file_name = generate_speech(response)
        if tts_file_name:
            print("[DEBUG] Returning final response and TTS file with updated history.")
            # Now yield again with final text, audio, and updated history
            yield (response, tts_file_name, history)
        else:
            # TTS failed: still deliver the text answer and updated history.
            print("[ERROR] Failed to generate TTS.")
            yield (response, None, history)
    except Exception as e:
        # Top-level boundary: never let an exception escape into Gradio.
        print(f"[ERROR] Exception in chatbot_conversation: {e}")
        yield ("I could not understand you. Please try again.", None, history)
# Three outputs here: transcription text, audio, and the updated history
interface = gr.Interface(
    fn=chatbot_conversation,
    inputs=[
        gr.Audio(label="Usuário", type="filepath", streaming=False, container=True),
        gr.State([])  # State holds the conversation history
    ],
    outputs=[
        gr.Textbox(label="Resposta do Chagas"),
        gr.Audio(type="filepath", autoplay=True, label="Chagas"),  # auto-plays the TTS reply
        gr.State([])  # Return updated history
    ],
    title="Chagas - assistente de saúde",
    description= description,
    theme="sudeepshouche/minimalist",
    live=True  # re-run automatically when a new recording arrives
)
# queue() is required so the generator fn can stream multiple yields per call.
interface.queue().launch()