# bskrishna2006
# Add audio transcription endpoints for Railway integration
# 0254d02
"""
YouTube Video Summarizer API - Hugging Face Spaces Edition
Flask backend deployed on Hugging Face Spaces.
Provides multilingual YouTube video summarization using:
- Whisper (speech-to-text)
- NLLB-200 (translation)
- Groq API (summarization)
Whisper and NLLB-200 are FREE models that run locally on HF Spaces
infrastructure; summarization calls the Groq API (needs GROQ_API_KEY).
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
from dotenv import load_dotenv
import os
import logging
from services.transcript import TranscriptService
from services.summarizer import SummarizerService
from config import (
SUPPORTED_LANGUAGES,
get_language_name,
is_english,
)
# Pull settings from a .env file (if one exists) into the process
# environment before anything below is constructed.
load_dotenv()

# INFO-level logging for the whole process; this module logs under its own
# name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# CORS: every route accepts requests from any origin, restricted to the
# methods and request headers listed here.
CORS(
    app,
    resources={
        r"/*": {
            "origins": "*",
            "methods": ["GET", "POST", "OPTIONS"],
            "allow_headers": ["Content-Type", "Authorization"],
        },
    },
)

# Shared service objects, constructed once at import time.
# NOTE(review): presumably these constructors are cheap and defer any heavy
# model loading until first use — confirm in services/.
transcript_service = TranscriptService()
summarizer_service = SummarizerService()
# Holds the single TranslationService instance. It stays None until the first
# call below so the 2.4GB translation model is not loaded on startup.
_translation_service = None


def get_translation_service():
    """Return the shared TranslationService, creating it on first use."""
    global _translation_service
    if _translation_service is not None:
        return _translation_service

    # Imported here rather than at module top so that importing this module
    # does not import services.translation.
    from services.translation import TranslationService
    _translation_service = TranslationService()
    return _translation_service
# =============================================================================
# ROOT & HEALTH ENDPOINTS
# =============================================================================
@app.route('/', methods=['GET'])
def root():
    """Report service identity and status at the root URL.

    Also serves as the health check for HF Spaces. Always answers HTTP 200.
    """
    payload = {
        'status': 'healthy',
        'service': 'YouTube Summarizer API',
        'version': '2.0.0',
        'docs': '/api/health for detailed status',
    }
    return jsonify(payload), 200
@app.route('/api/health', methods=['GET'])
def health_check():
    """Return a static description of the service: status, version,
    feature list and the model identifiers it advertises.

    The payload is hard-coded; no model or dependency is probed.
    Always answers HTTP 200.
    """
    advertised_models = {
        'whisper': 'openai/whisper-small',
        'translation': 'facebook/nllb-200-distilled-600M',
        'summarization': 'groq/llama-3.1-8b-instant',
    }
    payload = {
        'status': 'healthy',
        'message': 'YouTube Summarizer API is running on Hugging Face Spaces',
        'version': '2.0.0',
        'features': ['multilingual', 'whisper', 'translation'],
        'models': advertised_models,
    }
    return jsonify(payload), 200
@app.route('/api/languages', methods=['GET'])
def get_languages():
    """Return the SUPPORTED_LANGUAGES value from config, wrapped in a
    success envelope. Always answers HTTP 200.
    """
    payload = {'success': True, 'languages': SUPPORTED_LANGUAGES}
    return jsonify(payload), 200
@app.route('/api/warmup', methods=['POST'])
def warmup_models():
    """
    Pre-load ML models to avoid delay on first request.
    This can take 2-5 minutes on first run (downloading models).

    Request (JSON, optional): { "translation": true, "whisper": true }
    Only models whose flag is truthy are warmed up. A missing, empty,
    malformed or non-object body warms up nothing and still succeeds.

    Response: { "success": true, "message": "...", "models": {...} }
    where "models" maps each warmed-up model to 'loaded'.
    Any exception raised while warming up is reported as HTTP 500.
    """
    try:
        results = {}

        # silent=True: every field here is optional, so an absent or
        # unparseable body must mean "nothing requested". Without it,
        # get_json() raises an HTTP error that the broad except below
        # would turn into a misleading 500 "Warmup failed".
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        if data.get('translation', False):
            logger.info("Warming up translation model...")
            translation_service = get_translation_service()
            translation_service.warmup()
            results['translation'] = 'loaded'

        if data.get('whisper', False):
            logger.info("Warming up Whisper model...")
            from services.speech_to_text import SpeechToTextService
            # NOTE(review): a fresh SpeechToTextService is created here and
            # again in the audio endpoints; presumably the loaded model is
            # shared between instances — confirm, otherwise this warmup
            # does not benefit later requests.
            stt = SpeechToTextService()
            stt.warmup()
            results['whisper'] = 'loaded'

        return jsonify({
            'success': True,
            'message': 'Models warmed up successfully',
            'models': results
        }), 200
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Warmup failed: {e}")
        return jsonify({
            'error': 'Warmup failed',
            'message': str(e)
        }), 500
# =============================================================================
# AUDIO TRANSCRIPTION ENDPOINTS (for Railway integration)
# =============================================================================
@app.route('/api/transcribe-audio', methods=['POST'])
def transcribe_audio():
    """
    Transcribe audio using Whisper.
    Receives audio as base64 from Railway backend.

    Request: { "audio_base64": "<base64-encoded audio bytes>" }
    Response: { "success": true, "transcript": "...", "language": "...",
                "word_count": N }

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'audio_base64', or the value cannot be base64-decoded. Any other
    failure is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'audio_base64' not in data:
            return jsonify({
                'error': 'Missing audio',
                'message': 'Please provide audio_base64'
            }), 400

        import base64
        import tempfile

        # Decode audio. Bad input is the client's fault, so answer 400.
        # b64decode raises binascii.Error (a ValueError subclass) for bad
        # padding and TypeError for non-string values.
        try:
            audio_data = base64.b64decode(data['audio_base64'])
        except (ValueError, TypeError) as e:
            return jsonify({
                'error': 'Invalid audio',
                'message': f'audio_base64 is not valid base64: {e}'
            }), 400

        audio_path = None
        try:
            # Save to temp file. delete=False because the file is reopened
            # by path below; the path is recorded before writing so the
            # finally clause can remove it even if the write fails.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
                audio_path = f.name
                f.write(audio_data)

            # Transcribe with Whisper
            from services.speech_to_text import SpeechToTextService
            stt = SpeechToTextService()
            result = stt.transcribe_audio(audio_path)

            return jsonify({
                'success': True,
                'transcript': result['text'],
                'language': result['language'],
                'word_count': len(result['text'].split())
            }), 200
        finally:
            # Cleanup
            if audio_path is not None and os.path.exists(audio_path):
                os.remove(audio_path)
    except Exception as e:
        logger.exception(f"Audio transcription failed: {e}")
        return jsonify({
            'error': 'Transcription failed',
            'message': str(e)
        }), 500
