"""FastAPI service that transcribes uploaded audio via SpeechRecognition.

Endpoints:
    GET  /          -- serves the ``soundscripter.html`` front-end page.
    POST /asr       -- converts an uploaded audio file to WAV and transcribes it.
    POST /asr/live  -- transcribes an uploaded recording, resampling it to
                       16 kHz first when necessary.
"""
import io
import logging
import os
import shutil
import subprocess
import tempfile
from typing import Optional

# NOTE: librosa, numpy, sounddevice, uvicorn and FileResponse are only
# referenced by the commented-out legacy code at the bottom of this file.
# They are kept because other code may rely on them being imported here.
import librosa
import numpy as np
import sounddevice as sd
import speech_recognition as sr
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from pydub import AudioSegment

logger = logging.getLogger(__name__)

# Language code passed to the recognizer when the caller does not supply one.
DEFAULT_LANGUAGE = "hi-IN"
# Sample rate (Hz) that live recordings are resampled to before recognition.
TARGET_SAMPLE_RATE = 16000

app = FastAPI()
recognizer = sr.Recognizer()

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy; confirm it is intended and restrict the origin
# list before exposing this service publicly.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Return the contents of ``soundscripter.html`` as the landing page.

    The path is relative, so the file is looked up in the process's current
    working directory.
    """
    with open("soundscripter.html", "r") as file:
        html_content = file.read()
    return html_content


def resample_audio(input_path, output_path, target_sample_rate):
    """Resample an audio file with the ``ffmpeg`` command-line tool.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the resampled audio is written to.
        target_sample_rate: Desired sample rate in Hz (passed to ``-ar``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
            Previously the failure was ignored and only showed up later as a
            missing output file.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", input_path,
        "-ar", str(target_sample_rate),
        output_path,
    ]
    # Output is captured (not inherited) so a CalledProcessError carries
    # ffmpeg's stderr for diagnosis.
    subprocess.run(
        ffmpeg_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )


def _transcribe(audio_source, language):
    """Transcribe a WAV source and map recognizer errors to message strings.

    Args:
        audio_source: A file path or file-like object accepted by
            ``sr.AudioFile``.
        language: Language code forwarded to ``recognize_google``.

    Returns:
        The recognized text, ``"Speech not recognized."`` when the recognizer
        raises ``sr.UnknownValueError``, or ``"API request failed: ..."`` when
        it raises ``sr.RequestError``.
    """
    with sr.AudioFile(audio_source) as source:
        recorded = recognizer.record(source)
    try:
        return recognizer.recognize_google(recorded, language=language)
    except sr.UnknownValueError:
        return "Speech not recognized."
    except sr.RequestError as e:
        return f"API request failed: {e}"


def recognize_speech_live(wav_file, lid=DEFAULT_LANGUAGE):
    """Transcribe the WAV file stored at ``wav_file``.

    Args:
        wav_file: Path to the WAV file to transcribe.
        lid: Language code for recognition (defaults to ``"hi-IN"``).

    Returns:
        The transcription, or an explanatory message string if recognition
        fails (see ``_transcribe``).
    """
    # The previous implementation also opened ``wav_file + ".txt"`` for
    # writing without ever using or closing it, which leaked a file handle
    # and left an empty file behind on every call.
    return _transcribe(wav_file, lid)


def convert_audio_format(input_data, input_format: Optional[str], output_format='wav'):
    """Convert in-memory audio bytes to another container format.

    Args:
        input_data: Raw bytes of the source audio.
        input_format: Format name of ``input_data`` (e.g. ``"mp3"``), or
            ``None`` to leave format detection to pydub.
        output_format: Format to export to (defaults to ``"wav"``).

    Returns:
        The converted audio as bytes.
    """
    audio = AudioSegment.from_file(io.BytesIO(input_data), format=input_format)
    # export() hands back an open file object; close it once it has been read
    # so the handle is not leaked.
    with audio.export(format=output_format) as exported:
        return exported.read()


