# NOTE(review): the three lines below look like Hugging Face Spaces status
# residue ("Spaces: Sleeping") captured during extraction, not source code;
# commented out so the module remains valid Python.
# Spaces:
# Sleeping
# Sleeping
import io
import os
import shutil
import subprocess
import tempfile

import librosa
import numpy as np
import sounddevice as sd
import speech_recognition as sr
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from pydub import AudioSegment
# Application setup: a single FastAPI app with a shared speech recognizer.
app = FastAPI()
# One module-level Recognizer instance reused by every request handler below.
recognizer = sr.Recognizer()
# Allow any origin — the API is called from a browser front end
# (soundscripter.html); see read_root below.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def read_root():
    """Serve the front-end page (soundscripter.html) as HTML.

    NOTE(review): a route decorator (e.g. @app.get("/")) appears to have been
    lost from this copy — restore it so FastAPI registers the endpoint.
    """
    # Read the static front-end file on each request.
    with open("soundscripter.html", "r") as file:
        html_content = file.read()
    # Fixed: return an explicit HTMLResponse. Returning the raw string (the
    # previous behavior) made FastAPI JSON-encode it, so the browser received
    # a quoted string instead of a rendered page; the correct line was
    # already present but commented out.
    return HTMLResponse(content=html_content)
def resample_audio(input_path, output_path, target_sample_rate):
    """Resample an audio file to ``target_sample_rate`` Hz using ffmpeg.

    Args:
        input_path: path of the source audio file.
        output_path: path the resampled file is written to.
        target_sample_rate: desired sample rate in Hz.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    ffmpeg_cmd = [
        "ffmpeg",
        "-y",  # overwrite output_path if it exists (no stdin prompt)
        "-i", input_path,
        "-ar", str(target_sample_rate),
        output_path,
    ]
    # Fixed: the original piped stdout/stderr and never checked the return
    # code, so any ffmpeg failure was silently swallowed and a stale or
    # missing output file would be transcribed downstream.
    completed = subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if completed.returncode != 0:
        raise RuntimeError(
            f"ffmpeg failed with code {completed.returncode}: "
            f"{completed.stderr.decode(errors='replace')}"
        )
def recognize_speech_live(wav_file, lid="hi-IN"):
    """Transcribe a WAV file on disk with the Google Web Speech API.

    Args:
        wav_file: path to a WAV file readable by speech_recognition.
        lid: language code passed to the recognizer (default Hindi, "hi-IN").

    Returns:
        The recognized text, or a human-readable error string on failure.
    """
    # Fixed: the original opened "<wav_file>.txt" for writing but never wrote
    # to or closed it, leaking a file handle and littering empty .txt files.
    try:
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)
        return recognizer.recognize_google(audio, language=lid)
    except sr.UnknownValueError:
        return "Speech not recognized."
    except sr.RequestError as e:
        return f"API request failed: {e}"
def convert_audio_format(input_data, input_format, output_format='wav'):
    """Re-encode raw audio bytes from ``input_format`` into ``output_format``
    (WAV by default) and return the converted bytes."""
    # Decode the incoming bytes with pydub, then export in the target format.
    decoded = AudioSegment.from_file(io.BytesIO(input_data), format=input_format)
    exported = decoded.export(format=output_format)
    return exported.read()
def recognize_speech(audio_data, language="hi-IN"):
    """Transcribe in-memory WAV bytes with the Google Web Speech API.

    Args:
        audio_data: WAV-encoded audio as bytes.
        language: language code for the recognizer (default Hindi, "hi-IN").

    Returns:
        The recognized text, or a human-readable error string on failure.
    """
    with io.BytesIO(audio_data) as buffer:
        with sr.AudioFile(buffer) as source:
            audio = recognizer.record(source)
        try:
            return recognizer.recognize_google(audio, language=language)
        except sr.UnknownValueError:
            return "Speech not recognized."
        except sr.RequestError as e:
            return f"API request failed: {e}"
async def transcribe_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded audio file (any pydub-readable format).

    The input format is inferred from the uploaded file's extension, the audio
    is normalized to WAV, and the transcription (default language "hi-IN") is
    returned as ``{"text": ...}``.

    NOTE(review): a route decorator (e.g. @app.post("/asr")) appears to have
    been lost from this copy — restore it so FastAPI registers the endpoint.
    """
    contents = await audio.read()
    # Fixed: previously a missing/extension-less filename produced a bogus
    # "format" string and an unhandled pydub failure (HTTP 500). Reject it
    # explicitly with a 400 instead.
    filename = audio.filename or ""
    if "." not in filename:
        return JSONResponse(
            content={"success": False,
                     "message": "Cannot determine audio format from filename."},
            status_code=400,
        )
    input_format = filename.rsplit(".", 1)[-1].lower()
    # Normalize to WAV so speech_recognition can consume it.
    wav_data = convert_audio_format(contents, input_format)
    result = recognize_speech(wav_data)
    return JSONResponse(content={"text": result})