@app.route('/api/process-audio', methods=['POST'])
def process_audio():
    """
    Full pipeline for audio: Whisper transcription → Translation → Summary.
    Receives audio as base64 from Railway backend.

    Request: {
        "audio_base64": "<base64-encoded audio bytes>",
        "video_id": "..."          (optional, default "unknown"),
        "summary_type": "general"  (optional),
        "target_language": "eng"   (optional)
    }

    Response: success envelope with the transcript, the summary (in the
    target language), language codes/names and word-count statistics.
    'english_transcript' is included when the audio was not English and
    'english_summary' when the target language is not English.

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'audio_base64', or the value cannot be base64-decoded. Any other
    failure is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'audio_base64' not in data:
            return jsonify({
                'error': 'Missing audio',
                'message': 'Please provide audio_base64'
            }), 400

        import base64
        import tempfile

        video_id = data.get('video_id', 'unknown')
        summary_type = data.get('summary_type', 'general')
        target_language = data.get('target_language', 'eng')

        # Decode audio. Bad input is the client's fault, so answer 400.
        # b64decode raises binascii.Error (a ValueError subclass) for bad
        # padding and TypeError for non-string values.
        try:
            audio_data = base64.b64decode(data['audio_base64'])
        except (ValueError, TypeError) as e:
            return jsonify({
                'error': 'Invalid audio',
                'message': f'audio_base64 is not valid base64: {e}'
            }), 400

        audio_path = None
        try:
            # Save to temp file. delete=False because the file is reopened
            # by path below; the path is recorded before writing so the
            # finally clause can remove it even if the write fails.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
                audio_path = f.name
                f.write(audio_data)

            # Step 1: Transcribe with Whisper
            logger.info("Transcribing audio with Whisper...")
            from services.speech_to_text import SpeechToTextService
            stt = SpeechToTextService()
            whisper_result = stt.transcribe_audio(audio_path)

            transcript = whisper_result['text']
            original_language = whisper_result['language']
            original_word_count = len(transcript.split())
            logger.info(f"Transcription complete. Language: {original_language}")

            # Step 2: Translate to English if needed (the summarizer is
            # always fed the English text).
            english_transcript = transcript
            if not is_english(original_language):
                logger.info("Translating to English...")
                translation_service = get_translation_service()
                english_transcript = translation_service.translate_to_english(
                    transcript,
                    original_language
                )

            # Step 3: Summarize
            logger.info("Generating summary...")
            summary = summarizer_service.summarize(
                text=english_transcript,
                summary_type=summary_type,
                chunk_size=2500,
                max_tokens=500
            )

            # Step 4: Translate summary to target language if needed
            final_summary = summary
            summary_language = "eng"
            if not is_english(target_language):
                logger.info(f"Translating summary to {target_language}...")
                translation_service = get_translation_service()
                final_summary = translation_service.translate_from_english(
                    summary, target_language
                )
                summary_language = target_language

            # Calculate statistics. The ratio compares the delivered summary
            # with the original-language transcript; an empty transcript
            # yields 0 rather than dividing by zero.
            summary_word_count = len(final_summary.split())
            if original_word_count > 0:
                compression_ratio = (summary_word_count / original_word_count) * 100
            else:
                compression_ratio = 0

            response = {
                'success': True,
                'video_id': video_id,
                'original_language': original_language,
                'original_language_name': get_language_name(original_language),
                'transcript': transcript,
                'transcript_source': 'whisper',
                'summary': final_summary,
                'summary_language': summary_language,
                'summary_language_name': get_language_name(summary_language),
                'statistics': {
                    'original_word_count': original_word_count,
                    'summary_word_count': summary_word_count,
                    'compression_ratio': round(compression_ratio, 1),
                    # 200 words per minute, never reported below 1 minute.
                    'reading_time_minutes': max(1, summary_word_count // 200)
                }
            }

            # Expose the English intermediates only when they differ from
            # the fields already present in the response.
            if not is_english(original_language):
                response['english_transcript'] = english_transcript
            if not is_english(target_language):
                response['english_summary'] = summary

            logger.info("Audio processing complete!")
            return jsonify(response), 200
        finally:
            # Cleanup
            if audio_path is not None and os.path.exists(audio_path):
                os.remove(audio_path)
    except Exception as e:
        logger.exception(f"Audio processing failed: {e}")
        return jsonify({
            'error': 'Processing failed',
            'message': str(e)
        }), 500
# =============================================================================
# TRANSCRIPT ENDPOINTS
# =============================================================================
@app.route('/api/transcript', methods=['POST'])
def get_transcript():
    """
    Extract transcript from YouTube video (multilingual).

    Request: { "url": "youtube_url", "use_whisper": true }
    Response: { "success": true, "transcript": "...", "language": "tam", ... }

    'use_whisper' defaults to true and is forwarded to the transcript
    service as its use_whisper_fallback flag.

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'url', or a ValueError is raised while handling it; any other failure
    is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data:
            return jsonify({
                'error': 'Missing YouTube URL',
                'message': 'Please provide a valid YouTube URL'
            }), 400

        url = data['url']
        use_whisper = data.get('use_whisper', True)

        video_id = transcript_service.extract_video_id(url)
        result = transcript_service.get_video_transcript(
            url, use_whisper_fallback=use_whisper
        )

        return jsonify({
            'success': True,
            'video_id': video_id,
            'transcript': result['transcript'],
            'language': result['language'],
            'language_name': get_language_name(result['language']),
            'source': result['source'],
            'word_count': result['word_count']
        }), 200
    except ValueError as e:
        return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400
    except Exception as e:
        logger.exception(f"Transcript extraction failed: {e}")
        return jsonify({'error': 'Transcript extraction failed', 'message': str(e)}), 500
