accent-detector / src /streamlit_app.py
amirjamali's picture
Fix indentation in expert analysis display section of Streamlit app for improved layout
5235a31 unverified
import streamlit as st
import os
import yt_dlp
import subprocess
import librosa
import numpy as np
import torch
import sys
# Global flag for SpeechBrain availability
HAS_SPEECHBRAIN = False
# Handle SpeechBrain import with fallbacks for different versions
try:
# Try the new path first (SpeechBrain 1.0+)
from speechbrain.inference.classifiers import EncoderClassifier
HAS_SPEECHBRAIN = True
except ImportError:
try:
# Try the legacy path
from speechbrain.pretrained.interfaces import EncoderClassifier
HAS_SPEECHBRAIN = True
except ImportError:
try:
# Try the very old path
from speechbrain.pretrained import EncoderClassifier
HAS_SPEECHBRAIN = True
except ImportError:
# If all fail, we'll handle this later in the code
st.error("⚠️ Unable to import SpeechBrain. Limited functionality available.")
EncoderClassifier = None
# Handle potential compatibility issues with transformers
try:
from transformers import AutoProcessor, AutoModelForAudioClassification
HAS_AUTO_PROCESSOR = True
except ImportError:
from transformers import AutoModelForAudioClassification
HAS_AUTO_PROCESSOR = False
st.warning("Using a compatible but limited version of transformers. Some features may be limited.")
from dotenv import load_dotenv
import matplotlib.pyplot as plt
import tempfile
import time
# Deployment instructions:
# To deploy this app:
# 1. Make sure Docker is installed
# 2. Build the Docker image: docker build -t accent-detector .
# 3. Run the container: docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector
# For Windows: docker run -p 8501:8501 --volume C:\temp\accent-detector:/app/uploads accent-detector
# 4. Access the app at http://localhost:8501
#
# For cloud deployment:
# - Streamlit Cloud: Connect your GitHub repository to Streamlit Cloud
# - Hugging Face Spaces: Use the Docker deployment option with proper volume mounts
# - Azure/AWS/GCP: Deploy the container using their container services with persistent storage
#
# Troubleshooting file uploads:
# - Set maxUploadSize in .streamlit/config.toml
# - Ensure write permissions on upload directories
# - For 403 errors, check file size and format compatibility
# Load environment variables (if .env file exists)
try:
load_dotenv()
except:
pass
# Check for OpenAI API access - optional for enhanced explanations
try:
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
have_openai = openai.api_key is not None
except (ImportError, AttributeError):
have_openai = False
# English accent categories
ENGLISH_ACCENTS = {
"en-us": "American English",
"en-gb": "British English",
"en-au": "Australian English",
"en-ca": "Canadian English",
"en-ie": "Irish English",
"en-scotland": "Scottish English",
"en-in": "Indian English",
"en-za": "South African English",
"en-ng": "Nigerian English",
"en-caribbean": "Caribbean English",
}
def download_video(url, video_path="video.mp4", cookies_file=None):
"""Download a video from a URL"""
# Determine if this is a YouTube URL
is_youtube = "youtube" in url.lower() or "youtu.be" in url.lower()
# Create a unique directory for each download to avoid permission issues
timestamp = str(int(time.time()))
# Use proper temp directory for Windows or Linux
if os.name == 'nt': # Windows
temp_dir = os.path.join(os.environ.get('TEMP', 'C:\\temp'), f"video_download_{timestamp}")
else: # Linux/Mac
temp_dir = f"/tmp/video_download_{timestamp}"
os.makedirs(temp_dir, exist_ok=True)
# Set correct permissions for the temp directory
try:
os.chmod(temp_dir, 0o777) # Full permissions for all users
except Exception as e:
st.warning(f"Could not set directory permissions: {str(e)}. Continuing anyway.")
# Use the temp directory for the video path
if not os.path.isabs(video_path):
video_path = os.path.join(temp_dir, video_path)
ydl_opts = {
"outtmpl": video_path,
"quiet": False,
"verbose": True, # More detailed output for debugging
"format": "bestaudio/best", # Prefer audio formats since we only need audio
"postprocessors": [{
"key": "FFmpegExtractAudio",
"preferredcodec": "wav",
}] if is_youtube else [], # Extract audio directly for YouTube
"noplaylist": True,
"extractor_retries": 5, # Increased from 3 to 5
"socket_timeout": 45, # Increased from 30 to 45
"retry_sleep_functions": {
"http": lambda n: 5 * (n + 1), # 5, 10, 15, 20, 25 seconds
},
"nocheckcertificate": True, # Skip HTTPS certificate validation
"ignoreerrors": False, # Don't ignore errors (we want to handle them)
}
# Add cookies if provided
if cookies_file and os.path.exists(cookies_file):
ydl_opts["cookiefile"] = cookies_file
st.info("Using provided cookies file for authentication")
# Set permissions on cookies file to make sure it's readable
try:
os.chmod(cookies_file, 0o644) # Read-write for owner, read-only for others
except Exception as e:
st.warning(f"Could not set permissions on cookies file: {str(e)}. Continuing anyway.")
# Setup environment variables for cache directories
os.environ['HOME'] = temp_dir # Set HOME to our temp dir for YouTube-DL cache
os.environ['XDG_CACHE_HOME'] = os.path.join(temp_dir, '.cache') # For Linux
os.environ['APPDATA'] = temp_dir # For Windows
try:
if is_youtube:
st.info("Attempting to download from YouTube. This might take longer...")
# List of alternative YouTube frontends to try
youtube_alternatives = [
(url, "Standard YouTube"),
(url.replace("youtube.com", "yewtu.be"), "Invidious (yewtu.be)"),
(url.replace("youtube.com", "piped.video"), "Piped"),
(url.replace("youtube.com", "inv.riverside.rocks"), "Invidious (riverside)")
]
# If youtu.be is used, create proper alternatives
if "youtu.be" in url.lower():
video_id = url.split("/")[-1].split("?")[0]
youtube_alternatives = [
(url, "Standard YouTube"),
(f"https://yewtu.be/watch?v={video_id}", "Invidious (yewtu.be)"),
(f"https://piped.video/watch?v={video_id}", "Piped"),
(f"https://inv.riverside.rocks/watch?v={video_id}", "Invidious (riverside)")
]
success = False
for alt_url, alt_name in youtube_alternatives:
if alt_url == url and alt_name != "Standard YouTube":
continue # Skip redundant first entry
st.info(f"Trying {alt_name}... Please wait.")
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([alt_url])
# If we get here without exception, it worked
st.success(f"Successfully downloaded using {alt_name}")
success = True
break
except Exception as download_error:
error_msg = str(download_error)
st.warning(f"{alt_name} download attempt failed: {error_msg}")
# Break early if it's a permission issue to avoid trying alternatives
if "permission" in error_msg.lower() or "access" in error_msg.lower():
st.error("Permission error detected. Stopping download attempts.")
raise download_error
# If all attempts failed
if not success:
st.error("All YouTube download methods failed.")
return False
else:
# For non-YouTube URLs
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# Check if download was successful
if os.path.exists(video_path):
return True
else:
# Look for any downloaded files in the temp directory - more comprehensive search
downloaded_files = []
for root, _, files in os.walk(temp_dir):
for file in files:
if file.endswith(('.mp4', '.mp3', '.wav', '.m4a')):
downloaded_files.append(os.path.join(root, file))
if downloaded_files:
# Use the first media file found
first_file = downloaded_files[0]
try:
# Copy instead of move to avoid cross-device link issues
import shutil
shutil.copy(first_file, video_path)
return True
except Exception as copy_error:
st.error(f"Error copying downloaded file: {str(copy_error)}")
return False
st.error(f"Video downloaded but file not found: {video_path}")
return False
except Exception as e:
error_msg = str(e)
st.error(f"Download error: {error_msg}")
# Provide specific guidance based on error type
if is_youtube and ("bot" in error_msg.lower() or "sign in" in error_msg.lower() or "403" in error_msg):
st.warning("⚠️ YouTube requires authentication. Please try one of these solutions:")
st.markdown("""
1. **Upload a cookies.txt file** using the file uploader above
2. **Try a different video source** like Loom, Vimeo or direct MP3/WAV files
3. **Use the Audio Upload tab** instead of YouTube URLs
""")
elif "not find" in error_msg.lower() and "cookies" in error_msg.lower():
st.warning("Browser cookies could not be accessed. Please upload a cookies.txt file.")
elif "network" in error_msg.lower() or "timeout" in error_msg.lower():
st.warning("Network error. Please check your internet connection and try again.")
elif "permission" in error_msg.lower():
st.warning("Permission error. The application doesn't have access to create or write files in the temporary directory.")
st.info("Try running the Docker container with the proper volume mounts: `docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector`")
elif "not found" in error_msg.lower() and "ffmpeg" in error_msg.lower():
st.error("FFmpeg is not installed or not found in PATH.")
st.info("If running locally, please install FFmpeg. If using Docker, the container may be misconfigured.")
return False
finally:
# Clean up temp directory if it still exists
try:
if os.path.exists(temp_dir) and ("tmp" in temp_dir or "temp" in temp_dir.lower()):
import shutil
shutil.rmtree(temp_dir)
except Exception as cleanup_error:
st.warning(f"Could not clean up temporary directory: {str(cleanup_error)}")
pass
def extract_audio(video_path="video.mp4", audio_path="audio.wav"):
"""Extract audio from video file using ffmpeg"""
try:
subprocess.run(
['ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path],
check=True,
capture_output=True
)
return os.path.exists(audio_path)
except subprocess.CalledProcessError as e:
st.error(f"Error extracting audio: {e}")
st.error(f"ffmpeg output: {e.stderr.decode('utf-8')}")
raise
class AccentDetector:
def __init__(self):
# Initialize language identification model
self.have_lang_id = False
try:
if EncoderClassifier is not None:
self.lang_id = EncoderClassifier.from_hparams(
source="speechbrain/lang-id-commonlanguage_ecapa",
savedir="tmp_model"
)
self.have_lang_id = True
else:
st.error("SpeechBrain not available. Language identification disabled.")
except Exception as e:
st.error(f"Error loading language ID model: {str(e)}")
# Initialize the accent classifier
self.have_accent_model = False
try:
self.model_name = "speechbrain/lang-id-voxlingua107-ecapa"
# Handle case where AutoProcessor is not available
if HAS_AUTO_PROCESSOR:
self.processor = AutoProcessor.from_pretrained(self.model_name)
else:
# Fall back to using feature_extractor
from transformers import AutoFeatureExtractor
self.processor = AutoFeatureExtractor.from_pretrained(self.model_name)
self.model = AutoModelForAudioClassification.from_pretrained(self.model_name)
self.have_accent_model = True
except Exception as e:
st.warning(f"Could not load accent model: {str(e)}")
self.have_accent_model = False
def is_english(self, audio_path, threshold=0.7):
"""
Determine if the speech is English and return confidence score
"""
if not hasattr(self, 'have_lang_id') or not self.have_lang_id:
# If language ID model is not available, assume English
st.warning("Language identification is not available. Assuming English speech.")
return True, "en", 1.0
try:
out_prob, score, index, lang = self.lang_id.classify_file(audio_path)
score = float(score)
# Check if language is English (slightly fuzzy match)
is_english = "eng" in lang.lower() or "en-" in lang.lower() or lang.lower() == "en"
return is_english, lang, score
except Exception as e:
st.warning(f"Error identifying language: {str(e)}. Assuming English speech.")
return True, "en", 0.5
def classify_accent(self, audio_path):
"""
Classify the specific English accent
"""
if not self.have_accent_model:
return "Unknown English Accent", 0.0
try:
# Load and preprocess audio
audio, sr = librosa.load(audio_path, sr=16000)
inputs = self.processor(audio, sampling_rate=sr, return_tensors="pt")
# Get predictions
with torch.no_grad():
outputs = self.model(**inputs)
# Get probabilities
probs = outputs.logits.softmax(dim=-1)[0]
prediction_id = probs.argmax().item()
confidence = probs[prediction_id].item()
# Get predicted label
id2label = self.model.config.id2label
accent_code = id2label[prediction_id]
# Map to English accent if possible
if accent_code.startswith('en-'):
accent = ENGLISH_ACCENTS.get(accent_code, f"English ({accent_code})")
confidence = confidence # Keep confidence as-is for English accents
else:
# If it's not an English accent code, use our pre-classification
is_english, _, _ = self.is_english(audio_path)
if is_english:
accent = "General English"
else:
accent = f"Non-English ({accent_code})"
confidence *= 0.7 # Reduce confidence for non-specific matches
return accent, confidence
except Exception as e:
st.error(f"Error in accent classification: {str(e)}")
return "Unknown English Accent", 0.0
def generate_explanation(self, audio_path, accent, confidence, is_english, language):
"""
Generate an explanation of the accent detection results using OpenAI API (if available)
"""
if not have_openai:
if is_english:
return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English."
else:
return f"The speech was identified as {language}, not English. English confidence is low."
try:
import openai
is_english, lang, lang_score = self.is_english(audio_path)
prompt = f"""
Audio analysis detected a speaker with the following characteristics:
- Primary accent/language: {accent}
- Confidence score: {confidence*100:.1f}%
- Detected language category: {lang}
- Is English: {is_english}
Based on this information, provide a 2-3 sentence summary about the speaker's accent.
Focus on how clear their English is and any notable accent characteristics.
This is for hiring purposes to evaluate English speaking abilities.
"""
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an accent analysis specialist providing factual assessments."},
{"role": "user", "content": prompt}
],
max_tokens=150
)
return response.choices[0].message.content.strip()
except Exception as e:
st.error(f"Error generating explanation: {str(e)}")
if is_english:
return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English."
else:
return f"The speech was identified as {language}, not English. English confidence is low."
def analyze_audio(self, audio_path):
"""
Complete analysis pipeline returning all needed results
"""
# Check if it's English
is_english, lang, lang_score = self.is_english(audio_path)
# Classify accent if it's English
if is_english:
accent, accent_confidence = self.classify_accent(audio_path)
english_confidence = lang_score * 100 # Scale to percentage
else:
accent = f"Non-English ({lang})"
accent_confidence = lang_score
english_confidence = max(0, min(30, lang_score * 50)) # Cap at 30% if non-English
# Generate explanation
explanation = self.generate_explanation(audio_path, accent, accent_confidence, is_english, lang)
# Create visualization of the audio waveform
try:
y, sr = librosa.load(audio_path, sr=None)
fig, ax = plt.subplots(figsize=(10, 2))
ax.plot(y)
ax.set_xlabel('Sample')
ax.set_ylabel('Amplitude')
ax.set_title('Audio Waveform')
plt.tight_layout()
audio_viz = fig
# Make sure the figure can be saved
try:
# Test if the figure can be saved
import tempfile
with tempfile.NamedTemporaryFile(suffix='.png') as tmp:
plt.savefig(tmp.name)
except Exception as viz_save_error:
st.warning(f"Could not save visualization: {str(viz_save_error)}. Using simpler visualization.")
# Create a simple alternative visualization
import numpy as np
# Downsample for performance
sample_rate = max(1, len(y) // 1000)
y_downsampled = y[::sample_rate]
fig2, ax2 = plt.subplots(figsize=(8, 2))
ax2.plot(np.arange(len(y_downsampled)), y_downsampled)
ax2.set_title("Audio Waveform (simplified)")
audio_viz = fig2
except Exception as e:
st.warning(f"Could not generate audio visualization: {str(e)}")
audio_viz = None
return {
"is_english": is_english,
"accent": accent,
"accent_confidence": accent_confidence * 100, # Scale to percentage
"english_confidence": english_confidence,
"language_detected": lang,
"explanation": explanation,
"audio_viz": audio_viz
}
def process_uploaded_audio(file_input):
"""Process uploaded audio file
Args:
file_input: Either a StreamlitUploadedFile object or a string path to a file
"""
audio_path = None
temp_input_path = None
try:
# Create a unique filename based on timestamp
timestamp = str(int(time.time()))
# Create a deterministic uploads directory with full permissions
uploads_dir = os.path.join(os.getcwd(), "uploads")
os.makedirs(uploads_dir, exist_ok=True)
# Try Streamlit's own upload path first if available
streamlit_uploads_path = os.environ.get('STREAMLIT_UPLOADS_PATH')
if streamlit_uploads_path and os.path.isdir(streamlit_uploads_path):
uploads_dir = streamlit_uploads_path
st.info(f"Using Streamlit's upload directory: {uploads_dir}")
# Make sure uploads directory has proper permissions
try:
os.chmod(uploads_dir, 0o777) # Full permissions
except Exception as chmod_error:
st.warning(f"Could not set permissions on uploads directory: {str(chmod_error)}. Continuing anyway.")
# Log upload dir info for debugging
st.info(f"Upload directory: {uploads_dir} (exists: {os.path.exists(uploads_dir)}, writable: {os.access(uploads_dir, os.W_OK)})")
# Handle different input types
if isinstance(file_input, str):
# If it's already a file path
temp_input_path = file_input
file_extension = os.path.splitext(temp_input_path)[1].lower()
st.info(f"Processing from saved file: {os.path.basename(temp_input_path)}")
else:
# If it's a StreamlitUploadedFile
file_extension = os.path.splitext(file_input.name)[1].lower()
# Write the uploaded file to disk with proper extension in the uploads directory
# Use a unique filename to avoid conflicts
safe_filename = ''.join(c if c.isalnum() or c in '._- ' else '_' for c in file_input.name)
temp_input_path = os.path.join(uploads_dir, f"uploaded_{timestamp}_{safe_filename}")
st.info(f"Saving uploaded file to: {temp_input_path}")
try:
# Write in chunks to handle large files better
chunk_size = 1024 * 1024 # 1MB chunks
buffer = file_input.getbuffer()
with open(temp_input_path, "wb") as f:
for i in range(0, len(buffer), chunk_size):
f.write(buffer[i:i+chunk_size])
# Verify file was written properly
if os.path.exists(temp_input_path):
file_size = os.path.getsize(temp_input_path)
st.success(f"File saved successfully: {file_size} bytes")
else:
st.error(f"Failed to save file - file doesn't exist after writing")
except Exception as write_error:
st.error(f"Error writing uploaded file: {str(write_error)}")
# Try alternative temp directory as fallback
try:
import tempfile
temp_dir = tempfile.gettempdir()
temp_input_path = os.path.join(temp_dir, f"uploaded_{timestamp}_{safe_filename}")
st.warning(f"Trying alternative location: {temp_input_path}")
with open(temp_input_path, "wb") as f:
f.write(file_input.getbuffer())
except Exception as alt_write_error:
st.error(f"Alternative write also failed: {str(alt_write_error)}")
raise
# For MP4 files, extract the audio using ffmpeg
if file_extension == ".mp4":
st.info("Extracting audio from video file...")
audio_path = os.path.join(uploads_dir, f"extracted_audio_{timestamp}.wav")
try:
# Add -y flag to overwrite output file if it exists
subprocess.run(
['ffmpeg', '-y', '-i', temp_input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path],
check=True,
capture_output=True
)
st.success(f"Audio extracted successfully to {audio_path}")
# Remove the original video file if extraction was successful
if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
os.remove(temp_input_path)
except subprocess.CalledProcessError as e:
st.error(f"Error extracting audio: {e}")
if e.stderr:
st.error(f"FFmpeg output: {e.stderr.decode('utf-8')}")
raise
else:
# For audio files, process based on format
if file_extension in [".mp3", ".m4a", ".ogg", ".flac"]:
# Convert to WAV for better compatibility
audio_path = os.path.join(uploads_dir, f"converted_audio_{timestamp}.wav")
st.info(f"Converting {file_extension} to WAV format for analysis...")
try:
# Use a verbose ffmpeg command with more options for compatibility
process = subprocess.run(
[
'ffmpeg', '-y', '-i', temp_input_path,
'-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le',
# Add error handling flags
'-err_detect', 'ignore_err',
# Add buffers for better handling
'-analyzeduration', '10000000', '-probesize', '10000000',
audio_path
],
check=True,
capture_output=True
)
# Verify the file was created successfully
if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0:
st.success(f"Audio converted successfully: {os.path.getsize(audio_path)} bytes")
# If conversion was successful, remove the original file to save space
os.remove(temp_input_path)
else:
st.warning("Conversion produced an empty file. Trying fallback conversion method...")
# Try alternative conversion method - simpler command
fallback_cmd = ['ffmpeg', '-y', '-i', temp_input_path, audio_path]
subprocess.run(fallback_cmd, check=True, capture_output=True)
if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0:
st.warning("Fallback conversion also failed. Using original file.")
audio_path = temp_input_path
except subprocess.CalledProcessError as e:
st.warning(f"Conversion warning: {e}")
if e.stderr:
st.warning(f"FFmpeg error: {e.stderr.decode('utf-8')}")
st.info("Using original file instead.")
audio_path = temp_input_path
else:
# For already WAV files, use them directly
audio_path = temp_input_path
st.info(f"Using WAV file directly: {audio_path}")
detector = AccentDetector()
results = detector.analyze_audio(audio_path)
# Clean up
if audio_path and audio_path != temp_input_path and os.path.exists(audio_path):
os.remove(audio_path)
return results
except Exception as e:
error_msg = str(e)
st.error(f"Error processing audio: {error_msg}")
# Add detailed debugging info
import traceback
st.error(f"Error details: {traceback.format_exc()}")
# Show file info if available
if temp_input_path and os.path.exists(temp_input_path):
st.info(f"Input file exists: {temp_input_path}, size: {os.path.getsize(temp_input_path)} bytes")
os.remove(temp_input_path)
else:
if temp_input_path:
st.warning(f"Input file does not exist: {temp_input_path}")
if audio_path and os.path.exists(audio_path):
st.info(f"Audio file exists: {audio_path}, size: {os.path.getsize(audio_path)} bytes")
os.remove(audio_path)
else:
if audio_path:
st.warning(f"Audio file does not exist: {audio_path}")
# Check for common error types
if "ffmpeg" in error_msg.lower():
st.warning("FFmpeg error detected. The audio conversion failed.")
st.info("Try a different audio format or check if FFmpeg is installed correctly.")
elif "permission" in error_msg.lower():
st.warning("Permission error detected.")
st.info("Check that the uploads directory is writable.")
elif "no such file" in error_msg.lower():
st.warning("File not found error detected.")
st.info("The file may have been moved, deleted, or not saved correctly.")
raise
return results
# --- Streamlit App ---
st.set_page_config(
page_title="🎤 English Accent Detector",
page_icon="🎤",
layout="wide"
)
st.title("🎤 English Accent Detection Tool")
st.markdown("""
This application analyzes a speaker's English accent from video URLs or audio uploads,
providing detailed insights for hiring evaluation purposes.
""")
# Add container for tips
with st.container():
st.info("""
💡 **Tips for best results:**
- Use **Loom** or **Vimeo** videos (more reliable than YouTube)
- For YouTube videos, you may need to provide cookies
- Audio clips of 15-30 seconds work best
- Clear speech with minimal background noise is ideal
""")
st.markdown("""
This app analyzes a speaker's English accent from a video or audio source.
It provides:
- Classification of the accent (British, American, etc.)
- Confidence score for English proficiency
- Explanation of accent characteristics
""")
# Create tabs for different input methods
tab1, tab2 = st.tabs(["Video URL", "Upload Audio"])
with tab1:
st.markdown("### 🎬 Analyze video from URL")
url = st.text_input("Enter a public video URL",
placeholder="https://www.loom.com/..., https://vimeo.com/..., or direct MP4 link")
# Add alternative invidious frontend option for YouTube
use_alternative = st.checkbox("Try alternative YouTube source (for authentication issues)",
value=True,
help="Uses an alternative frontend (Invidious) that may bypass YouTube restrictions")
# Recommend alternative sources
st.caption("⚠️ **Note**: YouTube videos often require authentication. For best results, use Loom, Vimeo or direct video links.")
# Add file uploader for cookies.txt
cookies_file = None
uploaded_cookies = st.file_uploader("Upload cookies.txt file for YouTube (if needed)",
type="txt",
help="Only needed for YouTube videos that require authentication")
if uploaded_cookies is not None:
# Save the uploaded cookies file to a temporary file
cookies_file = f"cookies_{int(time.time())}.txt"
with open(cookies_file, "wb") as f:
f.write(uploaded_cookies.getbuffer())
st.success("Cookies file uploaded successfully!")
with st.expander("Having trouble with YouTube videos?"):
st.markdown("""
### YouTube Authentication Issues
YouTube's anti-bot measures often block automated video downloads. To solve this:
#### Option 1: Use Alternative Video Sources (Recommended)
These typically work without authentication issues:
- [Loom](https://www.loom.com/) - Great for screen recordings
- [Vimeo](https://vimeo.com/) - High-quality video hosting
- [Streamable](https://streamable.com/) - Simple video sharing
- Any direct MP4 link
#### Option 2: Upload Cookies for YouTube
1. Install a browser extension like [Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc)
2. Login to YouTube in your browser
3. Use the extension to export cookies to a .txt file
4. Upload the cookies.txt file using the uploader above
#### Option 3: Use Audio Upload Instead
The 'Upload Audio' tab allows direct analysis of audio files without URL issues.
""")
if st.button("Analyze Video"):
if not url:
st.warning("Please enter a valid URL")
else:
try:
# Create a placeholder for status updates
status = st.empty()
# Generate unique filenames using timestamp to avoid conflicts
timestamp = str(int(time.time()))
video_path = f"video_{timestamp}.mp4"
audio_path = f"audio_{timestamp}.wav"
# Download and process the video
status.text("Downloading video...")
download_success = download_video(url, video_path, cookies_file)
if not download_success:
st.error("Failed to download video")
else:
status.text("Extracting audio...")
extract_success = extract_audio(video_path, audio_path)
if not extract_success:
st.error("Failed to extract audio")
else:
status.text("Analyzing accent... (this may take a moment)")
detector = AccentDetector()
results = detector.analyze_audio(audio_path)
# Display results
st.success("✅ Analysis Complete!")
# Create columns for results
col1, col2 = st.columns([2, 1])
with col1:
st.subheader("Accent Analysis Results")
st.markdown(f"**Detected Accent:** {results['accent']}")
st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%")
st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%")
# Show explanation in a box
st.markdown("### Expert Analysis")
st.info(results['explanation'])
with col2:
if results['audio_viz']:
try:
st.pyplot(results['audio_viz'])
except Exception as viz_error:
st.warning("Could not display visualization due to torchvision issue.")
st.info("Audio analysis was successful even though visualization failed.")
# Show audio playback
st.audio(audio_path)
# Clean up files
try:
if os.path.exists(video_path):
os.remove(video_path)
if os.path.exists(audio_path):
os.remove(audio_path)
if cookies_file and os.path.exists(cookies_file):
os.remove(cookies_file)
except Exception as e:
st.warning(f"Couldn't clean up temporary files: {str(e)}")
except Exception as e:
st.error(f"Error during analysis: {str(e)}")
with tab2:
st.markdown("### 🎵 Upload Audio File")
st.caption("**Recommended option!** Direct audio upload is more reliable than video URLs.")
# Add some information about file size limits
st.info("📝 **File Requirements**: \n"
"• Maximum file size: 200MB \n"
"• Supported formats: WAV, MP3, M4A, OGG, FLAC, MP4 \n"
"• Recommended length: 15-60 seconds of clear speech")
uploaded_file = st.file_uploader("Upload an audio file",
type=["wav", "mp3", "m4a", "ogg", "flac", "mp4"],
help="Support for WAV, MP3, M4A, OGG, FLAC and MP4 formats",
accept_multiple_files=False)
if uploaded_file is not None: # Show a preview of the audio
st.markdown("#### Audio Preview:")
try:
st.audio(uploaded_file)
st.markdown("#### Ready for Analysis")
col1, col2 = st.columns([1, 3])
with col1:
analyze_button = st.button("Analyze Audio", type="primary", use_container_width=True)
with col2:
st.caption("Tip: 15-30 seconds of clear speech works best for accent detection")
except Exception as preview_error:
st.warning(f"Could not preview audio: {str(preview_error)}")
# If preview fails, still allow analysis
analyze_button = st.button("Analyze Audio (Preview Failed)", type="primary")
st.caption("Proceeding with analysis might still work even if preview failed")
if analyze_button:
with st.spinner("Analyzing audio... (this may take 15-30 seconds)"):
try:
# Check file size before processing
file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024)
if file_size_mb > 190: # Stay below the 200MB limit with some buffer
st.error(f"File size ({file_size_mb:.1f}MB) is too large. Maximum allowed is 190MB.")
st.info("Tip: Try trimming your audio to just the speech segment for better results.")
else: # Create a progress bar to show processing stages
progress_bar = st.progress(0)
# Check the file type and inform user about processing steps
file_extension = os.path.splitext(uploaded_file.name)[1].lower()
if file_extension == '.mp4':
st.info("Processing video file - extracting audio track...")
elif file_extension in ['.mp3', '.m4a', '.ogg', '.flac']:
st.info(f"Processing {file_extension} audio file...")
progress_bar.progress(25, text="Saving file...")
# First save the file to a known location to bypass 403 errors
# Create an uploads directory if it doesn't exist
uploads_dir = os.path.join(os.getcwd(), "uploads")
os.makedirs(uploads_dir, exist_ok=True) # Save the file first to avoid streaming it multiple times
temp_file_path = os.path.join(uploads_dir, f"temp_{int(time.time())}_{uploaded_file.name}")
with open(temp_file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
progress_bar.progress(50, text="Analyzing audio...")
# Process using the saved file path directly
results = process_uploaded_audio(temp_file_path)
progress_bar.progress(100, text="Analysis complete!")
# Display results
st.success("✅ Analysis Complete!")
# Create columns for results
col1, col2 = st.columns([2, 1])
with col1:
st.subheader("Accent Analysis Results")
st.markdown(f"**Detected Accent:** {results['accent']}")
st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%")
st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%")
# Show explanation in a box
st.markdown("### Expert Analysis")
st.info(results['explanation'])
with col2:
if results['audio_viz']:
try:
st.pyplot(results['audio_viz'])
except Exception as viz_error:
st.warning("Could not display visualization due to torchvision issue.")
st.info("Audio analysis was successful even though visualization failed.")
except subprocess.CalledProcessError as e:
st.error("Error processing audio file")
st.error(f"FFmpeg error: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
st.info("Troubleshooting tips:\n"
"• Try a different audio file format (WAV or MP3 recommended)\n"
"• Make sure the file is not corrupted\n"
"• Try a shorter audio clip")
except PermissionError as e:
st.error(f"Permission error: {str(e)}")
st.info("The app doesn't have permission to access or create temporary files. "
"This could be due to Docker container permissions. "
"Contact the administrator or try using a different file.")
except OSError as e:
st.error(f"System error: {str(e)}")
st.info("Check that the file isn't corrupted and try with a smaller audio clip.")
except Exception as e:
error_msg = str(e)
st.error(f"Error during analysis: {error_msg}")
if "403" in error_msg:
st.warning("Received a 403 Forbidden error. This may be due to: \n"
"• File size exceeding limits\n"
"• Temporary file permission issues\n"
"• Network restrictions")
st.info("Try a smaller audio file (less than 50MB) or a different format.")
elif "timeout" in error_msg.lower():
st.warning("The request timed out. Try a shorter audio clip or check your internet connection.")
elif "memory" in error_msg.lower():
st.warning("Out of memory error. Try a shorter audio clip.")
else:
st.info("If the problem persists, try a different audio file format such as MP3 or WAV.")
# Add footer with deployment info
st.markdown("---")
st.markdown("Deployed using Streamlit • Built with SpeechBrain and Transformers")
# Add a section for how it works
with st.expander("ℹ️ How It Works"):
st.markdown("""
This app uses a multi-stage process to analyze a speaker's accent:
1. **Audio Extraction**: The audio track is extracted from the input video or directly processed from uploaded audio.
2. **Language Identification**: First, we determine if the speech is English using SpeechBrain's language identification model.
3. **Accent Classification**: For English speech, we analyze the specific accent using a transformer-based model trained on diverse accent data.
4. **English Proficiency Score**: A confidence score is calculated based on both language identification and accent clarity.
5. **Analysis Summary**: An explanation is generated describing accent characteristics relevant for hiring evaluations.
""")
# Add debug function for troubleshooting HTTP errors
def debug_http_errors():
"""Print debug information for HTTP errors"""
st.warning("⚠️ HTTP 400 Error Debugging Mode")
st.markdown("""
### Common HTTP 400 Error Causes:
1. **File size exceeds limits** (current limit: 150MB)
2. **File format incompatibility**
3. **Network interruption** during upload
4. **Server-side timeout** during processing
5. **Permissions issues** in container
""")
# Show environment info
st.subheader("Environment Information")
env_info = {
"STREAMLIT_UPLOADS_PATH": os.environ.get("STREAMLIT_UPLOADS_PATH", "Not set"),
"STREAMLIT_SERVER_MAX_UPLOAD_SIZE": os.environ.get("STREAMLIT_SERVER_MAX_UPLOAD_SIZE", "Not set"),
"Current directory": os.getcwd(),
"Python version": sys.version
}
for key, value in env_info.items():
st.code(f"{key}: {value}")
# Check if uploads directory is writable
uploads_dir = os.environ.get("STREAMLIT_UPLOADS_PATH", os.path.join(os.getcwd(), "uploads"))
os.makedirs(uploads_dir, exist_ok=True)
try:
test_file = os.path.join(uploads_dir, "test_write.txt")
with open(test_file, "w") as f:
f.write("Test write permission")
os.remove(test_file)
st.success(f"✓ Upload directory is writable: {uploads_dir}")
except Exception as e:
st.error(f"✗ Cannot write to upload directory: {str(e)}")
# Test ffmpeg
try:
result = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True)
st.success(f"✓ FFmpeg is available")
except Exception as e:
st.error(f"✗ FFmpeg error: {str(e)}")
# Add debug mode flag to the app
debug_mode = False
with st.expander("🔧 Troubleshooting Tools"):
debug_mode = st.checkbox("Enable Debug Mode for HTTP 400 Errors")
if debug_mode:
debug_http_errors()
# Add option for user to try different upload method
alt_upload = st.checkbox("Use alternative upload method (for HTTP 400 errors)")
if alt_upload:
st.info("Using alternative upload method that may bypass some HTTP 400 errors")