|
import streamlit as st |
|
from audio_recorder_streamlit import audio_recorder |
|
from groq import Groq |
|
import os |
|
from langchain_groq import ChatGroq |
|
from langchain_core.output_parsers import StrOutputParser |
|
from langchain_core.prompts import ChatPromptTemplate |
|
import edge_tts |
|
import asyncio |
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
|
|
|
|
def frontend():
    """Render the Voice AI demo page and handle one Streamlit script run.

    The page offers a microphone recorder and a chat text box. Typed text
    takes priority over a recording. Each accepted question is answered via
    ``answer``, the reply is synthesized to an audio file via
    ``convert_audio``, and the (question, answer, audio file) triple is
    appended to ``st.session_state.conversation``. The whole conversation is
    then rendered, with only the newest reply set to autoplay.
    """
    st.title("Voice AI Demo")

    # Session state persists across script reruns; initialize it once.
    if "conversation" not in st.session_state:
        st.session_state.conversation = []
    if "audio_count" not in st.session_state:
        st.session_state.audio_count = 1
    if "last_audio" not in st.session_state:
        # Last recording that was already processed (see the mic branch).
        st.session_state.last_audio = None

    status_placeholder = st.empty()
    status_placeholder.write("Press Mic button to start asking a question")

    recorded_audio = audio_recorder(sample_rate=8000)
    text = st.chat_input()

    def process_input(user_input):
        """Answer ``user_input``, synthesize the reply, and record the turn."""
        status_placeholder.write("Getting response...")
        response = answer(user_input)
        status_placeholder.write("Converting response to audio...")

        # edge-tts produces MP3 data, so name the file accordingly instead
        # of labelling MP3 bytes as ``.wav``.
        audio_filename = f"output{st.session_state.audio_count}.mp3"
        asyncio.run(convert_audio(response, audio_filename))
        st.session_state.audio_count += 1

        status_placeholder.write("Press mic button again to ask more questions")

        st.session_state.conversation.append(
            (f"Q: {user_input}", f"A: {response}", audio_filename)
        )

    if text and text.strip():
        process_input(text)
    elif recorded_audio and recorded_audio != st.session_state.last_audio:
        # The recorder keeps returning its last clip on later reruns; remember
        # what was processed so the same question is not answered twice.
        # Set before processing so a failing clip is not retried on every rerun.
        st.session_state.last_audio = recorded_audio

        status_placeholder.write("Converting audio...")
        data_to_file(recorded_audio)
        status_placeholder.write("Uploading audio...")
        transcription = audio_to_text("temp_audio.wav")

        if transcription and transcription.strip():
            status_placeholder.write("Transcription completed.")
            process_input(transcription)
        else:
            # Nothing intelligible was recognized; do not send an empty
            # question to the model or to the speech synthesizer.
            status_placeholder.write("Could not understand the audio, please try again")

    # Only the most recent reply autoplays; older ones stay playable on demand.
    last_index = len(st.session_state.conversation) - 1
    for i, (q, a, audio_file) in enumerate(st.session_state.conversation):
        st.write(q)
        st.write(a)
        st.audio(audio_file, format="audio/mpeg", loop=False, autoplay=(i == last_index))
|
|
|
|
|
|
|
def data_to_file(recorded_audio, path="temp_audio.wav"):
    """Write recorded audio bytes to a file and return its path.

    Args:
        recorded_audio: Bytes-like audio data to write.
        path: Destination file path. Defaults to ``"temp_audio.wav"`` in the
            current working directory, matching the previous hard-coded name.

    Returns:
        The path the data was written to.
    """
    with open(path, "wb") as audio_file:
        audio_file.write(recorded_audio)
    return path
|
|
|
|
|
|
|
def audio_to_text(audio_path):
    """Send the audio file at ``audio_path`` to Groq and return the text.

    The Groq client is created with the key read from the ``GROQ_API_KEY``
    environment variable, and the file contents are submitted to the
    ``audio.translations`` endpoint with the ``whisper-large-v3`` model.

    Args:
        audio_path: Path of the audio file to read and upload.

    Returns:
        The ``text`` attribute of the API response.
    """
    groq_client = Groq(api_key=os.getenv('GROQ_API_KEY'))

    with open(audio_path, 'rb') as audio_file:
        audio_bytes = audio_file.read()

    # NOTE(review): this calls the *translations* endpoint rather than
    # *transcriptions* -- confirm that translated output is what is intended.
    result = groq_client.audio.translations.create(
        file=(audio_path, audio_bytes),
        model='whisper-large-v3',
    )
    return result.text
|
|
|
|
|
def answer(user_question):
    """Ask the Groq-hosted chat model ``user_question`` and return its reply.

    A prompt (system instructions plus the user's query), the chat model and
    a string output parser are piped into a single chain, which is invoked
    once with the question.

    Args:
        user_question: The question text substituted into the prompt.

    Returns:
        The chain's output after passing through ``StrOutputParser``.
    """
    # Instructions that steer the model toward short, well-punctuated
    # paragraphs, since the reply is converted to speech afterwards.
    system_instructions = "You are super knowlegable AI chat bot which will answer all User Query, answer with confident, also this response will get convert back to speech, so dont make point or anything, but make your answer in para form and dont make it too large, and use proper annotation, comma, full stop, question mark, so that a better text to speach can be genrate back."

    chat_prompt = ChatPromptTemplate([
        ("system", system_instructions),
        ("user", "User Query: {question}"),
    ])
    llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0.6)

    pipeline = chat_prompt | llm | StrOutputParser()
    return pipeline.invoke({'question': user_question})
|
|
|
|
|
async def convert_audio(text, filename, voice="fr-FR-VivienneMultilingualNeural"):
    """Synthesize ``text`` to speech with edge-tts and save it to ``filename``.

    Args:
        text: The text to speak.
        filename: Path of the audio file to write.
        voice: Name of the edge-tts voice to use. Defaults to the voice that
            was previously hard-coded, so existing callers are unaffected.

    NOTE(review): edge-tts appears to write MP3 data regardless of the file
    extension -- confirm callers name and serve the file accordingly.
    """
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(filename)
|
|
|
# Render the app only when this file is executed as a script, so that
# importing the module (e.g. to reuse its helpers) does not build the UI.
# NOTE(review): relies on `streamlit run` executing the script with
# __name__ == "__main__" -- confirm for the Streamlit version in use.
if __name__ == "__main__":
    frontend()