|
import os |
|
import time |
|
import tempfile |
|
import uuid |
|
import google.generativeai as genai |
|
import requests |
|
from flask import Flask, request, render_template, send_from_directory, url_for, flash, jsonify |
|
from moviepy.video.io.VideoFileClip import VideoFileClip |
|
from moviepy.audio.io.AudioFileClip import AudioFileClip |
|
from werkzeug.utils import secure_filename |
|
from dotenv import load_dotenv |
|
import threading |
|
from datetime import datetime, timedelta |
|
import logging |
|
|
|
|
|
# Load a local .env file (if present) before any environment lookups.
load_dotenv()

app = Flask(__name__)

# Credentials / endpoints of the two external services; both are mandatory.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TTS_API_URL = os.getenv("TTS_API_URL")

if not (GEMINI_API_KEY and TTS_API_URL):
    raise ValueError("Missing required environment variables")

genai.configure(api_key=GEMINI_API_KEY)

# Working directories: raw uploads in one, finished dubbed videos in the other.
UPLOAD_FOLDER = 'uploads'
DOWNLOAD_FOLDER = 'downloads'
for _folder in (UPLOAD_FOLDER, DOWNLOAD_FOLDER):
    os.makedirs(_folder, exist_ok=True)

app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    DOWNLOAD_FOLDER=DOWNLOAD_FOLDER,
    MAX_CONTENT_LENGTH=500 * 1024 * 1024,  # 500 MiB request-size cap
)
# Random per-process key: anything signed with it is invalidated on restart.
app.secret_key = os.urandom(24)

# In-memory job registry keyed by task id, plus the most recent duration
# recorded for each pipeline stage.
processing_status = {}
processing_times = dict.fromkeys(('upload', 'transcription', 'tts', 'dubbing'), 0)

# Display label -> voice name sent to the TTS service.
VOICE_CHOICES = {
    "Male (Charon)": "Charon",
    "Female (Zephyr)": "Zephyr",
}

# Instruction text sent to Gemini together with the uploaded video.
GEMINI_PROMPT = """
You are an AI scriptwriter. Your task is to watch the provided video and transcribe ALL spoken dialogue into a SINGLE, CONTINUOUS block of modern, colloquial Tamil.

**CRITICAL INSTRUCTIONS:**
1. **Single Script:** Combine all dialogue into one continuous script.
2. **NO Timestamps or Speaker Labels:** Do NOT include any timestamps or speaker identifiers.
3. **Incorporate Performance:** Add English style prompts (e.g., `Say happily:`, `Whisper mysteriously:`) and performance tags (e.g., `[laugh]`, `[sigh]`) directly into the text for an expressive narration.

**EXAMPLE OUTPUT:**
Say happily: வணக்கம்! [laugh] எப்படி இருக்கீங்க? Whisper mysteriously: அந்த ரகசியம் எனக்கு மட்டும் தான் தெரியும்.
"""

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
|
|
|
def track_processing_time(task_id, stage, duration):
    """Record how long ``stage`` took.

    The module-level ``processing_times`` table always receives the value;
    the task's own ``timings`` map is updated only when ``task_id`` is a
    known entry of ``processing_status``.
    """
    processing_times[stage] = duration

    if task_id not in processing_status:
        return
    processing_status[task_id]['timings'][stage] = duration
|
|
|
def estimate_remaining_time(task_id):
    """Estimate how much processing time is left for a task.

    The estimate is the weighted average duration of the stages that have
    already finished, multiplied by the number of pipeline stages that have
    not reported a timing yet.

    Args:
        task_id: Key of the job in ``processing_status``.

    Returns:
        A placeholder string (``"Calculating..."``, ``"Starting soon..."`` or
        ``"Estimating..."``) when no estimate can be produced yet, otherwise
        the estimated remaining time in seconds as a number.
    """
    if task_id not in processing_status:
        return "Calculating..."

    timings = processing_status[task_id]['timings']
    completed = {
        stage: took for stage, took in timings.items() if took is not None
    }
    if not completed:
        return "Starting soon..."

    # Relative weight of each timed pipeline stage in the average.
    weights = {
        'transcription': 2.0,
        'tts': 1.5,
        'dubbing': 1.0,
    }

    total_weighted_time = 0
    total_weights = 0
    for stage, took in completed.items():
        weight = weights.get(stage, 1.0)
        total_weighted_time += took * weight
        total_weights += weight

    if total_weights == 0:
        return "Estimating..."

    avg_time = total_weighted_time / total_weights

    # Count only the pipeline stages that are still pending. A fixed
    # "4 - completed" also counted the 'upload' slot, which the background
    # worker never fills in, so every estimate was one stage too long.
    remaining_stages = sum(
        1 for stage in weights if timings.get(stage) is None
    )
    return remaining_stages * avg_time
