import io
import os
import shutil
import subprocess
import tempfile

# uvicorn, sounddevice, numpy and librosa are only used by the
# commented-out live-recording endpoint near the end of this file.
import uvicorn
import numpy as np
import sounddevice as sd
import librosa
import speech_recognition as sr
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from pydub import AudioSegment
app = FastAPI()
recognizer = sr.Recognizer()

# Allow requests from any origin; restrict this list in production.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/", response_class=HTMLResponse)
async def read_root():
    # Serve the HTML file containing the front-end code.
    with open("soundscripter.html", "r") as file:
        html_content = file.read()
    return html_content
def resample_audio(input_path, output_path, target_sample_rate):
    # Resample with ffmpeg; -y overwrites the output file if it already exists,
    # and check=True raises if ffmpeg fails instead of failing silently.
    ffmpeg_cmd = [
        "ffmpeg",
        "-y",
        "-i", input_path,
        "-ar", str(target_sample_rate),
        output_path,
    ]
    subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
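# Example (hypothetical paths): resample_audio("in.wav", "out_16k.wav", 16000)
# runs the equivalent of:
#   ffmpeg -y -i in.wav -ar 16000 out_16k.wav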
def recognize_speech_live(wav_file, lid="hi-IN"):
    # Transcribe a WAV file on disk using the Google Web Speech API.
    try:
        with sr.AudioFile(wav_file) as source:
            audio = recognizer.record(source)
        text = recognizer.recognize_google(audio, language=lid)
        return text
    except sr.UnknownValueError:
        return "Speech not recognized."
    except sr.RequestError as e:
        return f"API request failed: {e}"
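# Example (hypothetical path): recognize_speech_live("clip_16k.wav", lid="hi-IN")
# returns the transcript, or one of the error strings above on failure.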
def convert_audio_format(input_data, input_format, output_format='wav'):
    # Decode the raw bytes with pydub and re-export them in the target format.
    audio = AudioSegment.from_file(io.BytesIO(input_data), format=input_format)
    output_data = audio.export(format=output_format).read()
    return output_data
def recognize_speech(audio_data, language="hi-IN"):
    with io.BytesIO(audio_data) as audio_io:
        with sr.AudioFile(audio_io) as source:
            audio = recognizer.record(source)
        try:
            text = recognizer.recognize_google(audio, language=language)
            return text
        except sr.UnknownValueError:
            return "Speech not recognized."
        except sr.RequestError as e:
            return f"API request failed: {e}"
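# Example pipeline (hypothetical file name): convert an MP3 to WAV bytes
# in memory, then transcribe them.
#   with open("sample.mp3", "rb") as f:
#       wav_bytes = convert_audio_format(f.read(), "mp3")
#   print(recognize_speech(wav_bytes, language="hi-IN"))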
@app.post("/asr")
async def transcribe_audio(audio: UploadFile = File(...)):
    contents = await audio.read()
    # Determine the input audio format from the file extension.
    input_format = audio.filename.split('.')[-1].lower()
    # Convert the upload to WAV, then transcribe it.
    wav_data = convert_audio_format(contents, input_format)
    result = recognize_speech(wav_data)
    return JSONResponse(content={"text": result})
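# Example request (hypothetical file name):
#   curl -X POST -F "audio=@sample.mp3" http://127.0.0.1:8000/asr
# -> {"text": "<transcript>"}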
def get_sampling_rate(audio_file_path):
    # Read the sample rate (frames per second) via pydub.
    audio = AudioSegment.from_file(audio_file_path)
    return audio.frame_rate
@app.post("/asr/live")
async def transcribe_live_audio(audio: UploadFile = File(...)):
    if not audio:
        return JSONResponse(content={"success": False}, status_code=400)
    # Optionally enforce WAV uploads:
    # if audio.content_type != "audio/wav":
    #     return JSONResponse(content={"success": False, "message": "Audio must be in WAV format."}, status_code=400)
    temp_file_path = None
    output_path = None
    try:
        # Save the received audio to a temporary file.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_file_path = temp_file.name
            shutil.copyfileobj(audio.file, temp_file)
        # Resample the audio to 16 kHz if needed.
        sampling_rate = get_sampling_rate(temp_file_path)
        if sampling_rate != 16000:
            # mkstemp is used instead of the deprecated mktemp; ffmpeg
            # overwrites the placeholder file thanks to the -y flag.
            fd, output_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            resample_audio(temp_file_path, output_path, target_sample_rate=16000)
            result = recognize_speech_live(output_path)
        else:
            result = recognize_speech_live(temp_file_path)
    except Exception:
        return JSONResponse(content={"success": False, "message": "Error processing audio."}, status_code=500)
    finally:
        # Cleanup: remove the temporary audio files.
        for path in (temp_file_path, output_path):
            if path and os.path.exists(path):
                os.remove(path)
    return JSONResponse(content={"text": result})
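# Example request (hypothetical file name; the upload should be a WAV file):
#   curl -X POST -F "audio=@recording.wav" http://127.0.0.1:8000/asr/live
# -> {"text": "<transcript>"}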
# Alternative endpoint: record live audio from the server's microphone
# instead of accepting an upload. Left disabled; note that recognize_speech
# expects WAV-container bytes, so the raw PCM produced here would still
# need a WAV header before recognition.
# @app.post("/asr/live")
# async def transcribe_live_audio():
#     fs = 16000  # Target sample rate
#     duration = 3  # seconds
#     chunks = int(fs * duration)
#     # Record live audio
#     audio_data = sd.rec(chunks, samplerate=fs, channels=1, dtype=np.float32)
#     sd.wait()
#     # Resample the audio data to the target sample rate (a no-op when fs is already 16000)
#     audio_data_resampled = librosa.resample(audio_data.flatten(), orig_sr=fs, target_sr=16000)
#     # Convert audio data to bytes (use np.int16)
#     audio_bytes = audio_data_resampled.astype(np.int16).tobytes()
#     # Transcribe the audio
#     result = recognize_speech(audio_bytes)
#     return {"Text": result}
# Run the FastAPI app:
# if __name__ == "__main__":
#     uvicorn.run(app, host="127.0.0.1", port=8000)
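# Alternatively, start it from a shell (assuming this module is saved as app.py):
#   uvicorn app:app --host 127.0.0.1 --port 8000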