def get_sampling_rate(audio_file_path):
    """Return the sample rate (in Hz) of the audio file at ``audio_file_path``."""
    return AudioSegment.from_file(audio_file_path).frame_rate
@app.post("/asr/live")  # route recovered from the commented-out duplicate below
async def transcribe_live_audio(audio: UploadFile = File(...)):
    """Transcribe an uploaded (live-recorded) WAV file.

    The upload is spooled to a temporary file, resampled to 16 kHz via ffmpeg
    when necessary, and transcribed (default language "hi-IN"). All temporary
    files are removed on every code path.

    Returns:
        ``{"text": ...}`` on success; ``{"success": False, ...}`` with an
        appropriate status code on bad input or processing failure.
    """
    if not audio:
        return JSONResponse(content={"success": False}, status_code=400)

    # Fixed: initialize both paths up front so the finally block cannot hit
    # an unbound name if temp-file creation itself fails.
    temp_file_path = None
    output_path = None
    try:
        # Spool the upload to a named file so ffmpeg/pydub can read it.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_file_path = temp_file.name
            shutil.copyfileobj(audio.file, temp_file)

        sampling_rate = get_sampling_rate(temp_file_path)

        if sampling_rate != 16000:
            # Fixed: tempfile.mktemp() is deprecated and race-prone. Reserve
            # the name atomically with mkstemp, then remove the placeholder so
            # ffmpeg creates the file without an overwrite prompt.
            fd, output_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            os.remove(output_path)
            resample_audio(temp_file_path, output_path, target_sample_rate=16000)
            result = recognize_speech_live(output_path)
        else:
            result = recognize_speech_live(temp_file_path)
    except Exception:
        return JSONResponse(content={"success": False, "message": "Error processing audio."}, status_code=500)
    finally:
        # Fixed: also remove the resampled copy, which the original leaked.
        for path in (temp_file_path, output_path):
            if path and os.path.exists(path):
                os.remove(path)
    return JSONResponse(content={"text": result})
# @app.post("/asr/live") | |
# async def transcribe_live_audio(): | |
# fs = 16000 # Target sample rate | |
# duration = 3 # seconds | |
# chunks = int(fs * duration) | |
# # Record live audio | |
# audio_data = sd.rec(chunks, samplerate=fs, channels=1, dtype=np.float32) | |
# sd.wait() | |
# # Resample the audio data to the target sample rate | |
# audio_data_resampled = librosa.resample(audio_data.flatten(), orig_sr=fs, target_sr=16000) | |
# # Convert audio data to bytes (use np.int16) | |
# audio_bytes = audio_data_resampled.astype(np.int16).tobytes() | |
# # Transcribe the audio | |
# result = recognize_speech(audio_bytes) | |
# return {"Text": result} | |
#Run the FastAPI app | |
# if __name__ == "__main__": | |
# uvicorn.run(app, host="127.0.0.1", port=8000) | |