Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import uvicorn | |
| import sounddevice as sd | |
| import numpy as np | |
| import speech_recognition as sr | |
| from fastapi import FastAPI, File, UploadFile | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.responses import HTMLResponse | |
| from pydub import AudioSegment | |
| import librosa | |
| import tempfile | |
| import shutil | |
| import subprocess | |
| from fastapi.middleware.cors import CORSMiddleware | |
# Application-wide singletons: the FastAPI app and a shared speech recognizer.
app = FastAPI()
recognizer = sr.Recognizer()

# Allow any browser origin to call the API (dev-friendly CORS).
# NOTE(review): wildcard origins together with allow_credentials=True is
# rejected by strict CORS implementations — tighten for production.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def read_root():
    """Serve the front-end page.

    Reads ``soundscripter.html`` from the working directory and returns it
    wrapped in an ``HTMLResponse`` so the browser renders it as HTML.
    Returning the bare string (as the original did) made FastAPI JSON-encode
    it, so clients received a quoted blob of markup instead of a page —
    the commented-out ``HTMLResponse`` line shows this was the intent.

    NOTE(review): no ``@app.get("/")`` decorator is visible in this chunk —
    presumably lost in transcription; confirm route registration.
    """
    # Explicit encoding avoids platform-dependent decoding of the HTML file.
    with open("soundscripter.html", "r", encoding="utf-8") as file:
        html_content = file.read()
    return HTMLResponse(content=html_content)
def resample_audio(input_path, output_path, target_sample_rate):
    """Resample an audio file to ``target_sample_rate`` Hz using ffmpeg.

    Args:
        input_path: Path of the source audio file.
        output_path: Path the resampled file is written to.
        target_sample_rate: Desired sample rate in Hz (e.g. 16000).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero, so callers
            are not silently handed a missing/empty output file.
    """
    ffmpeg_cmd = [
        "ffmpeg",
        "-y",  # overwrite output_path if it already exists instead of failing
        "-i", input_path,
        "-ar", str(target_sample_rate),
        output_path,
    ]
    # check=True surfaces ffmpeg failures; previously stdout/stderr were
    # captured and the return code ignored, swallowing every error.
    subprocess.run(
        ffmpeg_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )
def recognize_speech_live(wav_file, lid="hi-IN"):
    """Transcribe a WAV file on disk with the Google Web Speech API.

    Args:
        wav_file: Path to a WAV file readable by ``speech_recognition``.
        lid: Language id passed to the recognizer (default Hindi, "hi-IN").

    Returns:
        The recognized text, or a human-readable error string when the audio
        could not be understood or the API request failed.
    """
    # The original opened wav_file + ".txt" for writing and never closed or
    # used it — a file-handle leak that littered empty .txt files. Removed.
    try:
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)
            text = recognizer.recognize_google(audio, language=lid)
            return text
    except sr.UnknownValueError:
        return "Speech not recognized."
    except sr.RequestError as e:
        return f"API request failed: {e}"
def convert_audio_format(input_data, input_format, output_format='wav'):
    """Re-encode raw audio bytes from one container format to another.

    Args:
        input_data: Raw audio file contents as bytes.
        input_format: Source format name pydub understands (e.g. "webm").
        output_format: Target format; defaults to WAV.

    Returns:
        The converted audio as bytes.
    """
    source_buffer = io.BytesIO(input_data)
    segment = AudioSegment.from_file(source_buffer, format=input_format)
    exported = segment.export(format=output_format)
    return exported.read()
def recognize_speech(audio_data, language="hi-IN"):
    """Transcribe in-memory WAV bytes via the Google Web Speech API.

    Args:
        audio_data: WAV-encoded audio as bytes.
        language: Language id for the recognizer (default Hindi, "hi-IN").

    Returns:
        The recognized text, or an explanatory error string on failure.
    """
    with io.BytesIO(audio_data) as wav_buffer:
        with sr.AudioFile(wav_buffer) as source:
            recorded = recognizer.record(source)
            try:
                return recognizer.recognize_google(recorded, language=language)
            except sr.UnknownValueError:
                return "Speech not recognized."
            except sr.RequestError as e:
                return f"API request failed: {e}"
