SoundScripter / soundscripter_fastAPI.py
Aditi Tewari
Update Code
f5dfcf5
import io
import os
import uvicorn
import sounddevice as sd
import numpy as np
import speech_recognition as sr
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.responses import HTMLResponse
from pydub import AudioSegment
import librosa
import tempfile
import shutil
import subprocess
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
recognizer = sr.Recognizer()
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/", response_class=HTMLResponse)
async def read_root():
# Provide the path to the HTML file containing the front-end code
with open("soundscripter.html", "r") as file:
html_content = file.read()
# return HTMLResponse(content=html_content)
return html_content
def resample_audio(input_path, output_path, target_sample_rate):
ffmpeg_cmd = [
"ffmpeg",
"-i", input_path,
"-ar", str(target_sample_rate),
output_path
]
subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def recognize_speech_live(wav_file,lid="hi-IN"):
AUDIO_FILE=wav_file
file=open(wav_file+".txt","w")
# print(file)
text="Cannot read recorded file."
try:
with sr.AudioFile(AUDIO_FILE) as source:
audio = recognizer.record(source)
# print("before")
text = recognizer.recognize_google(audio, language=lid)
#file.write(aud_name +"\t"+text)
return text
except sr.UnknownValueError:
return "Speech not recognized."
except sr.RequestError as e:
return f"API request failed: {e}"
def convert_audio_format(input_data, input_format, output_format='wav'):
# Convert audio data to WAV format
audio = AudioSegment.from_file(io.BytesIO(input_data), format=input_format)
output_data = audio.export(format=output_format).read()
return output_data
def recognize_speech(audio_data, language="hi-IN"):
with io.BytesIO(audio_data) as audio_io:
with sr.AudioFile(audio_io) as source:
audio = recognizer.record(source)
try:
text = recognizer.recognize_google(audio, language=language)
return text
except sr.UnknownValueError:
return "Speech not recognized."
except sr.RequestError as e:
return f"API request failed: {e}"
@app.post("/asr")
async def transcribe_audio(audio: UploadFile = File(...)):
contents = await audio.read()
# Determine the input audio format (assumes the format is part of the file name)
input_format = audio.filename.split('.')[-1].lower()
# Convert audio to WAV format
wav_data = convert_audio_format(contents, input_format)
# Saving the received audio file in WAV format for future analysis (optional)
# wav_file_path = "received_audio.wav"
# with open(wav_file_path, "wb") as f:
# f.write(wav_data)
# Transcribe the audio
result = recognize_speech(wav_data)
# print(result)
# print(JSONResponse(content={"text": result}))
# return {"Text": result}
return JSONResponse(content={"text": result})
def get_sampling_rate(audio_file_path):
audio = AudioSegment.from_file(audio_file_path)
return audio.frame_rate
@app.post("/asr/live")
async def transcribe_live_audio(audio: UploadFile = File(...)):
if not audio:
return JSONResponse(content={"success": False}, status_code=400)
# # Check if the uploaded file is in WAV format
# if audio.content_type != "audio/wav":
# return JSONResponse(content={"success": False, "message": "Audio must be in WAV format."}, status_code=400)
# print("innn")
try:
# Save the received audio to a temporary file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
temp_file_path = temp_file.name
shutil.copyfileobj(audio.file, temp_file)
# Print the file path for debugging
# print(temp_file_path)
# Get the sampling rate of the received audio
sampling_rate = get_sampling_rate(temp_file_path)
# print(sampling_rate)
# Resample the audio to 16 kHz if needed
if sampling_rate != 16000:
output_path = tempfile.mktemp(suffix=".wav")
# print(output_path)
resample_audio(temp_file_path, output_path, target_sample_rate=16000)
result = recognize_speech_live(output_path)
# print(result)
else:
result = recognize_speech_live(temp_file_path)
# print(result)
except Exception as e:
# print("Error processing audio:", e)
return JSONResponse(content={"success": False, "message": "Error processing audio."}, status_code=500)
finally:
# Cleanup: remove the temporary received audio file
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
return JSONResponse(content={"text": result})
# @app.post("/asr/live")
# async def transcribe_live_audio():
# fs = 16000 # Target sample rate
# duration = 3 # seconds
# chunks = int(fs * duration)
# # Record live audio
# audio_data = sd.rec(chunks, samplerate=fs, channels=1, dtype=np.float32)
# sd.wait()
# # Resample the audio data to the target sample rate
# audio_data_resampled = librosa.resample(audio_data.flatten(), orig_sr=fs, target_sr=16000)
# # Convert audio data to bytes (use np.int16)
# audio_bytes = audio_data_resampled.astype(np.int16).tobytes()
# # Transcribe the audio
# result = recognize_speech(audio_bytes)
# return {"Text": result}
#Run the FastAPI app
# if __name__ == "__main__":
# uvicorn.run(app, host="127.0.0.1", port=8000)