Spaces:
Build error
Build error
| import os | |
| import re | |
| import unicodedata | |
| import soundfile as sf | |
| from dotenv import load_dotenv | |
| load_dotenv(override=True) | |
| from celery import Celery | |
| import tempfile | |
| import sys | |
| import redis | |
| from contextlib import contextmanager | |
| from services.ai_pipeline import process_video_pipeline, generate_tts_only, process_studio_pipeline | |
| from supabase import create_client, Client | |
# Module-wide Redis connection (used below to publish per-job progress).
# Falls back to a local Redis instance when REDIS_URL is not set.
_redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
redis_client = redis.from_url(_redis_url)
class ProgressCatcher:
    """Stream proxy that mirrors writes and publishes progress timings to Redis.

    Every chunk written is forwarded unchanged to the wrapped stream. When a
    chunk contains an ``[elapsed<remaining`` timing pair (presumably the
    status line of a tqdm-style progress bar), the pair is stored in Redis
    under ``progress_<job_id>`` as ``"elapsed|remaining"``.

    Any attribute not defined here (``isatty``, ``encoding``, ``fileno``, ...)
    is delegated to the wrapped stream so the proxy can stand in for
    ``sys.stderr`` without breaking code that inspects the stream.
    """

    # Matches the "[MM:SS<MM:SS" fragment of a progress line.
    # NOTE(review): timings with an hours part ("1:02:03") do not match this
    # pattern, so nothing is published for them - confirm whether that matters.
    _TIMING_PATTERN = re.compile(r'\[(\d+:\d+)<(\d+:\d+)')

    def __init__(self, original_stream, job_id):
        """Wrap *original_stream* and tag published progress with *job_id*."""
        self.original_stream = original_stream
        self.job_id = job_id

    def write(self, text):
        """Forward *text* to the wrapped stream and publish any timing pair found.

        Returns whatever the wrapped stream's ``write`` returns (normally the
        number of characters written), as the text-stream protocol expects.
        """
        written = self.original_stream.write(text)
        match = self._TIMING_PATTERN.search(text)
        if match:
            elapsed, remaining = match.groups()
            redis_client.set(f"progress_{self.job_id}", f"{elapsed}|{remaining}")
        return written

    def flush(self):
        """Flush the wrapped stream."""
        self.original_stream.flush()

    def __getattr__(self, name):
        """Delegate every other attribute lookup to the wrapped stream."""
        # Guard against infinite recursion if the instance is not initialised yet.
        if name == "original_stream":
            raise AttributeError(name)
        return getattr(self.original_stream, name)
@contextmanager
def catch_progress(job_id):
    """Temporarily route ``sys.stderr`` through a ``ProgressCatcher`` for *job_id*.

    While the ``with`` body runs, everything written to ``sys.stderr`` is
    still forwarded to the real stream, and progress timings found in it are
    published for the job. The original ``sys.stderr`` is restored on exit,
    including when the body raises.

    The ``@contextmanager`` decorator is required: without it this is a plain
    generator function and cannot be used in a ``with`` statement.
    """
    original_stderr = sys.stderr
    sys.stderr = ProgressCatcher(original_stderr, job_id)
    try:
        yield
    finally:
        # Always put the real stream back, even if the wrapped work failed.
        sys.stderr = original_stderr
def slugify(text):
    """Return a short ASCII, lowercase, underscore-separated slug of *text*.

    Vietnamese diacritics are stripped, punctuation is removed, runs of
    whitespace/hyphens become a single underscore, and the result is truncated
    to 30 characters.
    """
    # "đ"/"Đ" have no Unicode decomposition, so the NFD + ASCII filter below
    # would delete them outright ("Đường" -> "uong"). Map them explicitly.
    text = text.translate(str.maketrans("đĐ", "dD"))
    # Decompose accented letters and drop the combining marks (non-ASCII bytes).
    text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
    # Remove special characters, lowercase, then turn whitespace/hyphens into "_".
    text = re.sub(r'[^\w\s-]', '', text).strip().lower()
    text = re.sub(r'[-\s]+', '_', text)
    return text[:30]