# =============================================================================
# TRANSLATION ENDPOINTS
# =============================================================================
@app.route('/api/translate', methods=['POST'])
def translate_text():
    """
    Translate text between languages.

    Request: { "text": "Hello", "source_lang": "eng", "target_lang": "hin" }
    Response: { "success": true, "translated_text": "नमस्ते", ... }

    'source_lang' defaults to "eng" and 'target_lang' to "hin".

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'text', or a ValueError is raised while translating; any other failure
    is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'text' not in data:
            return jsonify({
                'error': 'Missing text',
                'message': 'Please provide text to translate'
            }), 400

        text = data['text']
        source_lang = data.get('source_lang', 'eng')
        target_lang = data.get('target_lang', 'hin')

        translation_service = get_translation_service()
        translated = translation_service.translate(text, source_lang, target_lang)

        return jsonify({
            'success': True,
            'translated_text': translated,
            'source_lang': source_lang,
            'source_lang_name': get_language_name(source_lang),
            'target_lang': target_lang,
            'target_lang_name': get_language_name(target_lang)
        }), 200
    except ValueError as e:
        return jsonify({'error': 'Invalid language', 'message': str(e)}), 400
    except Exception as e:
        logger.exception(f"Translation failed: {e}")
        return jsonify({'error': 'Translation failed', 'message': str(e)}), 500
@app.route('/api/detect-language', methods=['POST'])
def detect_language():
    """
    Detect the language of given text.

    Request: { "text": "..." }
    Response: { "success": true, "language": "<code>", "language_name": "..." }

    Returns HTTP 400 when the body is missing, is not a JSON object, or
    lacks 'text'; any other failure is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'text' not in data:
            return jsonify({
                'error': 'Missing text',
                'message': 'Please provide text for language detection'
            }), 400

        translation_service = get_translation_service()
        result = translation_service.detect_language(data['text'])

        return jsonify({
            'success': True,
            'language': result['code'],
            'language_name': result['name']
        }), 200
    except Exception as e:
        logger.exception(f"Language detection failed: {e}")
        return jsonify({'error': 'Language detection failed', 'message': str(e)}), 500