def recognize_speech(audio_data, language=DEFAULT_LANGUAGE):
    """Transcribe WAV audio held in memory.

    Args:
        audio_data: Bytes of a WAV file.
        language: Language code for recognition (defaults to ``"hi-IN"``).

    Returns:
        The transcription, or an explanatory message string if recognition
        fails (see ``_transcribe``).
    """
    with io.BytesIO(audio_data) as audio_io:
        return _transcribe(audio_io, language)


@app.post("/asr")
async def transcribe_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded audio file of any format pydub can decode.

    The input format is taken from the extension of the uploaded file name.

    Returns:
        A JSON response of the form ``{"text": <transcription or message>}``.
    """
    contents = await audio.read()

    # Derive the format from the file extension. A missing file name or a
    # name without an extension yields None (pydub then detects the format)
    # instead of crashing or passing the whole name as the format.
    _, dot, extension = (audio.filename or "").rpartition(".")
    input_format = extension.lower() if dot and extension else None

    wav_data = convert_audio_format(contents, input_format)
    result = recognize_speech(wav_data)
    return JSONResponse(content={"text": result})


def get_sampling_rate(audio_file_path):
    """Return the frame rate (Hz) of the audio file at ``audio_file_path``."""
    audio = AudioSegment.from_file(audio_file_path)
    return audio.frame_rate


@app.post("/asr/live")
async def transcribe_live_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded recording, resampling it to 16 kHz if needed.

    Returns:
        ``{"text": <transcription or message>}`` on success,
        ``{"success": False}`` with status 400 when no file is supplied, or
        ``{"success": False, "message": "Error processing audio."}`` with
        status 500 when processing fails.
    """
    if not audio:
        return JSONResponse(content={"success": False}, status_code=400)

    try:
        # One private scratch directory holds both the received and the
        # resampled file. It is removed automatically on success and on
        # failure, so nothing is left behind and no predictable temp name
        # (tempfile.mktemp) is needed.
        with tempfile.TemporaryDirectory() as work_dir:
            received_path = os.path.join(work_dir, "received.wav")
            with open(received_path, "wb") as received_file:
                shutil.copyfileobj(audio.file, received_file)

            if get_sampling_rate(received_path) != TARGET_SAMPLE_RATE:
                resampled_path = os.path.join(work_dir, "resampled.wav")
                resample_audio(
                    received_path,
                    resampled_path,
                    target_sample_rate=TARGET_SAMPLE_RATE,
                )
                result = recognize_speech_live(resampled_path)
            else:
                result = recognize_speech_live(received_path)
    except Exception:
        # Top-level request boundary: log the traceback rather than dropping
        # it, and return the same generic error payload as before.
        logger.exception("Error processing audio")
        return JSONResponse(
            content={"success": False, "message": "Error processing audio."},
            status_code=500,
        )

    return JSONResponse(content={"text": result})


# Legacy alternative for /asr/live that records from the server's microphone
# instead of accepting an upload. Kept for reference only.
#
# @app.post("/asr/live")
# async def transcribe_live_audio():
#     fs = 16000  # Target sample rate
#     duration = 3  # seconds
#     chunks = int(fs * duration)
#
#     # Record live audio
#     audio_data = sd.rec(chunks, samplerate=fs, channels=1, dtype=np.float32)
#     sd.wait()
#
#     # Resample the audio data to the target sample rate
#     audio_data_resampled = librosa.resample(audio_data.flatten(), orig_sr=fs, target_sr=16000)
#
#     # Convert audio data to bytes (use np.int16)
#     audio_bytes = audio_data_resampled.astype(np.int16).tobytes()
#
#     # Transcribe the audio
#     result = recognize_speech(audio_bytes)
#
#     return {"Text": result}

# Run the FastAPI app
# if __name__ == "__main__":
#     uvicorn.run(app, host="127.0.0.1", port=8000)