|
|
|
def process_video_background(task_id, video_path, voice, cheerful):
    """Run the dubbing pipeline for one upload (started in a worker thread).

    Stages, in order: Gemini transcription, TTS narration, then writing a new
    video whose audio track is the narration. Progress, messages, per-stage
    timings, the generated script and the result path are published through
    ``processing_status[task_id]``.

    Failures are not raised to the caller: they are logged with a traceback
    and recorded as ``status == 'error'``; the uploaded source and any
    temporary audio file are then removed on a best-effort basis. On success
    the uploaded source is left in place (the ``/cleanup`` route removes old
    uploads).

    Args:
        task_id: Key of this job in ``processing_status``.
        video_path: Path of the uploaded source video.
        voice: Voice name forwarded to the TTS service.
        cheerful: Flag forwarded to the TTS service.
    """
    # Bound up front so the error path can test it directly instead of
    # probing ``locals()``.
    audio_path = None
    try:
        start_time = time.time()

        # The upload handler normally measured the duration already; only
        # open the video again when no value is available.
        previous = processing_status.get(task_id) or {}
        video_duration = previous.get('video_duration')
        if video_duration is None:
            video_duration = get_video_duration(video_path)

        processing_status[task_id] = {
            'status': 'processing',
            'progress': 0,
            'message': 'Starting transcription',
            'timings': {'upload': None, 'transcription': None, 'tts': None, 'dubbing': None},
            'start_time': start_time,
            'video_duration': video_duration,
        }
        status = processing_status[task_id]

        # --- Stage 1: transcription -------------------------------------
        status['message'] = 'Transcribing video content'
        logger.info(f"Task {task_id}: Starting transcription")
        script_start = time.time()
        script = generate_tamil_script(video_path)
        transcription_time = time.time() - script_start
        track_processing_time(task_id, 'transcription', transcription_time)
        status['progress'] = 25
        status['script'] = script
        logger.info(f"Task {task_id}: Transcription completed in {transcription_time:.1f}s")

        # --- Stage 2: text-to-speech ------------------------------------
        status['message'] = 'Generating audio narration'
        logger.info(f"Task {task_id}: Starting TTS generation")
        # delete=False: only the reserved path is needed here; the TTS step
        # reopens and fills the file, and it is unlinked explicitly below.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio:
            audio_path = temp_audio.name

        tts_start = time.time()
        generate_audio_track(script, voice, cheerful, audio_path)
        tts_time = time.time() - tts_start
        track_processing_time(task_id, 'tts', tts_time)
        status['progress'] = 50
        logger.info(f"Task {task_id}: TTS completed in {tts_time:.1f}s")

        # --- Stage 3: dubbing -------------------------------------------
        status['message'] = 'Creating dubbed video'
        logger.info(f"Task {task_id}: Starting dubbing")
        final_filename = f"dubbed_{task_id}.mp4"
        final_path = os.path.join(app.config['DOWNLOAD_FOLDER'], final_filename)

        dubbing_start = time.time()
        replace_video_audio(video_path, audio_path, final_path)
        dubbing_time = time.time() - dubbing_start
        track_processing_time(task_id, 'dubbing', dubbing_time)
        status['progress'] = 75
        logger.info(f"Task {task_id}: Dubbing completed in {dubbing_time:.1f}s")

        # The narration is now embedded in the output; drop the temp file.
        os.unlink(audio_path)

        status.update({
            'status': 'complete',
            'progress': 100,
            'message': 'Processing complete',
            'result_path': final_path,
            'end_time': time.time(),
        })
        logger.info(f"Task {task_id}: Processing completed successfully")

    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Task {task_id} failed: {str(e)}")
        processing_status.setdefault(task_id, {}).update({
            'status': 'error',
            'message': f'Error: {str(e)}',
        })

        # Best-effort removal of the inputs of a failed job; a failing unlink
        # must not escape the worker thread.
        for leftover in (video_path, audio_path):
            if leftover and os.path.exists(leftover):
                try:
                    os.unlink(leftover)
                except OSError as cleanup_error:
                    logger.warning(
                        f"Task {task_id}: could not remove {leftover}: {cleanup_error}"
                    )
|
|
|
def get_video_duration(video_path):
    """Return the duration of the video at ``video_path`` in seconds.

    Best-effort: if the file cannot be opened or probed, the failure is
    logged as a warning and ``0`` is returned instead of raising.
    """
    try:
        with VideoFileClip(video_path) as video:
            return video.duration
    except Exception as exc:
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        logging.getLogger(__name__).warning(
            "Could not read duration of %s: %s", video_path, exc
        )
        return 0