# =============================================================================
# SUMMARIZATION ENDPOINTS
# =============================================================================
@app.route('/api/summarize', methods=['POST'])
def summarize():
    """
    Generate summary from transcript.

    Request: { "transcript": "...", "summary_type": "general" }
    Response: { "success": true, "summary": "...", "statistics": {...} }

    Optional request fields: 'summary_type' (default "general"; one of
    general, detailed, bullet_points, key_takeaways), 'chunk_size'
    (default 2500) and 'max_tokens' (default 500).

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'transcript', carries a non-string transcript, or names an unknown
    summary type; any other failure is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'transcript' not in data:
            return jsonify({
                'error': 'Missing transcript',
                'message': 'Please provide transcript text'
            }), 400

        transcript = data['transcript']
        # The word count below calls transcript.split(); reject other JSON
        # types up front as a client error rather than failing later.
        if not isinstance(transcript, str):
            return jsonify({
                'error': 'Invalid transcript',
                'message': 'transcript must be a string'
            }), 400

        summary_type = data.get('summary_type', 'general')
        # NOTE(review): chunk_size and max_tokens are forwarded to the
        # summarizer unvalidated — confirm it tolerates non-integer or
        # non-positive values from clients.
        chunk_size = data.get('chunk_size', 2500)
        max_tokens = data.get('max_tokens', 500)

        valid_types = ['general', 'detailed', 'bullet_points', 'key_takeaways']
        if summary_type not in valid_types:
            return jsonify({
                'error': 'Invalid summary type',
                'message': f'Must be one of: {", ".join(valid_types)}'
            }), 400

        summary = summarizer_service.summarize(
            text=transcript,
            summary_type=summary_type,
            chunk_size=chunk_size,
            max_tokens=max_tokens
        )

        summary_word_count = len(summary.split())
        original_word_count = len(transcript.split())
        # An empty transcript yields 0 rather than dividing by zero.
        if original_word_count > 0:
            compression_ratio = (summary_word_count / original_word_count) * 100
        else:
            compression_ratio = 0

        return jsonify({
            'success': True,
            'summary': summary,
            'statistics': {
                'original_word_count': original_word_count,
                'summary_word_count': summary_word_count,
                'compression_ratio': round(compression_ratio, 1),
                # 200 words per minute, never reported below 1 minute.
                'reading_time_minutes': max(1, summary_word_count // 200)
            }
        }), 200
    except Exception as e:
        logger.exception(f"Summarization failed: {e}")
        return jsonify({'error': 'Summarization failed', 'message': str(e)}), 500
# =============================================================================
# FULL PIPELINE ENDPOINT
# =============================================================================
@app.route('/api/process', methods=['POST'])
def process_video():
    """
    Full multilingual pipeline: Transcript → Translation → Summary → Translation

    Request: {
        "url": "youtube_url",
        "summary_type": "general",
        "target_language": "hin" (optional)
    }
    'chunk_size' (default 2500) and 'max_tokens' (default 500) are also
    accepted and forwarded to the summarizer.

    Response: success envelope with the transcript, the summary (in the
    target language), language codes/names and word-count statistics.
    'english_transcript' is included when the video was not English and
    'english_summary' when the target language is not English.

    Returns HTTP 400 when the body is missing, is not a JSON object, lacks
    'url', or a ValueError is raised while processing; any other failure
    is reported as HTTP 500.
    """
    try:
        # silent=True makes a missing/malformed JSON body come back as None
        # so it lands on the 400 below instead of raising into the broad
        # except and being reported as a 500. The isinstance check rejects
        # JSON strings/lists, for which `in` would not be a key lookup.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data:
            return jsonify({
                'error': 'Missing YouTube URL',
                'message': 'Please provide a valid YouTube URL'
            }), 400

        url = data['url']
        summary_type = data.get('summary_type', 'general')
        target_language = data.get('target_language', 'eng')
        chunk_size = data.get('chunk_size', 2500)
        max_tokens = data.get('max_tokens', 500)

        # Extract the video ID first (precedes the four logged steps).
        video_id = transcript_service.extract_video_id(url)
        logger.info(f"Processing video: {video_id}")

        # Step 1/4: Get transcript with language
        logger.info("Step 1/4: Extracting transcript...")
        transcript_result = transcript_service.get_video_transcript(
            url, use_whisper_fallback=True
        )
        original_transcript = transcript_result['transcript']
        original_language = transcript_result['language']
        original_word_count = transcript_result['word_count']

        # Step 2/4: Translate to English if needed (the summarizer is
        # always fed the English text).
        english_transcript = original_transcript
        if not is_english(original_language):
            logger.info("Step 2/4: Translating to English...")
            translation_service = get_translation_service()
            english_transcript = translation_service.translate_to_english(
                original_transcript,
                original_language
            )
        else:
            logger.info("Step 2/4: Skipped (already English)")

        # Step 3/4: Summarize in English
        logger.info("Step 3/4: Generating summary...")
        summary = summarizer_service.summarize(
            text=english_transcript,
            summary_type=summary_type,
            chunk_size=chunk_size,
            max_tokens=max_tokens
        )

        # Step 4/4: Translate summary to target language
        final_summary = summary
        summary_language = "eng"
        if not is_english(target_language):
            logger.info(f"Step 4/4: Translating summary to {target_language}...")
            translation_service = get_translation_service()
            final_summary = translation_service.translate_from_english(
                summary, target_language
            )
            summary_language = target_language
        else:
            logger.info("Step 4/4: Skipped (English output)")

        # Calculate statistics. The ratio compares the delivered summary
        # with the original-language transcript; a zero word count yields 0
        # rather than dividing by zero.
        summary_word_count = len(final_summary.split())
        if original_word_count > 0:
            compression_ratio = (summary_word_count / original_word_count) * 100
        else:
            compression_ratio = 0

        response = {
            'success': True,
            'video_id': video_id,
            'original_language': original_language,
            'original_language_name': get_language_name(original_language),
            'transcript': original_transcript,
            'transcript_source': transcript_result['source'],
            'summary': final_summary,
            'summary_language': summary_language,
            'summary_language_name': get_language_name(summary_language),
            'statistics': {
                'original_word_count': original_word_count,
                'summary_word_count': summary_word_count,
                'compression_ratio': round(compression_ratio, 1),
                # 200 words per minute, never reported below 1 minute.
                'reading_time_minutes': max(1, summary_word_count // 200)
            }
        }

        # Expose the English intermediates only when they differ from the
        # fields already present in the response.
        if not is_english(original_language):
            response['english_transcript'] = english_transcript
        if not is_english(target_language):
            response['english_summary'] = summary

        logger.info("Processing complete!")
        return jsonify(response), 200
    except ValueError as e:
        # NOTE(review): this handler covers the whole pipeline, so a
        # ValueError raised by translation or summarization is also
        # reported to the client as 'Invalid URL'.
        return jsonify({'error': 'Invalid URL', 'message': str(e)}), 400
    except Exception as e:
        logger.exception(f"Processing failed: {e}")
        return jsonify({'error': 'Processing failed', 'message': str(e)}), 500
# =============================================================================
# ERROR HANDLERS
# =============================================================================
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body; the error object itself is unused."""
    body = {
        'error': 'Not found',
        'message': 'The requested endpoint does not exist',
    }
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Render 500 errors as a generic JSON body; the error object itself is
    unused, so no internal detail is included in the response.
    """
    body = {
        'error': 'Internal server error',
        'message': 'An unexpected error occurred',
    }
    return jsonify(body), 500
# =============================================================================
# MAIN (for local testing only - gunicorn is used in production)
# =============================================================================
if __name__ == '__main__':
    # Script entry point for local runs. Listens on $PORT, or 7860 when the
    # variable is unset.
    port = int(os.environ.get('PORT', 7860))

    # A missing Groq key only produces a warning; startup continues.
    groq_key = os.getenv('GROQ_API_KEY')
    if not groq_key:
        for line in (
            "⚠️ Warning: GROQ_API_KEY not found",
            "Set it in HF Spaces Settings → Secrets",
        ):
            print(line)

    print("🚀 Starting YouTube Summarizer API...")
    print(f"📡 API available at: http://localhost:{port}")

    # Bind on all interfaces with the debugger disabled.
    app.run(debug=False, host='0.0.0.0', port=port)