Spaces:
Runtime error
Runtime error
import os | |
import spacy | |
import speech_recognition as sr | |
from loguru import logger | |
from pydub import AudioSegment | |
# Load spaCy's English language model for grammar and text analysis | |
def get_doc(text):
    """Parse ``text`` with spaCy's ``en_core_web_sm`` pipeline.

    Args:
        text: The string to analyse.

    Returns:
        The object produced by calling the loaded pipeline on ``text``, or
        ``None`` if an ``OSError`` is raised (e.g. the model is not
        installed), in which case the error and an install hint are logged.
    """
    # NOTE: the model is loaded on every call; callers that invoke this in a
    # loop pay the load cost each time.
    try:
        nlp = spacy.load("en_core_web_sm")
        return nlp(text)
    except OSError as ex:
        # loguru interpolates positional arguments through "{}" placeholders;
        # without one the error text is silently dropped from the message.
        logger.exception("Error while getting Spacy doc: {}", ex)
        logger.info("Please download the model with this command: python -m spacy download en_core_web_sm")
        return None
# Convert MP3 to WAV using pydub | |
def convert_mp3_to_wav(mp3_file, wav_file="converted_comprehension_audio.wav"):
    """Re-encode an MP3 file as WAV via pydub.

    Args:
        mp3_file: Location of the MP3 input handed to ``AudioSegment.from_mp3``.
        wav_file: Destination for the exported WAV data.

    Returns:
        ``wav_file``, unchanged, so the call can be chained.
    """
    AudioSegment.from_mp3(mp3_file).export(wav_file, format="wav")
    return wav_file
# Convert WAV to text using SpeechRecognition | |
def transcribe_audio(wav_file):
    """Transcribe a WAV file to text with the SpeechRecognition package.

    The whole file is read into memory with ``Recognizer.record`` and then
    passed to ``Recognizer.recognize_google``.

    Args:
        wav_file: The WAV input opened through ``sr.AudioFile``.

    Returns:
        Whatever ``recognize_google`` returns for the recorded audio.
    """
    engine = sr.Recognizer()
    with sr.AudioFile(wav_file) as wav_source:
        recording = engine.record(wav_source)
    return engine.recognize_google(recording)
def split_audio(audio_path, chunk_length_ms=30000):
    """Cut a WAV file into consecutive pieces and export each one.

    Args:
        audio_path: The WAV input loaded with ``AudioSegment.from_wav``; it is
            also used as the prefix of every output file name.
        chunk_length_ms: Length of each piece in milliseconds. The final piece
            may be shorter.

    Returns:
        The output paths in order, each named
        ``<audio_path>_chunk_<index>.wav``.
    """
    recording = AudioSegment.from_wav(audio_path)
    written = []
    offsets = range(0, len(recording), chunk_length_ms)
    for number, start in enumerate(offsets):
        target = f"{audio_path}_chunk_{number}.wav"
        piece = recording[start : start + chunk_length_ms]
        piece.export(target, format="wav")
        written.append(target)
    return written
def aggregate_scores(chunk_scores):
    """Aggregate scores across all chunks to produce a final score.

    The four numeric scores are averaged over the chunks and rounded to two
    decimals; the mispronounced-word lists are concatenated and the display
    texts are joined in chunk order with no separator.

    Args:
        chunk_scores: A sized collection of dicts, one per chunk. Any of the
            keys ``accuracy_score``, ``fluency_score``, ``completeness_score``,
            ``pronunciation_score``, ``mispronunced_words`` and
            ``display_text`` may be missing or ``None``; they then count as
            ``0``, ``[]`` and ``""`` respectively.

    Returns:
        A dict with the same six keys. When ``chunk_scores`` is empty, all
        numeric scores are ``0``, the word list is empty and the text is
        ``""``.
    """
    numeric_keys = (
        "accuracy_score",
        "fluency_score",
        "completeness_score",
        "pronunciation_score",
    )
    final_score = {key: 0 for key in numeric_keys}
    final_score["mispronunced_words"] = []
    final_score["display_text"] = ""

    num_chunks = len(chunk_scores)
    if num_chunks == 0:
        # Nothing to average; returning zeros avoids a ZeroDivisionError.
        return final_score

    text_parts = []
    for score in chunk_scores:
        for key in numeric_keys:
            final_score[key] += score.get(key) or 0
        # ``or`` also covers keys that are present but explicitly None.
        final_score["mispronunced_words"].extend(score.get("mispronunced_words") or [])
        text_parts.append(score.get("display_text") or "")
    final_score["display_text"] = "".join(text_parts)

    # Average by key name rather than by exact type, so numeric subclasses
    # are averaged as well.
    for key in numeric_keys:
        final_score[key] = round(final_score[key] / num_chunks, 2)
    return final_score
def remove_files(file_list):
    """Delete every path in ``file_list``, logging failures without raising.

    Each path is removed with ``os.remove`` rather than through a shell
    ``rm`` command, so paths containing spaces or shell metacharacters are
    handled literally and removal errors are actually detected.

    NOTE(review): shell glob patterns (e.g. ``*.wav``) are no longer expanded;
    confirm that callers only pass concrete file paths.

    Args:
        file_list: Iterable of file paths to delete.
    """
    all_removed = True
    for path in file_list:
        try:
            os.remove(path)
        except OSError as ex:
            # Best-effort cleanup: record the failure and keep going so one
            # bad path does not leave the remaining files behind.
            all_removed = False
            logger.exception("Error removing files: {}", ex)
    if all_removed:
        logger.info("All listed files removed.")