Spaces:
Sleeping
Sleeping
import os | |
import shutil | |
import subprocess | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import FileResponse, JSONResponse | |
from fastapi.responses import HTMLResponse | |
from pydub import AudioSegment | |
import shutil | |
import tempfile | |
import speech_recognition as sr | |
import os | |
# Module-level speech recognizer; calling_asr uses it to record and transcribe audio.
r = sr.Recognizer() | |
# FastAPI application object for this service.
app = FastAPI() | |
def resample_audio(input_path, output_path, target_sample_rate):
    """Resample an audio file by invoking the ``ffmpeg`` command-line tool.

    Args:
        input_path: Path of the audio file passed to ffmpeg via ``-i``.
        output_path: Path ffmpeg is asked to write the result to.
        target_sample_rate: Value passed to ffmpeg's ``-ar`` option.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", input_path,
        "-ar", str(target_sample_rate),
        output_path,
    ]
    subprocess.run(
        ffmpeg_cmd,
        # Never let ffmpeg wait on an interactive prompt (e.g. "overwrite?").
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        # Surface conversion failures instead of silently producing no output.
        check=True,
    )
async def read_root():
    """Return the contents of ``soundscripter.html`` as a string.

    The file is looked up relative to the current working directory and is
    decoded as UTF-8 so the result does not depend on the platform locale.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm how it is registered with ``app``.
    """
    # Path to the HTML file containing the front-end code.
    with open("soundscripter.html", "r", encoding="utf-8") as file:
        html_content = file.read()
    return html_content
def get_sampling_rate(audio_file_path):
    """Load the audio file with pydub and return its ``frame_rate``."""
    return AudioSegment.from_file(audio_file_path).frame_rate
async def process_audio(audio: UploadFile = File(...), language: str = Form(...)):
    """Resample an uploaded WAV file to 16 kHz and return its transcription.

    Args:
        audio: Uploaded file; its ``content_type`` must be ``audio/wav``.
        language: Required form field. It is only checked for being
            non-empty; the transcription itself is requested for "hi-IN".

    Returns:
        A JSONResponse. On success the body is
        ``{"success": True, "language": <transcribed text>}``. A missing
        field or a non-WAV upload yields status 400; a failure while saving,
        resampling or inspecting the audio yields status 500.

    NOTE(review): no route decorator is visible on this function in this
    view of the file; confirm how it is registered with ``app``.
    """
    if not audio or not language:
        return JSONResponse(content={"success": False}, status_code=400)

    # Check if the uploaded file is in WAV format.
    if audio.content_type != "audio/wav":
        return JSONResponse(content={"success": False, "message": "Audio must be in WAV format."}, status_code=400)

    # All intermediate files live in a private directory that is removed when
    # the block exits, so nothing is left behind on any return path.
    with tempfile.TemporaryDirectory() as work_dir:
        temp_file_path = os.path.join(work_dir, "upload.wav")
        # The output path does not exist yet, so ffmpeg never has to overwrite.
        output_path = os.path.join(work_dir, "resampled.wav")
        try:
            # Save the received audio; the file is closed (and flushed)
            # before ffmpeg reads it.
            with open(temp_file_path, "wb") as temp_file:
                shutil.copyfileobj(audio.file, temp_file)
            # Print the file path for debugging.
            print(temp_file_path)

            # Resample the audio to 16000 Hz.
            resample_audio(temp_file_path, output_path, target_sample_rate=16000)
            print(output_path)

            # Verify that the resampled file really has the expected rate.
            sampling_rate = get_sampling_rate(output_path)
            if sampling_rate != 16000:
                return JSONResponse(content={"success": False, "message": "Sample rate is not 16000Hz."}, status_code=500)
        except Exception as e:
            print("Error processing audio:", e)
            return JSONResponse(content={"success": False, "message": "Error processing audio."}, status_code=500)

        # NOTE(review): the recognition language is hard-coded and the
        # ``language`` form value is not forwarded -- confirm this is intended.
        # Transcribe while the resampled file still exists.
        transcript = calling_asr(output_path, "hi-IN")

    return JSONResponse(content={"success": True, "language": transcript})
def calling_asr(wav_file, lid):
    """Transcribe an audio file using the module-level recognizer ``r``.

    Args:
        wav_file: Path of the audio file opened with ``sr.AudioFile``.
        lid: Language identifier passed to ``r.recognize_google`` as
            ``language``.

    Returns:
        The recognized text, or the string "cant read wav file" when the
        file cannot be read or recognition fails.
    """
    fallback_text = "cant read wav file"
    # The previous version opened ``wav_file + ".txt"`` for writing but never
    # wrote to or closed it; that leaked handle / empty file is dropped here.
    try:
        with sr.AudioFile(wav_file) as source:
            audio = r.record(source)
        return r.recognize_google(audio, language=lid)
    except Exception as e:
        # Best-effort: report the problem and fall back to the placeholder
        # text, without swallowing KeyboardInterrupt / SystemExit.
        print("Error transcribing audio:", e)
        return fallback_text