|
|
|
def generate_tamil_script(video_path):
    """Generate the Tamil narration script for a video using Gemini.

    The video is uploaded through the Gemini file API, polled until it leaves
    the ``PROCESSING`` state (at most 300 seconds), and then sent to the
    model together with ``GEMINI_PROMPT``. The whole sequence is attempted up
    to three times, sleeping 10s, then 20s between attempts.

    Args:
        video_path: Path of the local video file to transcribe.

    Returns:
        The model's text with line breaks collapsed into single spaces.

    Raises:
        Exception: Whatever the final attempt raised (including
            ``TimeoutError`` when Gemini never finishes processing).
    """
    max_retries = 3
    retry_delay = 10

    for attempt in range(max_retries):
        video_file = None
        try:
            video_file = genai.upload_file(video_path, mime_type="video/mp4")

            # Poll until Gemini has finished ingesting the upload.
            start_wait = time.time()
            while video_file.state.name == "PROCESSING":
                if time.time() - start_wait > 300:
                    raise TimeoutError("Gemini processing timed out")
                time.sleep(5)
                video_file = genai.get_file(video_file.name)

            if video_file.state.name != "ACTIVE":
                raise Exception(f"Gemini processing failed: {video_file.state.name}")

            model = genai.GenerativeModel(model_name="models/gemini-2.5-flash")
            response = model.generate_content([GEMINI_PROMPT, video_file])

            if hasattr(response, 'text') and response.text:
                return " ".join(response.text.strip().splitlines())
            raise Exception("No valid script generated")

        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(f"Gemini error (attempt {attempt+1}/{max_retries}): {str(e)}")
                time.sleep(retry_delay * (attempt + 1))
            else:
                raise
        finally:
            # Remove the remote copy on every exit path; previously it was
            # only deleted after a successful generation, so each timeout or
            # failed attempt left an orphaned upload behind.
            if video_file is not None:
                try:
                    genai.delete_file(video_file.name)
                except Exception as cleanup_error:
                    logger.warning(
                        f"Could not delete Gemini file {video_file.name}: {cleanup_error}"
                    )
|
|
|
def generate_audio_track(text, voice, cheerful, output_path):
    """Fetch narration audio from the TTS service and write it to disk.

    Posts ``text``, ``voice`` and ``cheerful`` as JSON to ``TTS_API_URL`` and
    stores the raw response body at ``output_path``. Up to three attempts are
    made, sleeping 5s, then 10s between them; the error of the last attempt
    is re-raised.
    """
    max_retries = 3
    retry_delay = 5

    request_body = {
        "text": text,
        "voice_name": voice,
        "cheerful": cheerful,
    }

    for attempt in range(max_retries):
        try:
            reply = requests.post(TTS_API_URL, json=request_body, timeout=300)
            if reply.status_code != 200:
                raise Exception(f"TTS API error: {reply.status_code} - {reply.text}")

            with open(output_path, "wb") as audio_file:
                audio_file.write(reply.content)
            return

        except Exception as e:
            out_of_attempts = attempt >= max_retries - 1
            if out_of_attempts:
                raise
            logger.warning(f"TTS error (attempt {attempt+1}/{max_retries}): {str(e)}")
            time.sleep(retry_delay * (attempt + 1))
|
|
|
def replace_video_audio(video_path, audio_path, output_path):
    """Write a copy of the video whose audio track is the given audio file.

    On any failure the error is logged, a partially written ``output_path``
    is removed, and the exception is re-raised. Both clips are closed in all
    cases.
    """
    video = audio = None
    try:
        video = VideoFileClip(video_path)
        audio = AudioFileClip(audio_path)

        # Swap the narration in as the clip's audio track.
        video.audio = audio

        encode_options = dict(
            codec="libx264",
            audio_codec="aac",
            logger=None,
            threads=4,
            preset='medium',
            ffmpeg_params=['-crf', '23', '-movflags', '+faststart'],
        )
        video.write_videofile(output_path, **encode_options)

    except Exception as e:
        logger.error(f"Video processing error: {str(e)}")

        # Do not leave a truncated output file behind.
        if os.path.exists(output_path):
            os.unlink(output_path)
        raise
    finally:
        for clip in (video, audio):
            if clip:
                clip.close()
|
|
|
@app.route('/')
def index():
    """Render the landing page, passing the selectable narration voices."""
    template_context = {'voices': VOICE_CHOICES}
    return render_template('index.html', **template_context)
