Translate / app.py
Athspi's picture
Update app.py
8bc1a84 verified
import os
import time
import tempfile
import uuid
import google.generativeai as genai
import requests
from flask import Flask, request, render_template, send_from_directory, url_for, flash, jsonify
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import threading
from datetime import datetime, timedelta
import logging
# Initialize Flask app and load secrets
load_dotenv()
app = Flask(__name__)
# Configuration
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")
if not GEMINI_API_KEY or not TTS_API_URL:
raise ValueError("Missing required environment variables")
genai.configure(api_key=GEMINI_API_KEY)
# File storage setup
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DOWNLOAD_FOLDER'] = DOWNLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB
app.secret_key = os.urandom(24)
# Processing status tracking
processing_status = {}
processing_times = {
'upload': 0,
'transcription': 0,
'tts': 0,
'dubbing': 0
}
# Voice options
VOICE_CHOICES = {
"Male (Charon)": "Charon",
"Female (Zephyr)": "Zephyr"
}
GEMINI_PROMPT = """
You are an AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.
**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.
**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்.
"""
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def track_processing_time(task_id, stage, duration):
"""Track processing times for each stage"""
processing_times[stage] = duration
if task_id in processing_status:
processing_status[task_id]['timings'][stage] = duration
def estimate_remaining_time(task_id):
"""Estimate remaining processing time"""
if task_id not in processing_status:
return "Calculating..."
status = processing_status[task_id]
completed_stages = [s for s in status['timings'] if status['timings'][s] is not None]
if len(completed_stages) == 0:
return "Starting soon..."
# Weighted average based on stage complexity
weights = {
'transcription': 2.0,
'tts': 1.5,
'dubbing': 1.0
}
total_weighted_time = 0
total_weights = 0
for stage in completed_stages:
weight = weights.get(stage, 1.0)
total_weighted_time += status['timings'][stage] * weight
total_weights += weight
if total_weights == 0:
return "Estimating..."
avg_time = total_weighted_time / total_weights
remaining_stages = 4 - len(completed_stages)
return remaining_stages * avg_time
def process_video_background(task_id, video_path, voice, cheerful):
"""Background processing function with enhanced logging"""
try:
start_time = time.time()
processing_status[task_id] = {
'status': 'processing',
'progress': 0,
'message': 'Starting transcription',
'timings': {'upload': None, 'transcription': None, 'tts': None, 'dubbing': None},
'start_time': start_time,
'video_duration': get_video_duration(video_path)
}
# Stage 1: Transcription
processing_status[task_id]['message'] = 'Transcribing video content'
logger.info(f"Task {task_id}: Starting transcription")
script_start = time.time()
script = generate_tamil_script(video_path)
transcription_time = time.time() - script_start
track_processing_time(task_id, 'transcription', transcription_time)
processing_status[task_id]['progress'] = 25
processing_status[task_id]['script'] = script
logger.info(f"Task {task_id}: Transcription completed in {transcription_time:.1f}s")
# Stage 2: TTS Generation
processing_status[task_id]['message'] = 'Generating audio narration'
logger.info(f"Task {task_id}: Starting TTS generation")
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
audio_path = temp_audio.name
tts_start = time.time()
generate_audio_track(script, voice, cheerful, audio_path)
tts_time = time.time() - tts_start
track_processing_time(task_id, 'tts', tts_time)
processing_status[task_id]['progress'] = 50
logger.info(f"Task {task_id}: TTS completed in {tts_time:.1f}s")
# Stage 3: Dubbing
processing_status[task_id]['message'] = 'Creating dubbed video'
logger.info(f"Task {task_id}: Starting dubbing")
final_filename = f"dubbed_{task_id}.mp4"
final_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_filename)
dubbing_start = time.time()
replace_video_audio(video_path, audio_path, final_path)
dubbing_time = time.time() - dubbing_start
track_processing_time(task_id, 'dubbing', dubbing_time)
processing_status[task_id]['progress'] = 75
logger.info(f"Task {task_id}: Dubbing completed in {dubbing_time:.1f}s")
# Cleanup
os.unlink(audio_path)
# Finalize
processing_status[task_id].update({
'status': 'complete',
'progress': 100,
'message': 'Processing complete',
'result_path': final_path,
'end_time': time.time()
})
logger.info(f"Task {task_id}: Processing completed successfully")
except Exception as e:
logger.error(f"Task {task_id} failed: {str(e)}")
processing_status[task_id].update({
'status': 'error',
'message': f'Error: {str(e)}'
})
# Cleanup temporary files
if 'video_path' in locals() and os.path.exists(video_path):
os.unlink(video_path)
if 'audio_path' in locals() and os.path.exists(audio_path):
os.unlink(audio_path)
def get_video_duration(video_path):
"""Get duration of video in seconds"""
try:
with VideoFileClip(video_path) as video:
return video.duration
except:
return 0
def generate_tamil_script(video_path):
"""Generate Tamil script using Gemini with retry logic"""
max_retries = 3
retry_delay = 10 # seconds
for attempt in range(max_retries):
try:
video_file = genai.upload_file(video_path, mime_type="video/mp4")
# Wait for file processing with timeout
start_wait = time.time()
while video_file.state.name == "PROCESSING":
if time.time() - start_wait > 300: # 5 minutes timeout
raise TimeoutError("Gemini processing timed out")
time.sleep(5)
video_file = genai.get_file(video_file.name)
if video_file.state.name != "ACTIVE":
raise Exception(f"Gemini processing failed: {video_file.state.name}")
model = genai.GenerativeModel(model_name="models/gemini-2.5-flash")
response = model.generate_content([GEMINI_PROMPT, video_file])
genai.delete_file(video_file.name)
if hasattr(response, 'text') and response.text:
return " ".join(response.text.strip().splitlines())
raise Exception("No valid script generated")
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f"Gemini error (attempt {attempt+1}/{max_retries}): {str(e)}")
time.sleep(retry_delay * (attempt + 1))
else:
raise
def generate_audio_track(text, voice, cheerful, output_path):
"""Generate audio using TTS API with retry logic"""
max_retries = 3
retry_delay = 5 # seconds
for attempt in range(max_retries):
try:
payload = {
"text": text,
"voice_name": voice,
"cheerful": cheerful
}
response = requests.post(TTS_API_URL, json=payload, timeout=300)
if response.status_code != 200:
raise Exception(f"TTS API error: {response.status_code} - {response.text}")
with open(output_path, "wb") as f:
f.write(response.content)
return
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f"TTS error (attempt {attempt+1}/{max_retries}): {str(e)}")
time.sleep(retry_delay * (attempt + 1))
else:
raise
def replace_video_audio(video_path, audio_path, output_path):
"""Replace video audio track with enhanced error handling"""
video = None
audio = None
try:
# Open video and audio files
video = VideoFileClip(video_path)
audio = AudioFileClip(audio_path)
# Set video audio
video.audio = audio
# Write output with optimized settings
video.write_videofile(
output_path,
codec="libx264",
audio_codec="aac",
logger=None,
threads=4,
preset='medium',
ffmpeg_params=['-crf', '23', '-movflags', '+faststart']
)
except Exception as e:
logger.error(f"Video processing error: {str(e)}")
# Cleanup partially created file
if os.path.exists(output_path):
os.unlink(output_path)
raise
finally:
if video:
video.close()
if audio:
audio.close()
@app.route('/')
def index():
"""Main page"""
return render_template('index.html', voices=VOICE_CHOICES)
@app.route('/upload', methods=['POST'])
def upload_video():
"""Handle video upload and start processing"""
if 'video' not in request.files:
return jsonify({'error': 'No file uploaded'}), 400
file = request.files['video']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
# Generate unique task ID
task_id = str(uuid.uuid4())
filename = secure_filename(f"{task_id}_{file.filename}")
video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(video_path)
# Get processing options
voice = request.form.get('voice', 'Charon')
cheerful = request.form.get('cheerful', 'false') == 'true'
# Start background processing
processing_status[task_id] = {
'status': 'uploaded',
'progress': 0,
'message': 'Starting processing',
'timings': {'upload': time.time(), 'transcription': None, 'tts': None, 'dubbing': None},
'start_time': time.time(),
'video_duration': get_video_duration(video_path)
}
thread = threading.Thread(
target=process_video_background,
args=(task_id, video_path, voice, cheerful)
)
thread.start()
return jsonify({
'task_id': task_id,
'video_duration': processing_status[task_id]['video_duration']
})
@app.route('/status/<task_id>')
def get_status(task_id):
"""Check processing status"""
if task_id not in processing_status:
return jsonify({'error': 'Invalid task ID'}), 404
status = processing_status[task_id]
# Calculate ETA if processing
eta = None
if status['status'] == 'processing':
elapsed = time.time() - status['start_time']
remaining = estimate_remaining_time(task_id)
if isinstance(remaining, (int, float)):
eta = str(timedelta(seconds=int(remaining)))
response = {
'status': status['status'],
'progress': status.get('progress', 0),
'message': status.get('message', ''),
'eta': eta
}
if status['status'] == 'complete':
response['result_url'] = url_for('download', filename=os.path.basename(status['result_path']))
response['script'] = status.get('script', '')
return jsonify(response)
@app.route('/download/<filename>')
def download(filename):
"""Serve processed video"""
return send_from_directory(app.config['DOWNLOAD_FOLDER'], filename)
@app.route('/cleanup', methods=['POST'])
def cleanup():
"""Cleanup old files"""
try:
# Cleanup uploads older than 1 hour
for filename in os.listdir(UPLOAD_FOLDER):
file_path = os.path.join(UPLOAD_FOLDER, filename)
if os.path.getmtime(file_path) < time.time() - 3600:
os.unlink(file_path)
# Cleanup downloads older than 24 hours
for filename in os.listdir(DOWNLOAD_FOLDER):
file_path = os.path.join(DOWNLOAD_FOLDER, filename)
if os.path.getmtime(file_path) < time.time() - 86400:
os.unlink(file_path)
return jsonify({'status': 'success', 'message': 'Cleanup completed'})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
if __name__ == '__main__':
# Schedule cleanup thread
import schedule
import time as t
def cleanup_job():
with app.app_context():
app.test_client().post('/cleanup')
schedule.every().hour.do(cleanup_job)
# Start scheduler in background thread
def scheduler_thread():
while True:
schedule.run_pending()
t.sleep(1)
threading.Thread(target=scheduler_thread, daemon=True).start()
# Start Flask app
app.run(host="0.0.0.0", port=7860, threaded=True)