async def transcribe_audio(audio: UploadFile = File(...)):
    """Accept an uploaded audio file, convert it to WAV, and transcribe it.

    The source format is inferred from the uploaded filename's extension
    (assumes the client names the file sensibly — TODO confirm against the
    front-end). Transcription uses the default language of
    ``recognize_speech`` ("hi-IN").

    Returns:
        JSONResponse with body ``{"text": <transcription>}``.

    NOTE(review): no ``@app.post(...)`` decorator is visible in this chunk —
    presumably lost in transcription; confirm route registration.
    """
    contents = await audio.read()
    input_format = audio.filename.split('.')[-1].lower()
    wav_data = convert_audio_format(contents, input_format)
    transcription = recognize_speech(wav_data)
    return JSONResponse(content={"text": transcription})
def get_sampling_rate(audio_file_path):
    """Return the sample rate in Hz of the audio file at ``audio_file_path``."""
    return AudioSegment.from_file(audio_file_path).frame_rate
async def transcribe_live_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded (live-recorded) WAV clip.

    The clip is saved to a temporary file, resampled to 16 kHz via ffmpeg if
    needed, and passed to the Google recognizer. All temporary files are
    removed afterwards.

    Returns:
        JSONResponse ``{"text": ...}`` on success; ``{"success": False, ...}``
        with status 400 (missing upload) or 500 (processing error) otherwise.

    NOTE(review): no ``@app.post("/asr/live")`` decorator is visible here (a
    commented-out variant below suggests that route) — confirm registration.
    """
    if not audio:
        return JSONResponse(content={"success": False}, status_code=400)

    # Bound up front so the finally block never hits a NameError if the
    # temporary-file creation itself raises.
    temp_file_path = None
    output_path = None
    try:
        # Persist the upload; delete=False so ffmpeg can reopen it by path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_file_path = temp_file.name
            shutil.copyfileobj(audio.file, temp_file)

        sampling_rate = get_sampling_rate(temp_file_path)

        if sampling_rate != 16000:
            # mkstemp replaces the insecure/deprecated mktemp: it creates the
            # file atomically instead of merely predicting a free name. The
            # placeholder is removed so ffmpeg writes a fresh file itself.
            fd, output_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            os.remove(output_path)
            resample_audio(temp_file_path, output_path, target_sample_rate=16000)
            result = recognize_speech_live(output_path)
        else:
            result = recognize_speech_live(temp_file_path)
    except Exception:
        return JSONResponse(
            content={"success": False, "message": "Error processing audio."},
            status_code=500,
        )
    finally:
        # Cleanup: the original removed only the received file, leaking one
        # resampled temp file per non-16 kHz request.
        for path in (temp_file_path, output_path):
            if path and os.path.exists(path):
                os.remove(path)
    return JSONResponse(content={"text": result})
| # @app.post("/asr/live") | |
| # async def transcribe_live_audio(): | |
| # fs = 16000 # Target sample rate | |
| # duration = 3 # seconds | |
| # chunks = int(fs * duration) | |
| # # Record live audio | |
| # audio_data = sd.rec(chunks, samplerate=fs, channels=1, dtype=np.float32) | |
| # sd.wait() | |
| # # Resample the audio data to the target sample rate | |
| # audio_data_resampled = librosa.resample(audio_data.flatten(), orig_sr=fs, target_sr=16000) | |
| # # Convert audio data to bytes (use np.int16) | |
| # audio_bytes = audio_data_resampled.astype(np.int16).tobytes() | |
| # # Transcribe the audio | |
| # result = recognize_speech(audio_bytes) | |
| # return {"Text": result} | |
| #Run the FastAPI app | |
| # if __name__ == "__main__": | |
| # uvicorn.run(app, host="127.0.0.1", port=8000) | |