|
|
|
@app.route('/upload', methods=['POST'])
def upload_video():
    """Accept a video upload and start the dubbing pipeline in a thread.

    Expects a multipart form with a ``video`` file and optional ``voice``
    (default ``'Charon'``) and ``cheerful`` (``'true'`` enables it) fields.

    Returns:
        JSON with the new ``task_id`` and the measured ``video_duration``,
        or a 400 JSON error when no file was supplied.
    """
    if 'video' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400

    upload = request.files['video']
    if upload.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    # Prefix the stored name with the task id so uploads never collide.
    task_id = str(uuid.uuid4())
    filename = secure_filename(f"{task_id}_{upload.filename}")
    video_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    save_started = time.time()
    upload.save(video_path)
    # Every entry in 'timings' is a duration in seconds; record how long the
    # save took rather than a wall-clock timestamp.
    upload_seconds = time.time() - save_started

    voice = request.form.get('voice', 'Charon')
    cheerful = request.form.get('cheerful', 'false') == 'true'

    processing_status[task_id] = {
        'status': 'uploaded',
        'progress': 0,
        'message': 'Starting processing',
        'timings': {'upload': upload_seconds, 'transcription': None, 'tts': None, 'dubbing': None},
        'start_time': time.time(),
        'video_duration': get_video_duration(video_path),
    }

    thread = threading.Thread(
        target=process_video_background,
        args=(task_id, video_path, voice, cheerful),
    )
    thread.start()

    return jsonify({
        'task_id': task_id,
        'video_duration': processing_status[task_id]['video_duration'],
    })
|
|
|
@app.route('/status/<task_id>')
def get_status(task_id):
    """Report the current state of a processing task as JSON.

    The response always carries ``status``, ``progress``, ``message`` and
    ``eta`` (an ``H:MM:SS`` string while processing and an estimate is
    available, otherwise ``null``). Completed tasks additionally include
    ``result_url`` and ``script``. Unknown task ids yield a 404 JSON error.
    """
    if task_id not in processing_status:
        return jsonify({'error': 'Invalid task ID'}), 404

    status = processing_status[task_id]

    eta = None
    if status['status'] == 'processing':
        remaining = estimate_remaining_time(task_id)
        # The estimator returns a placeholder string until it has data;
        # only numeric estimates are formatted.
        if isinstance(remaining, (int, float)):
            eta = str(timedelta(seconds=int(remaining)))

    response = {
        'status': status['status'],
        'progress': status.get('progress', 0),
        'message': status.get('message', ''),
        'eta': eta,
    }

    if status['status'] == 'complete':
        response['result_url'] = url_for('download', filename=os.path.basename(status['result_path']))
        response['script'] = status.get('script', '')

    return jsonify(response)
|
|
|
@app.route('/download/<filename>')
def download(filename):
    """Send the requested file from the configured download folder."""
    download_dir = app.config['DOWNLOAD_FOLDER']
    return send_from_directory(download_dir, filename)
|
|
|
def _remove_stale_files(folder, max_age_seconds):
    """Delete regular files in ``folder`` last modified over ``max_age_seconds`` ago."""
    cutoff = time.time() - max_age_seconds
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            # Skip directories: os.unlink() would fail on them and abort
            # the whole sweep.
            if os.path.isfile(file_path) and os.path.getmtime(file_path) < cutoff:
                os.unlink(file_path)
        except FileNotFoundError:
            # The file disappeared between listing and removal (for example
            # removed by the worker or a concurrent sweep); nothing to do.
            continue


@app.route('/cleanup', methods=['POST'])
def cleanup():
    """Delete old working files.

    Uploads older than one hour and finished downloads older than 24 hours
    are removed.

    Returns:
        A JSON success message, or a JSON error with status 500 when the
        sweep fails.
    """
    try:
        _remove_stale_files(UPLOAD_FOLDER, 3600)
        _remove_stale_files(DOWNLOAD_FOLDER, 86400)
        return jsonify({'status': 'success', 'message': 'Cleanup completed'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
|
|
|
if __name__ == '__main__':
    # Imported here so the scheduler is only needed when run as a script.
    import schedule
    import time as t

    def cleanup_job():
        """Trigger the /cleanup route in-process via Flask's test client."""
        with app.app_context():
            client = app.test_client()
            client.post('/cleanup')

    schedule.every().hour.do(cleanup_job)

    def scheduler_thread():
        """Poll the scheduler once per second, forever."""
        while True:
            schedule.run_pending()
            t.sleep(1)

    # Daemon thread: it must not keep the process alive once the server exits.
    ticker = threading.Thread(target=scheduler_thread, daemon=True)
    ticker.start()

    app.run(host="0.0.0.0", port=7860, threaded=True)