# Celery application for this worker. The same Redis URL serves as both the
# message broker and the result backend (local Redis when REDIS_URL is unset).
_celery_redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
celery_app = Celery(
    "video_tasks",
    broker=_celery_redis_url,
    backend=_celery_redis_url,
)
def render_video_task(self, job_id: str, script: str, ref_audio_path: str, aspect_ratio: str, sub_style: str, font_name: str, highlight_color: str):
    """Render a video for ``job_id`` and record the outcome in ``video_jobs``.

    Marks the job as "processing", optionally downloads the reference audio
    from the "content" storage bucket, runs ``process_video_pipeline`` in a
    temporary directory, uploads the resulting MP4 to
    ``rendered/<job_id>_final.mp4`` and stores its public URL on the job row.

    Args:
        self: Unused in this body.
            NOTE(review): no Celery task decorator is visible in this view;
            the leading ``self`` suggests a bound task - confirm registration.
        job_id: Primary key of the ``video_jobs`` row to update.
        script: Script text passed to the pipeline.
        ref_audio_path: Path of the reference audio inside the "content"
            bucket; falsy to run without one.
        aspect_ratio, sub_style, font_name, highlight_color: Passed through
            to ``process_video_pipeline`` unchanged.

    Raises:
        Exception: Any failure is recorded on the job row and then re-raised.
    """
    # Setup Supabase client per worker (avoid circular dependency with main.py)
    SUPABASE_URL = os.getenv("SUPABASE_URL")
    SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

    try:
        # Update DB status
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Download files to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, script, local_ref, aspect_ratio, sub_style, font_name, highlight_color)

            # Upload Result: hand over the open file instead of reading the
            # whole MP4 into memory, and label it as video so it is not stored
            # with the storage client's default content type.
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(
                    path=result_path,
                    file=f,
                    file_options={"content-type": "video/mp4"},
                )

            # Finish
            supabase.table("video_jobs").update({
                "status": "completed",
                "result_url": supabase.storage.from_("content").get_public_url(result_path)
            }).eq("id", job_id).execute()
    except Exception as e:
        # NOTE(review): the other tasks in this file record "error" here while
        # this one records "failed" - confirm which value consumers expect.
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise
def generate_tts_task(job_id: str, script: str, voice: str, temperature: float, ref_audio_path: str = None, bgm_path: str = None, bgm_volume: float = 0.1):
    """Generate a TTS audio file for ``job_id`` and record the outcome in ``video_jobs``.

    Steps: mark the job "processing", clean the script, resolve an optional
    reference voice clip (trimmed to 15 seconds), resolve optional background
    music, run ``generate_tts_only``, upload the result to the "content"
    bucket under ``results/`` and store its public URL on the job row.

    Args:
        job_id: Primary key of the ``video_jobs`` row to update.
        script: Text to synthesise; bracketed notes are stripped first.
        voice: Used as a local reference clip when it names an existing
            ``.mp3``/``.wav`` file and no ``ref_audio_path`` is given.
        temperature: Passed through to ``generate_tts_only``.
        ref_audio_path: Path of a reference clip inside the "content" bucket.
        bgm_path: Background music; an existing local path is used as is,
            anything else is downloaded from the "content" bucket.
        bgm_volume: Passed through to ``generate_tts_only``.

    Raises:
        Exception: Any failure is recorded on the job row (status "error")
            and then re-raised.
    """
    # Setup Supabase client per worker
    SUPABASE_URL = os.getenv("SUPABASE_URL")
    SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

    try:
        # The status update and script clean-up live inside the try block so a
        # failure here is recorded on the job instead of leaving it dangling.
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # --- AUTO-CLEAN SCRIPT ---
        # Remove bracketed notes [like this] so they are not read aloud, while
        # keeping control tags of the form [p:N], [v:N], [s:N] with numeric N.
        # NOTE(review): a non-numeric tag such as [v:name] is not covered by
        # the look-ahead and is stripped - confirm voice tags are numeric.
        script = re.sub(r'\[(?!(?:p|v|s):\d*\.?\d*\]).*?\]', '', script).strip()
        # Collapse ellipses into a comma, then normalise whitespace.
        script = re.sub(r'\.{2,}', ',', script)
        script = re.sub(r'\s+', ' ', script).strip()
        # -------------------------

        with tempfile.TemporaryDirectory() as tmpdir:
            # 1. Resolve the reference audio, if any
            local_ref_path = None
            if ref_audio_path:
                local_ref_path = os.path.join(tmpdir, "input_ref.wav")
                with open(local_ref_path, 'wb') as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))
            elif voice and voice.endswith((".mp3", ".wav")) and os.path.exists(voice):
                # Use local static voice file as reference (copy to tmpdir to
                # avoid overwriting the original when trimming below)
                import shutil
                local_ref_path = os.path.join(tmpdir, "static_ref.wav")
                shutil.copy(voice, local_ref_path)

            if local_ref_path:
                # --- AUTO-TRIM LOGIC ---
                # Keep only the first 15 seconds of the reference clip to
                # avoid running out of memory (OOM) in OnnxRuntime.
                # Best effort: on failure the untrimmed clip is used.
                try:
                    data, samplerate = sf.read(local_ref_path)
                    # Multi-channel audio: keep the first channel only.
                    if len(data.shape) > 1:
                        data = data[:, 0]
                    max_samples = 15 * samplerate
                    if len(data) > max_samples:
                        print(f"DEBUG: Audio mẫu quá dài ({len(data)/samplerate:.2f}s), tự động cắt còn 15s.")
                        data = data[:max_samples]
                    # Always write back so the single-channel clip is what the
                    # engine receives, whether or not trimming was needed.
                    sf.write(local_ref_path, data, samplerate)
                except Exception as trim_err:
                    print(f"Warning: Không thể cắt audio mẫu: {trim_err}")
                # -----------------------

            # 2. Resolve BGM if provided (local file wins over a bucket path)
            local_bgm_path = None
            if bgm_path:
                if os.path.exists(bgm_path):
                    local_bgm_path = bgm_path
                else:
                    local_bgm_path = os.path.join(tmpdir, "bgm.mp3")
                    with open(local_bgm_path, 'wb') as f:
                        f.write(supabase.storage.from_("content").download(bgm_path))

            # 3. Run Pure TTS Engine
            result_audio_local = generate_tts_only(tmpdir, script, local_ref_path, temperature, local_bgm_path, bgm_volume)

            # 4. Upload Result Audio
            # Build a friendly file name from the start of the script.
            friendly_name = slugify(script)
            final_audio_path = f"results/{friendly_name}_{job_id[:8]}.wav"
            with open(result_audio_local, 'rb') as f:
                supabase.storage.from_("content").upload(
                    path=final_audio_path,
                    file=f,
                    file_options={"content-type": "audio/wav"}
                )
            public_url = supabase.storage.from_("content").get_public_url(final_audio_path)

            # 5. Mark job as complete
            supabase.table("video_jobs").update({
                "status": "completed",
                "result_url": public_url
            }).eq("id", job_id).execute()
    except Exception as e:
        import traceback
        traceback.print_exc()
        supabase.table("video_jobs").update({
            "status": "error",
            "error": str(e)
        }).eq("id", job_id).execute()
        raise
def render_studio_task(job_id: str, script: str, temperature: float = 0.5, voice_preset: str = "default", bgm_path: str = None, bgm_volume: float = 0.1):
    """
    Background job for Studio: render the full MP4 and publish the result.

    Marks the ``video_jobs`` row as "processing", resolves optional background
    music (an existing local path is used directly, anything else is fetched
    from the "content" bucket), runs ``process_studio_pipeline`` while stderr
    progress is captured for the job, uploads the MP4 under ``results/`` and
    stores its public URL. On any failure the traceback is printed, the row is
    set to "error" with the message, and the exception is re-raised.
    """
    # One Supabase client per task invocation.
    db: Client = create_client(
        os.getenv("SUPABASE_URL"),
        os.getenv("SUPABASE_SERVICE_ROLE_KEY"),
    )

    try:
        db.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        with tempfile.TemporaryDirectory() as workdir:
            with catch_progress(job_id):
                # Background music: prefer a file already on disk, otherwise
                # pull it from storage into the working directory.
                music_file = None
                if bgm_path and os.path.exists(bgm_path):
                    music_file = bgm_path
                elif bgm_path:
                    music_file = os.path.join(workdir, "bgm.mp3")
                    with open(music_file, 'wb') as sink:
                        sink.write(db.storage.from_("content").download(bgm_path))

                rendered_mp4 = process_studio_pipeline(workdir, script, temperature, voice_preset, music_file, bgm_volume)

            # Publish the rendered video under a readable, job-specific name.
            slug = slugify(script)
            remote_path = f"results/studio_{slug}_{job_id[:8]}.mp4"
            with open(rendered_mp4, 'rb') as video:
                db.storage.from_("content").upload(
                    path=remote_path,
                    file=video,
                    file_options={"content-type": "video/mp4"},
                )
            video_url = db.storage.from_("content").get_public_url(remote_path)

            completion = {"status": "completed", "result_url": video_url}
            db.table("video_jobs").update(completion).eq("id", job_id).execute()
    except Exception as exc:
        import traceback
        traceback.print_exc()
        failure = {"status": "error", "error": str(exc)}
        db.table("video_jobs").update(failure).eq("id", job_id).execute()
        raise exc