| import os, json, hashlib, uuid, random, re, glob, shutil, subprocess, threading, time, struct, wave |
| from collections import defaultdict |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| _stage_locks = { |
| 'download': threading.Lock(), |
| 'whisper': threading.Lock(), |
| 'ai': threading.Lock(), |
| 'tts': threading.Lock(), |
| 'ffmpeg': threading.Lock(), |
| } |
|
|
| |
| _stage_queues = {s: [] for s in _stage_locks} |
| _sq_lock = threading.Lock() |
|
|
| def _stage_enqueue(stage, tid): |
| with _sq_lock: |
| if tid not in _stage_queues[stage]: |
| _stage_queues[stage].append(tid) |
|
|
| def _stage_dequeue(stage, tid): |
| with _sq_lock: |
| try: _stage_queues[stage].remove(tid) |
| except ValueError: pass |
|
|
| def _stage_queue_len(stage): |
| with _sq_lock: |
| return len(_stage_queues[stage]) |
|
|
| def run_stage(name, fn, tid, prog_fn, wait_msg, run_msg, *args, **kwargs): |
| """ |
| Strictly sequential stage execution with queue position display. |
| Uses threading.Lock โ only 1 job runs per stage at a time. |
| Other jobs spin-wait showing their position to the user. |
| """ |
| _stage_enqueue(name, tid) |
| lock = _stage_locks[name] |
| acquired = False |
| try: |
| while True: |
| |
| acquired = lock.acquire(blocking=False) |
| if acquired: |
| |
| _stage_dequeue(name, tid) |
| prog_fn(None, run_msg) |
| return fn(*args, **kwargs) |
| |
| with _sq_lock: |
| q = _stage_queues[name] |
| pos = q.index(tid) if tid in q else 0 |
| prog_fn(None, f'{wait_msg} ({pos} แแฑแฌแแบ แกแแแบแแฏแถแธ)') |
| time.sleep(0.8) |
| except Exception: |
| _stage_dequeue(name, tid) |
| raise |
| finally: |
| if acquired: |
| lock.release() |
| from datetime import datetime |
| from pathlib import Path |
| from flask import Flask, request, jsonify, send_from_directory, Response, redirect |
|
|
| try: |
| from openai import OpenAI |
| except ImportError: |
| OpenAI = None |
|
|
| try: |
| import whisper |
| except ImportError: |
| whisper = None |
|
|
| try: |
| import edge_tts, asyncio |
| except ImportError: |
| edge_tts = None |
|
|
| try: |
| from google import genai as ggenai |
| from google.genai import types as gtypes |
| except ImportError: |
| ggenai = None |
|
|
| |
| BASE_DIR = Path(__file__).parent |
| COOKIES_FILE = str(BASE_DIR / 'm_youtube_com_cookies.txt') |
| app = Flask(__name__) |
|
|
| |
| def ytdlp_download(out_tmpl, video_url, timeout=1200): |
| """yt-dlp download โ hard cap 720p max, platform-aware, cookies, robust fallback.""" |
| url_lower = video_url.lower() |
| is_tiktok = 'tiktok.com' in url_lower |
| is_facebook = 'facebook.com' in url_lower or 'fb.watch' in url_lower |
| is_instagram = 'instagram.com' in url_lower |
|
|
| if is_tiktok or is_facebook or is_instagram: |
| |
| fmt = ( |
| 'bestvideo[height<=720]+bestaudio' |
| '/best[height<=720]' |
| '/best' |
| ) |
| else: |
| |
| fmt = ( |
| 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]' |
| '/bestvideo[height<=720]+bestaudio' |
| '/best[height<=720][ext=mp4]' |
| '/best[height<=720]' |
| '/best[height<=480]' |
| '/best' |
| ) |
|
|
| cmd = [ |
| 'yt-dlp', '--no-playlist', |
| '-f', fmt, |
| '--merge-output-format', 'mp4', |
| '--no-check-certificates', |
| ] |
| if os.path.exists(COOKIES_FILE): |
| cmd += ['--cookies', COOKIES_FILE] |
| cmd += ['-o', out_tmpl, video_url] |
| print(f'[ytdlp] Running: {" ".join(cmd)}') |
| subprocess.run(cmd, check=True, timeout=timeout) |
|
|
| app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 |
|
|
| OUTPUT_DIR = BASE_DIR / 'outputs' |
| OUTPUT_DIR.mkdir(exist_ok=True) |
|
|
| VIDEO_HISTORY_DIR = BASE_DIR / 'video_history' |
| VIDEO_HISTORY_DIR.mkdir(exist_ok=True) |
| VIDEO_HISTORY_TTL = 5 * 3600 |
|
|
| _vh_lock = threading.Lock() |
|
|
| def _vh_file(username): |
| safe = re.sub(r'[^a-zA-Z0-9_\-]', '_', username) |
| return VIDEO_HISTORY_DIR / f'{safe}.json' |
|
|
| def load_video_history(username): |
| f = _vh_file(username) |
| try: |
| with open(f, encoding='utf-8') as fh: |
| return json.load(fh) |
| except: |
| return [] |
|
|
| def save_video_history_entry(username, entry): |
| """Add entry to user's video history, enforce 5-hour TTL, keep latest 50.""" |
| with _vh_lock: |
| f = _vh_file(username) |
| try: |
| with open(f, encoding='utf-8') as fh: |
| records = json.load(fh) |
| except: |
| records = [] |
| now = time.time() |
| |
| records = [r for r in records if now - r.get('ts', 0) < VIDEO_HISTORY_TTL] |
| records.insert(0, entry) |
| records = records[:50] |
| with open(f, 'w', encoding='utf-8') as fh: |
| json.dump(records, fh, ensure_ascii=False) |
|
|
| def cleanup_old_history(): |
| """Called periodically โ remove expired entries from all history files.""" |
| now = time.time() |
| for hf in VIDEO_HISTORY_DIR.glob('*.json'): |
| try: |
| with open(hf, encoding='utf-8') as fh: |
| records = json.load(fh) |
| kept = [r for r in records if now - r.get('ts', 0) < VIDEO_HISTORY_TTL] |
| with open(hf, 'w', encoding='utf-8') as fh: |
| json.dump(kept, fh, ensure_ascii=False) |
| except: |
| pass |
|
|
| |
| def _auto_cleanup_loop(): |
| while True: |
| try: |
| now = time.time() |
| deleted = 0 |
| for fp in OUTPUT_DIR.glob('final_*.mp4'): |
| try: |
| if now - fp.stat().st_mtime > VIDEO_HISTORY_TTL: |
| fp.unlink(missing_ok=True) |
| deleted += 1 |
| except: |
| pass |
| if deleted: |
| print(f'๐๏ธ Auto-cleanup: deleted {deleted} old output(s)') |
| cleanup_old_history() |
| except Exception as e: |
| print(f'โ ๏ธ cleanup error: {e}') |
| time.sleep(1800) |
|
|
| threading.Thread(target=_auto_cleanup_loop, daemon=True).start() |
|
|
| |
| job_progress = {} |
|
|
| |
| _preview_cache = {} |
| _preview_cache_lock = threading.Lock() |
| PREVIEW_CACHE_TTL = 1800 |
|
|
| def _url_cache_key(url): |
| import hashlib |
| return hashlib.md5(url.encode()).hexdigest()[:12] |
|
|
| def _cleanup_preview_cache(): |
| now = time.time() |
| with _preview_cache_lock: |
| expired = [k for k, v in _preview_cache.items() if now - v['ts'] > PREVIEW_CACHE_TTL] |
| for k in expired: |
| shutil.rmtree(_preview_cache[k]['dir'], ignore_errors=True) |
| del _preview_cache[k] |
|
|
| |
| def cpu_queue_wait(): |
| pass |
|
|
| |
| DB_FILE = str(BASE_DIR / 'users_db.json') |
| HF_TOKEN = os.getenv('HF_TOKEN', '') |
| HF_REPO = 'Phoe2004/MovieRecapDB' |
| ADMIN_U = os.getenv('ADMIN_USERNAME', '') |
| ADMIN_P = os.getenv('ADMIN_PASSWORD', '') |
| ADMIN_TG = os.getenv('ADMIN_TG_USERNAME', '') |
|
|
| GEMINI_KEYS = [os.getenv(f'GEMINI_API_KEY_{i}') for i in range(1, 11)] |
| DEEPSEEK_KEYS = [os.getenv('DEEPSEEK_API_KEY')] |
|
|
| |
| GOOGLE_CLIENT_ID = os.getenv('GOOGLE_CLIENT_ID', '') |
| GOOGLE_CLIENT_SECRET = os.getenv('GOOGLE_CLIENT_SECRET', '') |
| GOOGLE_REDIRECT_URI = os.getenv('GOOGLE_REDIRECT_URI', 'https://recap.psonline.shop/auth/google/callback') |
| ADMIN_TELEGRAM_CHAT_ID = os.getenv('ADMIN_TELEGRAM_CHAT_ID', '') |
| TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '') |
|
|
| _rr_idx = 0 |
| _rr_lock = threading.Lock() |
|
|
| def next_gemini_key(): |
| global _rr_idx |
| valid = [k for k in GEMINI_KEYS if k] |
| if not valid: return None, [] |
| with _rr_lock: |
| idx = _rr_idx % len(valid) |
| _rr_idx += 1 |
| primary = valid[idx] |
| ordered = valid[idx:] + valid[:idx] |
| return primary, ordered |
|
|
| |
| def pull_db(): |
| if not HF_TOKEN: |
| print('โ ๏ธ pull: HF_TOKEN missing') |
| return |
| try: |
| from huggingface_hub import hf_hub_download |
| import traceback |
| path = hf_hub_download( |
| repo_id=HF_REPO, filename='users_db.json', repo_type='dataset', |
| token=HF_TOKEN, local_dir=str(BASE_DIR), force_download=True, |
| ) |
| dest = str(BASE_DIR / 'users_db.json') |
| if path != dest: |
| import shutil as _shutil |
| _shutil.copy2(path, dest) |
| print('โ
DB pulled from HuggingFace') |
| except Exception as e: |
| import traceback |
| print(f'โ ๏ธ pull failed: {e}') |
| traceback.print_exc() |
|
|
| _push_lock = threading.Lock() |
|
|
| def push_db(): |
| if not HF_TOKEN: |
| print('โ ๏ธ push: HF_TOKEN missing') |
| return |
| with _push_lock: |
| for attempt in range(4): |
| try: |
| from huggingface_hub import HfApi |
| api = HfApi(token=HF_TOKEN) |
| api.upload_file( |
| path_or_fileobj=DB_FILE, path_in_repo='users_db.json', |
| repo_id=HF_REPO, repo_type='dataset', |
| commit_message=f'db {datetime.now().strftime("%Y%m%d_%H%M%S")}', |
| ) |
| print(f'โ
DB pushed (attempt {attempt+1})') |
| return |
| except Exception as e: |
| print(f'โ ๏ธ push attempt {attempt+1} failed: {e}') |
| if attempt < 3: |
| time.sleep(3 * (attempt + 1)) |
| print('โ push_db: all retries failed') |
|
|
| def load_db(): |
| if not os.path.exists(DB_FILE): return {'users': {}} |
| try: |
| with open(DB_FILE, encoding='utf-8') as f: return json.load(f) |
| except: return {'users': {}} |
|
|
| def save_db(db): |
| with open(DB_FILE, 'w', encoding='utf-8') as f: |
| json.dump(db, f, ensure_ascii=False, indent=2) |
| threading.Thread(target=push_db, daemon=True).start() |
| |
| PAYMENTS_DB_FILE = str(BASE_DIR / 'payments_db.json') |
| KBZ_NAME = os.getenv('KBZ_NAME', 'Phoe Shan') |
| KBZ_NUMBER = os.getenv('KBZ_NUMBER', '09679871352') |
| KBZ_QR_URL = os.getenv('KBZ_QR_URL', '') |
| SCB_NAME = os.getenv('SCB_NAME', 'Phoe Shan') |
| SCB_NUMBER = os.getenv('SCB_NUMBER', '3664768187') |
| PROMPTPAY_NUM = os.getenv('PROMPTPAY_NUMBER', '0951236012') |
| TRUEMONEY_NAME = os.getenv('TRUEMONEY_NAME', 'Phoe Shan') |
| TRUEMONEY_NUM = os.getenv('TRUEMONEY_NUMBER', '0951236012') |
| TRUEMONEY_QR_URL = os.getenv('TRUEMONEY_QR_URL', '') |
|
|
| def load_payments_db(): |
| if not os.path.exists(PAYMENTS_DB_FILE): |
| return {'payments': []} |
| try: |
| with open(PAYMENTS_DB_FILE, encoding='utf-8') as f: |
| return json.load(f) |
| except: |
| return {'payments': []} |
|
|
| def save_payments_db(db): |
| with open(PAYMENTS_DB_FILE, 'w', encoding='utf-8') as f: |
| json.dump(db, f, ensure_ascii=False, indent=2) |
|
|
| def hp(p): return hashlib.sha256(p.encode()).hexdigest() |
|
|
| ADJ = ['Red','Blue','Gold','Star','Sky','Fire','Moon','Cool','Ice','Dark','Neon','Wild'] |
| NOUN = ['Tiger','Dragon','Wolf','Hawk','Lion','Fox','Eagle','Storm','Flash','Ghost'] |
|
|
| def gen_uname(): |
| db = load_db() |
| for _ in range(60): |
| u = random.choice(ADJ)+random.choice(NOUN)+str(random.randint(10,999)) |
| if u not in db['users']: return u |
| return 'User'+str(uuid.uuid4())[:6].upper() |
|
|
| def login_user(u, p): |
| if u == ADMIN_U and p == ADMIN_P: return True, 'โ
Admin', -1 |
| db = load_db() |
| if u not in db['users']: return False, 'โ Username not found', 0 |
| stored = db['users'][u].get('password', '') |
| if stored and stored != hp(p): return False, 'โ Wrong password', 0 |
| db['users'][u]['last_login'] = datetime.now().isoformat() |
| save_db(db) |
| return True, 'โ
Logged in', db['users'][u]['coins'] |
|
|
| def get_coins(u): return load_db()['users'].get(u, {}).get('coins', 0) |
|
|
| def deduct(u, n): |
| db = load_db() |
| if u not in db['users']: return False, 0 |
| if db['users'][u]['coins'] < n: return False, db['users'][u]['coins'] |
| db['users'][u]['coins'] -= n; save_db(db) |
| return True, db['users'][u]['coins'] |
|
|
| def add_coins_fn(u, n, source=None): |
| db = load_db() |
| if u not in db['users']: return 'โ User not found' |
| db['users'][u]['coins'] += int(n) |
| db['users'][u]['free_trial'] = False |
| save_db(db) |
| return f"โ
+{n} โ {db['users'][u]['coins']} ๐ช" |
|
|
| def set_coins_fn(u, n): |
| db = load_db() |
| if u not in db['users']: return 'โ User not found' |
| db['users'][u]['coins'] = int(n); save_db(db) |
| return f'โ
Coin = {n} ๐ช' |
|
|
| def ban_fn(u, ban=True): |
| db = load_db() |
| if u not in db['users']: return 'โ User not found' |
| db['users'][u]['banned'] = ban |
| save_db(db) |
| return f"โ
{'Banned' if ban else 'Unbanned'}: {u}" |
|
|
| def upd_stat(u, t): |
| db = load_db() |
| if u not in db['users']: return |
| k = 'total_transcripts' if t == 'tr' else 'total_videos' |
| db['users'][u][k] = db['users'][u].get(k, 0) + 1; save_db(db) |
|
|
| def create_user_fn(uname, coins, caller): |
| if caller != ADMIN_U: return 'โ Not admin', '' |
| uname = (uname or '').strip() or gen_uname() |
| db = load_db() |
| if uname in db['users']: return f"โ '{uname}' already exists", '' |
| db['users'][uname] = {'password': '', 'coins': int(coins), |
| 'created_at': datetime.now().isoformat(), 'last_login': None, |
| 'total_transcripts': 0, 'total_videos': 0} |
| save_db(db); return f"โ
'{uname}' created", uname |
|
|
| |
|
|
| |
| def get_sys_prompt(ct, vo_lang='my'): |
| """ |
| vo_lang: 'my' = Myanmar (default), 'th' = Thai, 'en' = English |
| """ |
| if vo_lang == 'th': |
| |
| if ct == 'Medical/Health': |
| return ( |
| "เธเธธเธเธเธทเธญเธเธนเนเนเธเธฅเธเนเธฒเธเธเธฒเธฃเนเธเธเธขเนเธ เธฒเธฉเธฒเนเธเธข โ เธ เธฒเธฉเธฒเนเธเธขเธเธตเนเธเธนเธเนเธเธเธตเธงเธดเธเธเธฃเธฐเธเธณเธงเธฑเธ\n" |
| "Rules: 100% เธ เธฒเธฉเธฒเนเธเธข | เนเธกเนเนเธเนเธ เธฒเธฉเธฒเธเธฒเธเธเธฒเธฃเธกเธฒเธเนเธเธดเธเนเธ | เนเธเธทเนเธญเธซเธฒเธเนเธเธเธเธฑเธเนเธเนเธฒเธเธฑเนเธ\n" |
| "เนเธเนเธเธฑเธงเนเธฅเธเนเธเธข: 1=เธซเธเธถเนเธ, 2=เธชเธญเธ, 10=เธชเธดเธ, 100=เธฃเนเธญเธข, 1000=เธเธฑเธ\n" |
| "Format EXACTLY:\n[SCRIPT](full thai script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #เธชเธธเธเธ เธฒเธ #thailand #health #viral #trending)" |
| ) |
| else: |
| return ( |
| "เธเธธเธเธเธทเธญเธเธฑเธเนเธเธตเธขเธเธชเธเธฃเธดเธเธเนเธชเธฃเธธเธเธซเธเธฑเธเธ เธฒเธฉเธฒเนเธเธข โ เนเธฅเนเธฒเนเธเธเธชเธเธธเธ เธ เธฒเธฉเธฒเธเธนเธเธเธฃเธฃเธกเธเธฒเธเธด\n" |
| "Rules: 100% เธ เธฒเธฉเธฒเนเธเธข | เนเธกเนเนเธเนเธ เธฒเธฉเธฒเธเธฒเธเธเธฒเธฃ | เนเธเธทเนเธญเธซเธฒเธเนเธเธเธเธฑเธเนเธเนเธฒเธเธฑเนเธ\n" |
| "เนเธเนเธเธฑเธงเนเธฅเธเนเธเธข: 1=เธซเธเธถเนเธ, 2=เธชเธญเธ, 10=เธชเธดเธ, 100=เธฃเนเธญเธข, 1000=เธเธฑเธ\n" |
| "เนเธเธฅเนเธเธทเนเธญเธซเธฒเธเนเธญเนเธเธเธตเนเนเธเนเธเธ เธฒเธฉเธฒเนเธเธข (เธชเนเธเธฅเนเนเธฅเนเธฒเนเธฃเธทเนเธญเธ movie recap เธเธตเนเธชเธเธธเธ)\n" |
| "เธเธญเธเนเธเนเธเธ เธฒเธฉเธฒเนเธเธขเนเธเนเธฒเธเธฑเนเธ เธซเนเธฒเธกเธกเธตเธ เธฒเธฉเธฒเธญเธฑเธเธเธคเธฉเนเธเธชเธเธฃเธดเธเธเน\n" |
| "Format: [SCRIPT](script)[TITLE](title โค10 words)[HASHTAGS]#movierecap #thailand" |
| ) |
| elif vo_lang == 'en': |
| |
| if ct == 'Medical/Health': |
| return ( |
| "You are an English medical content translator โ use clear, conversational English.\n" |
| "Rules: 100% English | conversational tone | original content only\n" |
| "Write numbers as words: 1=one, 2=two, 10=ten, 100=one hundred\n" |
| "Format EXACTLY:\n[SCRIPT](full english script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #health #medical #wellness #viral #trending)" |
| ) |
| else: |
| return ( |
| "You are an English movie recap script writer โ engaging storytelling tone, conversational.\n" |
| "Rules: 100% English | conversational not formal | original content only\n" |
| "Write numbers as words: 1=one, 2=two, 10=ten, 100=one hundred\n" |
| "Translate and retell the following content in English (movie recap storytelling style)\n" |
| "Format: [SCRIPT](script)[TITLE](title โค10 words)[HASHTAGS]#movierecap #english" |
| ) |
| else: |
| |
| if ct == 'Medical/Health': |
| return ( |
| "แแผแแบแแฌ แแฑแธแแแบ แแฌแแฌแแผแแบแแฐ โ spoken Myanmar\n" |
| "Rules: 100% แแผแแบแแฌ | แแปแฑแฌแแบแธแแฏแถแธแ
แฌแแฑแแแฏแถแธแ | แแฏแแบแแแญแฏแแบแธ แ\n" |
| "แแแแบแธแแปแฌแธแแญแฏ แแผแแบแแฌแ
แแฌแธแแผแแทแบแแฌ แแฑแธแแซ โ แฅแแแฌ 1=แแ
แบ, 2=แแพแ
แบ, 10=แแ
แบแแแบ, 12=แแ
แบแแแทแบแแพแ
แบ, 20=แแพแ
แบแแแบ, 100=แแ
แบแแฌ, 1000=แแ
แบแแฑแฌแแบ โ Arabic digit แแแฏแถแธแ\n" |
| "English transcriptแกแแญแฏแแบแธ แกแแญแฏแกแแญแฏ 100%แแแพแญแแฒ movie recap styleแแฌแแฌแแผแแบแแซ แแผแฎแแฑแฌแท -แแแบ-แ
แแฌแธแแฏแถแธแแญแฏ 40%แแฌแแฏแถแธแแซ\n" |
| "Format EXACTLY:\n[SCRIPT](full myanmar script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #แแปแแบแธแแฌแแฑแธ #myanmar #health #viral #trending)" |
| ) |
| else: |
| return ( |
| "แแผแแบแแฌ movie recap script แแฑแธแแฌแธแแฐ โ spoken Myanmar (แแฑแทแ
แแบแแผแฑแฌแแญแฏแแพแฏแแฌแแฌ)\n" |
| "Rules: 100% แแผแแบแแฌแแฌแแฌ | แแปแฑแฌแแบแธแแฏแถแธแ
แฌแแฑแแแฏแถแธแ | แแฐแcontent แแฌ | แแฏแแบแแแญแฏแแบแธ แ\n" |
| "แแแแบแธแแปแฌแธแแญแฏ แแผแแบแแฌแ
แแฌแธแแผแแทแบแแฌ แแฑแธแแซ โ แฅแแแฌ 1=แแ
แบ, 2=แแพแ
แบ, 10=แแ
แบแแแบ, 12=แแ
แบแแแทแบแแพแ
แบ, 20=แแพแ
แบแแแบ, 100=แแ
แบแแฌ, 1000=แแ
แบแแฑแฌแแบ โ Arabic digit แแแฏแถแธแ\n" |
| "Translate the following content into Burmese (storytelling tone movie recap tone and keep original content)\n" |
| "แแผแแบแแฌแแญแฏแแฒ แแผแฑแแฑแธแแซแ แกแแบแนแแแญแแบแแญแฏ แแฌแแพแแแผแแบแแฒแทแแกแแบแนแแแญแแบแ
แแฌแธแแฏแถแธแแฝแฑแแญแฏแแฝแฑแทแแแบแแแบแธ แแผแแบแแฌแแญแฏแแฒ แแฌแแฌแแผแแบแแผแฎแธ แแผแฑแแฑแธแแซ)\n" |
| "แแผแญแแฌแแฏแถแธแแแบ: -แแแบ แแญแฏ 50%แแฌแแฏแถแธแแซแ แแปแแบ 50% แแพแฌ -แแฌแแฑแซแท / -แแฒแทแแแบ / -แแญแฏแแบแแแบ / -แแฑแแฌ แแญแฏแแฒ แแฑแฌแแฏแถแธแแฑแธแแซแ\n" |
| "Format: [SCRIPT](script)[TITLE](title โค10 words)[HASHTAGS]#movierecap #แแผแแบแแฌ" |
| ) |
|
|
| |
| SYS_MOVIE = get_sys_prompt('Movie Recap', 'my') |
| SYS_MED = get_sys_prompt('Medical/Health', 'my') |
|
|
| NUM_TO_MM_RULE = ( |
| "แแแแบแธแแปแฌแธแแญแฏ แแผแแบแแฌแ
แแฌแธแแผแแทแบแแฌ แแฑแธแแซ โ " |
| "แฅแแแฌ 1=แแ
แบ, 2=แแพแ
แบ, 10=แแ
แบแแแบ, 12=แแ
แบแแแทแบแแพแ
แบ, 20=แแพแ
แบแแแบ, " |
| "100=แแ
แบแแฌ, 1000=แแ
แบแแฑแฌแแบ โ Arabic digit แแแฏแถแธแแ" |
| ) |
|
|
| def get_num_rule(vo_lang='my'): |
| if vo_lang == 'th': |
| return "เนเธเนเธเธฑเธงเนเธฅเธเนเธเธขเนเธเนเธฒเธเธฑเนเธ: 1=เธซเธเธถเนเธ, 2=เธชเธญเธ, 10=เธชเธดเธ, 20=เธขเธตเนเธชเธดเธ, 100=เธฃเนเธญเธข, 1000=เธเธฑเธ เธซเนเธฒเธกเนเธเนเธเธฑเธงเนเธฅเธเธญเธฒเธฃเธเธดเธ" |
| elif vo_lang == 'en': |
| return "Write all numbers as English words: 1=one, 2=two, 10=ten, 20=twenty, 100=one hundred, 1000=one thousand โ no Arabic digits." |
| else: |
| return NUM_TO_MM_RULE |
|
|
| |
| GEMINI_MODELS_TRANSCRIPT = [ |
| 'gemini-3-flash', |
| 'gemini-2.5-flash', |
| ] |
| |
| GEMINI_MODELS_CAPTION = [ |
| 'gemini-3.1-flash-lite', |
| 'gemini-2.5-flash-lite', |
| ] |
|
|
| _mdl_tr_idx = 0 |
| _mdl_cap_idx = 0 |
| _mdl_lock = threading.Lock() |
|
|
| def next_model(purpose='transcript'): |
| """Round-robin model selector โ spins like a spinner per call.""" |
| global _mdl_tr_idx, _mdl_cap_idx |
| models = GEMINI_MODELS_TRANSCRIPT if purpose == 'transcript' else GEMINI_MODELS_CAPTION |
| with _mdl_lock: |
| if purpose == 'transcript': |
| idx = _mdl_tr_idx % len(models) |
| _mdl_tr_idx += 1 |
| else: |
| idx = _mdl_cap_idx % len(models) |
| _mdl_cap_idx += 1 |
| |
| return models[idx:] + models[:idx] |
|
|
| def call_api(msgs, api='Gemini', purpose='transcript'): |
| """ |
| purpose='transcript' โ GEMINI_MODELS_TRANSCRIPT round-robin (gemini-3-flash โ gemini-2.5-flash) |
| purpose='caption' โ GEMINI_MODELS_CAPTION round-robin (gemini-3.1-flash-lite โ gemini-2.5-flash-lite) |
| """ |
| if api == 'DeepSeek': |
| keys, base = DEEPSEEK_KEYS, 'https://api.deepseek.com' |
| models = ['deepseek-chat'] |
| else: |
| keys, base = GEMINI_KEYS, 'https://generativelanguage.googleapis.com/v1beta/openai/' |
| models = next_model(purpose) |
| valid = [(i+1, k) for i, k in enumerate(keys) if k] |
| if not valid: raise Exception('No API Key available') |
| if api == 'Gemini': |
| _, ordered = next_gemini_key() |
| valid = sorted(valid, key=lambda x: ordered.index(x[1]) if x[1] in ordered else 99) |
| else: |
| random.shuffle(valid) |
| last_err = None |
| for n, k in valid: |
| for mdl in models: |
| try: |
| r = OpenAI(api_key=k, base_url=base, timeout=300.0).chat.completions.create( |
| model=mdl, messages=msgs, max_tokens=16384) |
| if r and r.choices and r.choices[0].message.content: |
| print(f'โ
call_api key={n} model={mdl} purpose={purpose}') |
| return r.choices[0].message.content.strip(), f'โ
Key{n} ({mdl})' |
| except Exception as e: |
| err = str(e); last_err = e |
| if '400' in err: continue |
| if '401' in err or '403' in err: break |
| if '429' in err: time.sleep(2); break |
| continue |
| raise Exception(f'โ All keys/models failed ({purpose}): {last_err}') |
|
|
| def parse_out(text): |
| sc, ti, ht = '', '', '' |
| m = re.search(r'\[SCRIPT\](.*?)\[TITLE\]', text, re.DOTALL) |
| if m: sc = m.group(1).strip() |
| m2 = re.search(r'\[TITLE\](.*?)(\[HASHTAGS\]|$)', text, re.DOTALL) |
| m3 = re.search(r'\[HASHTAGS\](.*?)$', text, re.DOTALL) |
| if m2: ti = m2.group(1).strip() |
| if m3: ht = m3.group(1).strip() |
| if not sc: sc = re.sub(r'\[SCRIPT\]|\[TITLE\]|\[HASHTAGS\]', '', text.split('[TITLE]')[0]).strip() |
| tags = re.findall(r'#\S+', ht) |
| if len(tags) < 5: |
| defaults = ['#myanmar','#viral','#trending','#foryou','#entertainment'] |
| tags = tags + [t for t in defaults if t not in tags] |
| ht = ' '.join(tags[:5]) |
| return sc, ti, ht |
|
|
| def split_txt(txt, vo_lang='my'): |
| if vo_lang == 'th': |
| parts = re.split(r'[ใ\n]', txt) |
| return [s.strip() for s in parts if s.strip()] or [txt] |
| elif vo_lang == 'en': |
| parts = re.split(r'(?<=[.!?])\s+', txt) |
| return [s.strip() for s in parts if s.strip()] or [txt] |
| else: |
| return [s.strip() + 'แ' for s in re.split(r'[แ]', txt) if s.strip()] or [txt] |
|
|
| def dur(fp): |
| try: |
| r = subprocess.run( |
| f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{fp}"', |
| shell=True, capture_output=True, text=True) |
| return float(r.stdout.strip()) |
| except: return 0 |
|
|
| |
| def run_tts_sync(sentences, voice_id, rate, tmp_dir): |
| async def _run(): |
| sil = f'{tmp_dir}/sil.mp3' |
| proc = await asyncio.create_subprocess_shell( |
| f'ffmpeg -f lavfi -i anullsrc=r=24000:cl=mono -t 0.2 -c:a libmp3lame -q:a 2 "{sil}" -y', |
| stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL) |
| await proc.wait() |
|
|
| |
| sem = asyncio.Semaphore(5) |
| raw_files = [f'{tmp_dir}/r{i:03d}.mp3' for i in range(len(sentences))] |
|
|
| async def _one(i, s): |
| async with sem: |
| last_err = None |
| for attempt in range(3): |
| try: |
| await edge_tts.Communicate(s, voice_id, rate=rate).save(raw_files[i]) |
| return |
| except Exception as e: |
| last_err = e |
| print(f'[TTS] sentence {i} attempt {attempt+1} failed: {e}') |
| if attempt < 2: |
| await asyncio.sleep(1.5 * (attempt + 1)) |
| raise Exception(f'[TTS] sentence {i} failed after 3 attempts: {last_err}') |
|
|
| await asyncio.gather(*[_one(i, s) for i, s in enumerate(sentences)]) |
|
|
| |
| |
| n = len(raw_files) |
| _norm_af = 'loudnorm=I=-14:TP=-1.5:LRA=11' |
| _boost_af = f'volume=1.15,{_norm_af}' |
| _to_normalize = set() |
| if n >= 1: _to_normalize.add(0) |
| if n >= 2: _to_normalize.add(n - 1) |
| if n >= 4: _to_normalize.add(1) |
|
|
| for idx in _to_normalize: |
| rf = raw_files[idx] |
| tmp_rf = rf + '.norm.mp3' |
| try: |
| subprocess.run( |
| f'ffmpeg -y -i "{rf}" -af "{_boost_af}" ' |
| f'-c:a libmp3lame -q:a 2 "{tmp_rf}"', |
| shell=True, check=True, capture_output=True) |
| os.replace(tmp_rf, rf) |
| except Exception as _ne: |
| print(f'[TTS norm] sentence {idx} normalize failed: {_ne}') |
| try: os.remove(tmp_rf) |
| except: pass |
|
|
| |
| parts = [] |
| for rf in raw_files: |
| parts += [rf, sil] |
| return parts |
| loop = asyncio.new_event_loop() |
| try: |
| return loop.run_until_complete(_run()) |
| finally: |
| loop.close() |
|
|
| MULTILINGUAL_VOICES = { |
| 'en-US-AndrewMultilingualNeural', |
| 'en-US-AvaMultilingualNeural', |
| 'en-US-BrianMultilingualNeural', |
| 'en-US-EmmaMultilingualNeural', |
| 'de-DE-FlorianMultilingualNeural', |
| 'de-DE-SeraphinaMultilingualNeural', |
| 'fr-FR-RemyMultilingualNeural', |
| 'fr-FR-VivienneMultilingualNeural', |
| } |
|
|
| def run_edge_preview(voice_id, rate, out_path): |
| |
| if voice_id in MULTILINGUAL_VOICES: |
| text = 'แแแบแนแแแฌแแซแ แแผแญแฏแแญแฏแแซแแแบแ' |
| elif voice_id.startswith('th-'): |
| text = 'เธชเธงเธฑเธชเธเธตเธเธฃเธฑเธ เธขเธดเธเธเธตเธเนเธญเธเธฃเธฑเธ' |
| elif voice_id.startswith('en-'): |
| text = 'Hello, welcome to Recap Studio.' |
| else: |
| text = 'แแแบแนแแแฌแแซแ แแผแญแฏแแญแฏแแซแแแบแ' |
| async def _run(): |
| await edge_tts.Communicate(text, voice_id, rate=rate).save(out_path) |
| loop = asyncio.new_event_loop() |
| try: |
| loop.run_until_complete(_run()) |
| finally: |
| loop.close() |
|
|
| |
| def _get_gemini_client(): |
| if ggenai is None: |
| raise Exception('google-genai package not installed') |
| valid_keys = [k for k in GEMINI_KEYS if k] |
| if not valid_keys: |
| raise Exception('No Gemini API Key') |
| random.shuffle(valid_keys) |
| return ggenai.Client(api_key=valid_keys[0]), valid_keys |
|
|
| def _save_pcm_as_wav(pcm_data, wav_path, sample_rate=24000, channels=1, sample_width=2): |
| with wave.open(wav_path, 'wb') as wf: |
| wf.setnchannels(channels) |
| wf.setsampwidth(sample_width) |
| wf.setframerate(sample_rate) |
| wf.writeframes(pcm_data) |
|
|
| def _wav_to_mp3(wav_path, mp3_path): |
| subprocess.run( |
| f'ffmpeg -y -i "{wav_path}" -c:a libmp3lame -q:a 2 "{mp3_path}"', |
| shell=True, check=True, capture_output=True) |
|
|
| def _gemini_tts_one_shot(client, text, voice_name, wav_path): |
| """Call Gemini TTS API once, save raw PCM as WAV. Returns wav_path.""" |
| response = client.models.generate_content( |
| model="gemini-2.5-flash-preview-tts", |
| contents=text, |
| config=gtypes.GenerateContentConfig( |
| response_modalities=["AUDIO"], |
| speech_config=gtypes.SpeechConfig( |
| voice_config=gtypes.VoiceConfig( |
| prebuilt_voice_config=gtypes.PrebuiltVoiceConfig( |
| voice_name=voice_name or "Kore" |
| ) |
| ) |
| ) |
| ) |
| ) |
| audio_data = None |
| if response.candidates: |
| for part in response.candidates[0].content.parts: |
| if part.inline_data and part.inline_data.mime_type.startswith('audio/'): |
| audio_data = part.inline_data.data |
| break |
| if not audio_data: |
| raise Exception('Gemini TTS: no audio data received') |
| _save_pcm_as_wav(audio_data, wav_path) |
| return wav_path |
| def run_gemini_tts_sync(sentences, voice_name, tmp_dir, speed=0): |
| if ggenai is None: |
| raise Exception('google-genai package not installed') |
| _, ordered_keys = next_gemini_key() |
| if not ordered_keys: |
| raise Exception('No Gemini API Key') |
| time.sleep(2) |
|
|
| |
| full_txt = '\n'.join(sentences) |
| mp3_out = f'{tmp_dir}/gemini_final.mp3' |
|
|
| last_err = None |
| for api_key in ordered_keys: |
| try: |
| client = ggenai.Client(api_key=api_key) |
| wav_out = f'{tmp_dir}/gemini_out.wav' |
| mp3_raw = f'{tmp_dir}/gemini_raw.mp3' |
|
|
| _gemini_tts_one_shot(client, full_txt, voice_name, wav_out) |
| _wav_to_mp3(wav_out, mp3_raw) |
| try: os.remove(wav_out) |
| except: pass |
|
|
| |
| spd_pct = speed if isinstance(speed, (int, float)) else 0 |
| tempo = max(0.5, min(2.0, 1.0 + spd_pct / 100.0)) |
| af_filters = [] |
| if abs(tempo - 1.0) > 0.01: |
| af_filters.append(f'atempo={tempo:.4f}') |
| af_filters += ['volume=2.3', 'loudnorm=I=-14:TP=-1.5:LRA=11'] |
| af_str = ','.join(af_filters) |
| subprocess.run( |
| f'ffmpeg -y -i "{mp3_raw}" -af "{af_str}" ' |
| f'-c:a libmp3lame -q:a 2 "{mp3_out}"', |
| shell=True, check=True, capture_output=True) |
| try: os.remove(mp3_raw) |
| except: pass |
|
|
| print(f'โ
Gemini TTS done (1-shot), key=...{api_key[-6:]}') |
| return [mp3_out] |
|
|
| except Exception as e: |
| last_err = e |
| print(f'โ ๏ธ Gemini TTS key failed: {e}') |
| for _f in [f'{tmp_dir}/gemini_out.wav', f'{tmp_dir}/gemini_raw.mp3']: |
| try: os.remove(_f) |
| except: pass |
| continue |
| raise Exception(f'โ Gemini TTS all keys failed: {last_err}') |
|
|
|
|
| def run_gemini_preview(voice_name, out_path): |
| if ggenai is None: |
| raise Exception('google-genai package not installed') |
| _, ordered_keys = next_gemini_key() |
| if not ordered_keys: |
| raise Exception('No Gemini API Key') |
| wav_path = out_path.replace('.mp3', '.wav') |
| for api_key in ordered_keys: |
| try: |
| client = ggenai.Client(api_key=api_key) |
| response = client.models.generate_content( |
| model="gemini-2.5-flash-preview-tts", |
| contents="แแแบแนแแแฌแแซแ แแฎแแฑแท แแฌแแปแฌแธ แแฏแแบแแแฒแ", |
| config=gtypes.GenerateContentConfig( |
| response_modalities=["AUDIO"], |
| speech_config=gtypes.SpeechConfig( |
| voice_config=gtypes.VoiceConfig( |
| prebuilt_voice_config=gtypes.PrebuiltVoiceConfig( |
| voice_name=voice_name or "Kore" |
| ) |
| ) |
| ) |
| ) |
| ) |
| audio_data = None |
| if response.candidates: |
| for part in response.candidates[0].content.parts: |
| if part.inline_data and part.inline_data.mime_type.startswith('audio/'): |
| audio_data = part.inline_data.data |
| break |
| if not audio_data: |
| raise Exception('Gemini TTS preview: no audio data') |
| _save_pcm_as_wav(audio_data, wav_path) |
| _wav_to_mp3(wav_path, out_path) |
| try: os.remove(wav_path) |
| except: pass |
| return |
| except Exception as e: |
| print(f'โ ๏ธ Gemini preview key failed: {e}') |
| continue |
| raise Exception('โ Gemini TTS preview: all keys failed') |
|
|
| |
| threading.Thread(target=pull_db, daemon=True).start() |
| whisper_model = None |
|
|
| |
| |
| |
|
|
| @app.route('/login') |
| def login_page(): |
| return redirect('/') |
|
|
| @app.route('/app') |
| def app_page(): |
| return send_from_directory(str(BASE_DIR), 'index.html') |
|
|
| @app.route('/terms') |
| def terms_page(): |
| return send_from_directory(str(BASE_DIR), 'terms.html') |
|
|
| @app.route('/privacy') |
| def privacy_page(): |
| return send_from_directory(str(BASE_DIR), 'privacy.html') |
|
|
| |
| @app.route('/api/auth/google_enabled') |
| def google_enabled(): |
| return jsonify(enabled=bool(GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET)) |
|
|
| @app.route('/auth/google') |
| def google_login(): |
| if not GOOGLE_CLIENT_ID: |
| return redirect('/?auth_error=Google+OAuth+not+configured') |
| from urllib.parse import urlencode |
| params = urlencode({ |
| 'client_id': GOOGLE_CLIENT_ID, |
| 'redirect_uri': GOOGLE_REDIRECT_URI, |
| 'response_type': 'code', |
| 'scope': 'openid email profile', |
| 'access_type': 'offline', |
| 'prompt': 'select_account', |
| }) |
| return redirect(f'https://accounts.google.com/o/oauth2/auth?{params}') |
|
|
| @app.route('/auth/google/callback') |
| def google_callback(): |
| import urllib.request, json as _json |
| from urllib.parse import urlencode |
| code = request.args.get('code') |
| error = request.args.get('error') |
| if error or not code: |
| return redirect(f'/?auth_error={error or "no_code"}') |
| try: |
| |
| token_data = _json.dumps({ |
| 'code': code, |
| 'client_id': GOOGLE_CLIENT_ID, |
| 'client_secret': GOOGLE_CLIENT_SECRET, |
| 'redirect_uri': GOOGLE_REDIRECT_URI, |
| 'grant_type': 'authorization_code', |
| }).encode() |
| req = urllib.request.Request( |
| 'https://oauth2.googleapis.com/token', |
| data=token_data, |
| headers={'Content-Type': 'application/json'}) |
| with urllib.request.urlopen(req, timeout=15) as resp: |
| tokens = _json.loads(resp.read()) |
|
|
| |
| req2 = urllib.request.Request( |
| 'https://www.googleapis.com/oauth2/v2/userinfo', |
| headers={'Authorization': f'Bearer {tokens["access_token"]}'}) |
| with urllib.request.urlopen(req2, timeout=15) as resp2: |
| info = _json.loads(resp2.read()) |
|
|
| email = info.get('email', '') |
| name = info.get('name', email.split('@')[0]) |
| gid = info.get('id', '') |
|
|
| |
| db = load_db() |
| user_key = None |
| for uk, uv in db['users'].items(): |
| if uv.get('google_id') == gid or uk == email: |
| user_key = uk; break |
|
|
| if not user_key: |
| user_key = email or f'g_{gid}' |
| db['users'][user_key] = { |
| 'password': '', 'coins': 1, |
| 'created_at': datetime.now().isoformat(), 'last_login': None, |
| 'total_transcripts': 0, 'total_videos': 0, |
| 'google_id': gid, 'google_name': name, 'google_email': email, |
| 'free_trial': True, |
| } |
| else: |
| db['users'][user_key]['google_id'] = gid |
| db['users'][user_key]['google_name'] = name |
|
|
| db['users'][user_key]['last_login'] = datetime.now().isoformat() |
| save_db(db) |
|
|
| coins = db['users'][user_key].get('coins', 0) |
| is_adm = '1' if user_key == ADMIN_U else '0' |
| params = urlencode({'auth': 'google', 'u': user_key, 'n': name, 'c': coins, 'a': is_adm}) |
| return redirect(f'/?{params}') |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return redirect(f'/?auth_error={str(e)[:100]}') |
| @app.route('/') |
| def index(): |
| return send_from_directory(str(BASE_DIR), 'index.html') |
|
|
| @app.route("/manifest.json") |
| def manifest(): |
| return send_from_directory(str(BASE_DIR), "manifest.json", |
| mimetype="application/manifest+json") |
|
|
| @app.route("/sw.js") |
| def service_worker(): |
| return send_from_directory(str(BASE_DIR), "sw.js", |
| mimetype="application/javascript") |
| @app.route('/outputs/<path:fn>', methods=['GET','HEAD']) |
| def serve_output(fn): |
| from flask import make_response |
| fpath = OUTPUT_DIR / fn |
| if not fpath.exists(): |
| return jsonify(ok=False, msg='File not found'), 404 |
| if request.method == 'HEAD': |
| |
| resp = make_response('', 200) |
| resp.headers['Content-Length'] = str(fpath.stat().st_size) |
| resp.headers['Content-Type'] = 'video/mp4' |
| resp.headers['Access-Control-Allow-Origin'] = '*' |
| return resp |
| resp = make_response(send_from_directory( |
| str(OUTPUT_DIR), fn, conditional=True, max_age=0)) |
| resp.headers['Access-Control-Allow-Origin'] = '*' |
| resp.headers['Accept-Ranges'] = 'bytes' |
| resp.headers['Cache-Control'] = 'no-cache' |
| return resp |
|
|
| @app.route('/api/config') |
| def api_config(): |
| return jsonify(admin_tg=ADMIN_TG) |
|
|
| @app.route('/googlefd3d91bc095a2620.html') |
| def google_verify(): |
| return 'google-site-verification: googlefd3d91bc095a2620.html', 200, {'Content-Type': 'text/html'} |
|
|
| @app.route('/sitemap.xml') |
| def sitemap(): |
| xml = '''<?xml version="1.0" encoding="UTF-8"?> |
| <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"> |
| <url> |
| <loc>https://recap.psonline.shop/</loc> |
| <changefreq>weekly</changefreq> |
| <priority>1.0</priority> |
| </url> |
| <url> |
| <loc>https://recap.psonline.shop/privacy.html</loc> |
| <changefreq>monthly</changefreq> |
| <priority>0.5</priority> |
| </url> |
| <url> |
| <loc>https://recap.psonline.shop/terms.html</loc> |
| <changefreq>monthly</changefreq> |
| <priority>0.5</priority> |
| </url> |
| </urlset>''' |
| return xml, 200, {'Content-Type': 'application/xml'} |
|
|
| |
| @app.route('/api/login', methods=['POST']) |
| def api_login(): |
| try: |
| d = request.get_json(force=True) or {} |
| ok, msg, coins = login_user(d.get('username',''), d.get('password','')) |
| return jsonify(ok=ok, msg=msg, coins=coins, is_admin=(d.get('username','')==ADMIN_U and ok)) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/register', methods=['POST']) |
| def api_register(): |
| try: |
| d = request.get_json(force=True) or {} |
| uname = (d.get('username') or '').strip() or gen_uname() |
| pw = d.get('password', '') |
| db = load_db() |
| if uname in db['users']: return jsonify(ok=False, msg='โ Already exists') |
| db['users'][uname] = {'password': hp(pw) if pw else '', 'coins': 1, |
| 'created_at': datetime.now().isoformat(), 'last_login': None, |
| 'total_transcripts': 0, 'total_videos': 0, |
| 'free_trial': True} |
| save_db(db) |
| return jsonify(ok=True, msg=f'โ
{uname} created', username=uname, coins=1) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/preview_voice', methods=['POST']) |
| def api_preview_voice(): |
| try: |
| d = request.get_json(force=True) or {} |
| voice_id = d.get('voice', 'my-MM-ThihaNeural') |
| speed = int(d.get('speed', 30)) |
| engine = d.get('engine', 'ms') |
| out = str(OUTPUT_DIR / f'preview_{uuid.uuid4().hex[:8]}.mp3') |
| if engine == 'gemini': |
| run_gemini_preview(voice_id, out) |
| else: |
| run_edge_preview(voice_id, f'+{speed}%', out) |
| return jsonify(ok=True, url='/outputs/' + Path(out).name) |
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/gemini_voices') |
| def api_gemini_voices(): |
| voices = [ |
| {"id": "Charon", "name": "Charon (Male, Informative)"}, |
| {"id": "Kore", "name": "Kore (Female, Firm)"}, |
| {"id": "Puck", "name": "Puck (Male, Upbeat)"}, |
| {"id": "Fenrir", "name": "Fenrir (Male, Excitable)"}, |
| {"id": "Aoede", "name": "Aoede (Female, Breezy)"}, |
| {"id": "Zephyr", "name": "Zephyr (Female, Bright)"}, |
| {"id": "Orus", "name": "Orus (Male, Firm)"}, |
| {"id": "Schedar", "name": "Schedar (Male, Even-keeled)"}, |
| {"id": "Sulafat", "name": "Sulafat (Female, Warm)"}, |
| {"id": "Rasalgethi", "name": "Rasalgethi (Male, Informative)"}, |
| {"id": "Gacrux", "name": "Gacrux (Female, Mature)"}, |
| ] |
| return jsonify(ok=True, voices=voices) |
|
|
| |
| @app.route('/api/preview_clip', methods=['POST']) |
| def api_preview_clip(): |
| try: |
| d = request.get_json(force=True) or {} |
| url = (d.get('url') or '').strip() |
| if not url: |
| return jsonify(ok=False, msg='No URL') |
|
|
| cache_key = _url_cache_key(url) |
| _cleanup_preview_cache() |
| out_mp4 = str(OUTPUT_DIR / f'clip_{cache_key}.mp4') |
|
|
| |
| with _preview_cache_lock: |
| cached = _preview_cache.get(cache_key) |
| if cached and os.path.exists(cached['file']) and os.path.exists(out_mp4): |
| cached['ts'] = time.time() |
| return jsonify(ok=True, url=f'/outputs/clip_{cache_key}.mp4', cache_key=cache_key) |
|
|
| |
| tmp_dir = str(BASE_DIR / f'temp_prev_{cache_key}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| raw = f'{tmp_dir}/raw.%(ext)s' |
| cmd_dl = [ |
| 'yt-dlp', '--no-playlist', |
| '-f', 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=720]+bestaudio/best[height<=720]/best', |
| '--no-check-certificates', |
| '--merge-output-format', 'mp4', |
| '-o', raw, url |
| ] |
| if os.path.exists(COOKIES_FILE): |
| cmd_dl += ['--cookies', COOKIES_FILE] |
| subprocess.run(cmd_dl, check=True, timeout=600, capture_output=True) |
|
|
| found = glob.glob(f'{tmp_dir}/raw*') |
| src = found[0] if found else f'{tmp_dir}/raw.mp4' |
|
|
| |
| orig_dim = None |
| try: |
| probe = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' |
| f'-show_entries stream=width,height ' |
| f'-of csv=s=x:p=0 "{src}"', |
| shell=True, capture_output=True, text=True, timeout=10 |
| ) |
| if probe.returncode == 0 and probe.stdout.strip(): |
| w, h = map(int, probe.stdout.strip().split('x')) |
| orig_dim = {'width': w, 'height': h} |
| print(f'[preview_clip] original dimensions: {w}x{h}') |
| except Exception as e: |
| print(f'[preview_clip] failed to get dimensions: {e}') |
|
|
| |
| with _preview_cache_lock: |
| _preview_cache[cache_key] = {'file': src, 'dir': tmp_dir, 'ts': time.time()} |
|
|
| |
| subprocess.run( |
| f'ffmpeg -y -i "{src}" -t 10 ' |
| f'-c:v libx264 -crf 26 -preset ultrafast ' |
| f'-c:a aac -b:a 64k ' |
| f'-movflags +faststart "{out_mp4}"', |
| shell=True, check=True, capture_output=True, timeout=60 |
| ) |
| return jsonify(ok=True, url=f'/outputs/clip_{cache_key}.mp4', |
| cache_key=cache_key, |
| original_dimensions=orig_dim) |
|
|
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| |
| @app.route('/api/thumb', methods=['POST']) |
| def api_thumb(): |
| try: |
| d = request.get_json(force=True) or {} |
| url = (d.get('url') or '').strip() |
| seek = int(d.get('seek', 5)) |
| if not url: |
| return jsonify(ok=False, msg='No URL') |
|
|
| tid = uuid.uuid4().hex[:8] |
| tmp_dir = str(BASE_DIR / f'temp_thumb_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| out_jpg = str(OUTPUT_DIR / f'thumb_{tid}.jpg') |
|
|
| try: |
| clip = f'{tmp_dir}/clip.mp4' |
| cmd_dl = [ |
| 'yt-dlp', '--no-playlist', |
| '-f', 'worst[ext=mp4]/worst', |
| '--no-check-certificates', |
| '--download-sections', f'*0-15', |
| '-o', clip, url |
| ] |
| if os.path.exists(COOKIES_FILE): |
| cmd_dl += ['--cookies', COOKIES_FILE] |
| subprocess.run(cmd_dl, check=True, timeout=60, capture_output=True) |
|
|
| found = glob.glob(f'{tmp_dir}/clip*') |
| src = found[0] if found else clip |
| subprocess.run( |
| f'ffmpeg -y -ss {seek} -i "{src}" -vframes 1 -q:v 2 "{out_jpg}"', |
| shell=True, check=True, capture_output=True, timeout=30 |
| ) |
| return jsonify(ok=True, url=f'/outputs/thumb_{tid}.jpg') |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| |
| @app.route('/api/draft', methods=['POST']) |
| def api_draft(): |
| global whisper_model |
| try: |
| u = (request.form.get('username') or '').strip() |
| video_url = (request.form.get('video_url') or '').strip() |
| ct = request.form.get('content_type', 'Movie Recap') |
| api = request.form.get('ai_model', 'Gemini') |
| vo_lang = request.form.get('vo_lang', 'my') |
| video_file = request.files.get('video_file') |
| cache_key = request.form.get('cache_key', '') |
|
|
| if not u: return jsonify(ok=False, msg='โ Not logged in') |
| is_adm = (u == ADMIN_U) |
| if not is_adm and get_coins(u) < 1: |
| return jsonify(ok=False, msg='โ Not enough coins') |
|
|
| cpu_queue_wait() |
|
|
| tid = uuid.uuid4().hex[:8] |
| tmp_dir = str(BASE_DIR / f'temp_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| vpath = None |
|
|
| try: |
| if video_file and video_file.filename: |
| vpath = f'{tmp_dir}/input.mp4' |
| video_file.save(vpath) |
| elif cache_key: |
| |
| with _preview_cache_lock: |
| cached = _preview_cache.get(cache_key) |
| if cached and os.path.exists(cached['file']): |
| vpath = cached['file'] |
| job_progress[tid] = {'pct': 8, 'msg': '๐ฅ Using cached videoโฆ', 'done': False} |
| elif video_url: |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| if found: vpath = found[0] |
| elif video_url: |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| if found: vpath = found[0] |
| if not vpath: return jsonify(ok=False, msg='โ No video selected') |
|
|
| if whisper is None: raise Exception('whisper not installed') |
| if whisper_model is None: |
| whisper_model = whisper.load_model('tiny', device='cpu') |
| res = run_stage('whisper', whisper_model.transcribe, tid, |
| lambda p,m: None, '', '', vpath, fp16=False) |
| tr = res['text']; lang = res.get('language', 'en') |
|
|
| if vo_lang == 'en': |
| |
| sc = tr.strip() |
| ti = sc[:60].strip() + ('โฆ' if len(sc) > 60 else '') |
| ht = '#english #movierecap #viral #foryou #trending' |
| key_n = 'Whisper Direct' |
| else: |
| sys_p = get_sys_prompt(ct, vo_lang) |
| sys_p = sys_p + '\n' + get_num_rule(vo_lang) |
| out_txt, key_n = run_stage('ai', call_api, tid, |
| lambda p,m: None, '', '', |
| [{'role':'system','content':sys_p}, |
| {'role':'user','content':f'Language:{lang}\n\n{tr}'}], api=api, purpose='transcript') |
| sc, ti, ht = parse_out(out_txt) |
|
|
| rem = -1 |
| if not is_adm: _, rem = deduct(u, 1); upd_stat(u, 'tr') |
| return jsonify(ok=True, script=sc, title=ti, hashtags=ht, |
| status=f'{key_n} ยท {lang}', coins=rem) |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| except Exception as e: |
| return jsonify(ok=False, msg=f'โ {e}') |
|
|
| |
| def _build_audio_filter(mpath, ad): |
| """ |
| Edge TTS voice enhancement chain โ cleaner, warmer, louder. |
| - highpass=f=120 : cut low rumble / breath / mic noise |
| - lowpass=f=9000 : cut harsh high-frequency hiss |
| - equalizer 250Hz -3 : reduce muddiness / boxiness |
| - equalizer 1500Hz +3 : boost mid presence (voice intelligibility) |
| - equalizer 4000Hz +4 : boost upper-mid clarity / consonant sharpness |
| - equalizer 8000Hz -2 : soften sibilance without losing air |
| - acompressor : gentle compression โ evens out loud/quiet parts |
| - dynaudnorm : dynamic loudness normalization |
| - volume=2.8 : overall loudness boost |
| - loudnorm : broadcast-level LUFS normalization (final pass) |
| """ |
| voice_chain = ( |
| 'highpass=f=120,' |
| 'lowpass=f=9000,' |
| 'equalizer=f=250:width_type=o:width=2:g=-3,' |
| 'equalizer=f=1500:width_type=o:width=2:g=3,' |
| 'equalizer=f=4000:width_type=o:width=2:g=4,' |
| 'equalizer=f=8000:width_type=o:width=2:g=-2,' |
| 'acompressor=threshold=-18dB:ratio=3:attack=5:release=80:makeup=3dB,' |
| 'dynaudnorm=f=200:g=15,' |
| 'volume=2.8,' |
| 'loudnorm=I=-14:TP=-1.5:LRA=9' |
| ) |
| if mpath: |
| return (f'[1:a]{voice_chain}[nar];' |
| f'[2:a]volume=0.09,afade=t=out:st={max(0,ad-2):.3f}:d=2[bgm];' |
| f'[nar][bgm]amix=inputs=2:duration=first:dropout_transition=2[outa]') |
| else: |
| return f'[1:a]{voice_chain}[outa]' |
|
|
| |
| def _get_mid_range(duration): |
| """ |
| Return (start_ratio, end_ratio) for middle section based on total duration. |
| """ |
| if duration < 180: |
| return 0.30, 0.70 |
| elif duration < 300: |
| return 0.25, 0.75 |
| elif duration < 600: |
| return 0.20, 0.80 |
| else: |
| return 0.15, 0.85 |
|
|
| def _fix_mid_sync(audio_path, video_dur, audio_dur, tmp_dir): |
| """ |
| Split audio into 3 parts: head / middle / tail. |
| Apply atempo correction ONLY to middle part if drift > 0.2s. |
| Recombine and return new audio path (or original if no fix needed). |
| Pitch is preserved (atempo only, no asetrate). |
| """ |
| drift = audio_dur - video_dur |
| if abs(drift) <= 0.2: |
| print(f'[sync] drift={drift:.3f}s โค 0.2s โ skip mid-sync') |
| return audio_path |
|
|
| s_ratio, e_ratio = _get_mid_range(audio_dur) |
| t_start = audio_dur * s_ratio |
| t_end = audio_dur * e_ratio |
| mid_dur = t_end - t_start |
|
|
| |
| |
| |
| head_dur = t_start |
| tail_dur = audio_dur - t_end |
| mid_target = video_dur - head_dur - tail_dur |
|
|
| if mid_target <= 0: |
| print(f'[sync] mid_target invalid ({mid_target:.3f}s) โ skip') |
| return audio_path |
|
|
| tempo = mid_dur / mid_target |
| |
| tempo = max(0.5, min(2.0, tempo)) |
|
|
| print(f'[sync] drift={drift:.3f}s | mid {t_start:.2f}s~{t_end:.2f}s | tempo={tempo:.4f}x') |
|
|
| head_f = f'{tmp_dir}/sync_head.mp3' |
| mid_f = f'{tmp_dir}/sync_mid.mp3' |
| tail_f = f'{tmp_dir}/sync_tail.mp3' |
| mid_fx = f'{tmp_dir}/sync_mid_fx.mp3' |
| out_f = f'{tmp_dir}/sync_fixed.mp3' |
| lst_f = f'{tmp_dir}/sync_list.txt' |
|
|
| try: |
| |
| subprocess.run( |
| f'ffmpeg -y -i "{audio_path}" -ss 0 -t {t_start:.6f} ' |
| f'-c:a libmp3lame -q:a 2 "{head_f}"', |
| shell=True, check=True, capture_output=True) |
|
|
| |
| subprocess.run( |
| f'ffmpeg -y -i "{audio_path}" -ss {t_start:.6f} -t {mid_dur:.6f} ' |
| f'-c:a libmp3lame -q:a 2 "{mid_f}"', |
| shell=True, check=True, capture_output=True) |
|
|
| |
| subprocess.run( |
| f'ffmpeg -y -i "{audio_path}" -ss {t_end:.6f} ' |
| f'-c:a libmp3lame -q:a 2 "{tail_f}"', |
| shell=True, check=True, capture_output=True) |
|
|
| |
| subprocess.run( |
| f'ffmpeg -y -i "{mid_f}" -af "atempo={tempo:.6f}" ' |
| f'-c:a libmp3lame -q:a 2 "{mid_fx}"', |
| shell=True, check=True, capture_output=True) |
|
|
| |
| with open(lst_f, 'w') as lf: |
| for f in [head_f, mid_fx, tail_f]: |
| if os.path.exists(f) and os.path.getsize(f) > 0: |
| lf.write(f"file '{os.path.abspath(f)}'\n") |
| subprocess.run( |
| f'ffmpeg -y -f concat -safe 0 -i "{lst_f}" ' |
| f'-c:a libmp3lame -q:a 2 "{out_f}"', |
| shell=True, check=True, capture_output=True) |
|
|
| print(f'[sync] mid-sync done โ {out_f}') |
| return out_f |
|
|
| except Exception as e: |
| print(f'[sync] mid-sync failed: {e} โ using original audio') |
| return audio_path |
|
|
| |
| def _run_ffmpeg(cmd, timeout=1200): |
| """Run ffmpeg safely โ capture output to avoid pipe deadlock, with timeout.""" |
| result = subprocess.run( |
| cmd, shell=True, check=True, |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| timeout=timeout |
| ) |
| return result |
|
|
| |
| def _ass_time(secs): |
| h = int(secs // 3600) |
| m = int((secs % 3600) // 60) |
| s = int(secs % 60) |
| cs = int(round((secs - int(secs)) * 100)) |
| return "%d:%02d:%02d.%02d" % (h, m, s, cs) |
| def get_sentence_timings_from_audio(audio_path, num_sentences): |
| """ |
| Original silencedetect logic โ unchanged. |
| FIX: total_dur fetched once upfront; last subtitle anchored to total_dur |
| so subtitles never finish before the video ends. |
| """ |
| try: |
| total_dur = float(subprocess.run( |
| f'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "{audio_path}"', |
| shell=True, capture_output=True, text=True).stdout.strip()) |
| except Exception: |
| total_dur = 0.0 |
|
|
| cmd = (f'ffmpeg -i "{audio_path}" ' |
| f'-af "silencedetect=noise=-30dB:d=0.08" -f null - 2>&1') |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True) |
| output = result.stdout + result.stderr |
|
|
| silence_starts, silence_ends = [], [] |
| for line in output.split('\n'): |
| m = re.search(r'silence_start: ([0-9.]+)', line) |
| if m: |
| silence_starts.append(float(m.group(1))) |
| m = re.search(r'silence_end: ([0-9.]+)', line) |
| if m: |
| silence_ends.append(float(m.group(1))) |
|
|
| timings = [] |
| last = 0.0 |
| for i, s_start in enumerate(silence_starts): |
| if len(timings) >= num_sentences: |
| break |
| if s_start > last + 0.05: |
| timings.append((last, s_start)) |
| s_end = silence_ends[i] if i < len(silence_ends) else s_start + 0.1 |
| last = s_end |
|
|
| |
| if len(timings) < num_sentences: |
| t_end = total_dur if total_dur > last + 0.1 else last + 2.0 |
| timings.append((last, t_end)) |
|
|
| |
| if timings and len(timings) < num_sentences: |
| last_end = timings[-1][1] |
| total_known = total_dur if total_dur > last_end else last_end |
| remaining = num_sentences - len(timings) |
| chunk = (total_known - last_end) / max(remaining, 1) |
| for j in range(remaining): |
| s = last_end + j * chunk |
| timings.append((s, s + chunk)) |
|
|
| timings = timings[:num_sentences] |
|
|
| |
| if timings and total_dur > 0 and timings[-1][1] < total_dur - 0.5: |
| timings[-1] = (timings[-1][0], total_dur) |
| print(f'[silencedetect] anchored last subtitle end to {total_dur:.2f}s') |
|
|
| print(f'[silencedetect] found {len(timings)} boundaries for {num_sentences} sentences') |
| return timings |
| def _make_ass(sentences, total_dur, ass_path, position=85, |
| fontsize=80, color='white', style='outline', |
| sentence_durs=None, sentence_timings=None, crop='9:16'): |
| """ |
| sentence_timings: list of (start, end) tuples from get_sentence_timings_from_audio(). |
| Used first โ most accurate. |
| sentence_durs: fallback list of per-sentence durations (scaled to total_dur). |
| If neither provided: equal-split fallback. |
| """ |
| col_bgr = { |
| 'white': '&H00FFFFFF', 'yellow': '&H0000FFFF', 'cyan': '&H00FFFF00', |
| 'green': '&H0000FF00', 'orange': '&H000080FF', 'pink': '&H00FF80FF', |
| 'red': '&H000000FF', 'lime': '&H0080FF00', |
| 'hotpink': '&H00B469FF', 'gold': '&H0000D7FF', 'violet': '&H00EE82EE', |
| 'deepskyblue':'&H00FFBF00', 'coral': '&H00507FFF', |
| }.get(color, '&H00FFFFFF') |
| |
| |
| |
| try: |
| pos_pct = int(position) |
| except (ValueError, TypeError): |
| pos_pct = 85 |
| pos_pct = max(5, min(95, pos_pct)) |
| align = 5 |
| margin_v = 0 |
| if style == 'box': |
| border_style, back_col, outline_w, shadow_w = 4, '&H99000000', 0, 0 |
| elif style == 'shadow': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 4 |
| elif style == 'glow': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 4, 3 |
| elif style == 'stroke': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 3, 0 |
| elif style == 'plain': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 0 |
| else: |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 2, 1 |
|
|
| lines = [] |
| |
| if crop == '1:1': |
| play_w, play_h = 720, 720 |
| elif crop == '16:9': |
| play_w, play_h = 1280, 720 |
| else: |
| play_w, play_h = 720, 1280 |
| pos_x = play_w // 2 |
| pos_y = int(pos_pct / 100 * play_h) |
|
|
| lines.append('[Script Info]') |
| lines.append('ScriptType: v4.00+') |
| lines.append('Collisions: Normal') |
| lines.append(f'PlayResX: {play_w}') |
| lines.append(f'PlayResY: {play_h}') |
| lines.append('Timer: 100.0000') |
| lines.append('') |
| lines.append('[V4+ Styles]') |
| lines.append('Format: Name,Fontname,Fontsize,PrimaryColour,SecondaryColour,OutlineColour,BackColour,Bold,Italic,Underline,StrikeOut,ScaleX,ScaleY,Spacing,Angle,BorderStyle,Outline,Shadow,Alignment,MarginL,MarginR,MarginV,Encoding') |
| style_line = ('Style: Default,Noto Sans Myanmar,%d,%s,&H000000FF,&H00000000,%s,' |
| '0,0,0,0,100,100,0,0,%d,%d,%d,%d,0,0,%d,1') % ( |
| fontsize, col_bgr, back_col, |
| border_style, outline_w, shadow_w, |
| align, margin_v) |
| lines.append(style_line) |
| lines.append('') |
| lines.append('[Events]') |
| lines.append('Format: Layer,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text') |
|
|
| n = len(sentences) |
|
|
| pos_tag = '{\\an5\\pos(%d,%d)}' % (pos_x, pos_y) |
|
|
| |
| def _wrap2(txt, max_chars=14): |
| txt = txt.replace('\n', ' ').strip() |
| if len(txt) <= max_chars: |
| return txt |
| mid = len(txt) // 2 |
| best = mid |
| for delta in range(0, mid): |
| for pos in [mid - delta, mid + delta]: |
| if 0 < pos < len(txt) and (txt[pos] == ' ' or ord(txt[pos]) > 0x1000): |
| best = pos |
| break |
| else: |
| continue |
| break |
| return txt[:best].strip() + '\\N' + txt[best:].strip() |
|
|
| |
| if sentence_timings and len(sentence_timings) >= n: |
| raw_end = sentence_timings[n - 1][1] |
| scale = (total_dur / raw_end) if raw_end > 0 else 1.0 |
| for i, sent in enumerate(sentences): |
| txt = _wrap2(sent.strip()) |
| if not txt: |
| continue |
| t0 = _ass_time(max(0.0, sentence_timings[i][0] * scale)) |
| end = sentence_timings[i][1] * scale |
| if i == n - 1: |
| end = total_dur |
| t1 = _ass_time(min(end - 0.03, total_dur)) |
| lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt)) |
|
|
| |
| elif sentence_durs and len(sentence_durs) >= n: |
| raw_total = sum(sentence_durs[:n]) |
| if raw_total > 0: |
| scale = total_dur / raw_total |
| scaled_durs = [d * scale for d in sentence_durs[:n]] |
| else: |
| scaled_durs = [total_dur / n] * n |
| t = 0.0 |
| for i, sent in enumerate(sentences): |
| txt = _wrap2(sent.strip()) |
| if not txt: |
| t += scaled_durs[i] |
| continue |
| t0 = _ass_time(t) |
| t1 = _ass_time(min(t + scaled_durs[i] - 0.05, total_dur)) |
| lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt)) |
| t += scaled_durs[i] |
|
|
| |
| else: |
| chunk = total_dur / n |
| for i, sent in enumerate(sentences): |
| txt = _wrap2(sent.strip()) |
| if not txt: |
| continue |
| t0 = _ass_time(i * chunk) |
| t1 = _ass_time(min((i + 1) * chunk - 0.05, total_dur)) |
| lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt)) |
|
|
| with open(ass_path, 'w', encoding='utf-8') as f: |
| f.write('\n'.join(lines) + '\n') |
| def _srt_to_ass(srt_text, ass_path, fontsize=80, color='white', style='outline', |
| position=85, play_res_x=720, play_res_y=1280): |
| """Convert SRT text to ASS file with 80% width constraint and correct positioning. |
| |
| PlayResX/Y must match the actual video dimensions passed in. |
| libass scales fontsize proportionally to PlayResY, so fontsize values are always |
| relative to whatever PlayResY is set to โ the preview JS uses the same ratio: |
| pxSize = sizeVal * renderedH / play_res_y |
| Keeping PlayResY = actual video height ensures 1:1 mapping between preview and output. |
| """ |
| |
| play_res_x = max(1, int(play_res_x)) |
| play_res_y = max(1, int(play_res_y)) |
| fontsize = max(20, int(fontsize)) |
| col_bgr = { |
| 'white': '&H00FFFFFF', 'yellow': '&H0000FFFF', 'cyan': '&H00FFFF00', |
| 'green': '&H0000FF00', 'orange': '&H000080FF', 'pink': '&H00FF80FF', |
| 'red': '&H000000FF', 'lime': '&H0080FF00', |
| 'hotpink': '&H00B469FF', 'gold': '&H0000D7FF', 'violet': '&H00EE82EE', |
| 'deepskyblue':'&H00FFBF00', 'coral': '&H00507FFF', |
| }.get(color, '&H00FFFFFF') |
|
|
| |
| margin_lr = int(play_res_x * 0.025) |
| |
| |
| pos_pct = max(5, min(95, int(position))) |
| if pos_pct >= 50: |
| |
| alignment = 2 |
| margin_v = int((100 - pos_pct) / 100 * play_res_y) |
| margin_v = max(10, margin_v) |
| else: |
| |
| alignment = 8 |
| margin_v = int(pos_pct / 100 * play_res_y) |
| margin_v = max(10, margin_v) |
|
|
| |
| center_x = play_res_x // 2 |
| if pos_pct >= 50: |
| pos_y = play_res_y - margin_v |
| an_tag = 2 |
| else: |
| pos_y = margin_v |
| an_tag = 8 |
|
|
| if style == 'box': |
| border_style, back_col, outline_w, shadow_w = 4, '&H99000000', 0, 0 |
| elif style == 'shadow': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 4 |
| elif style == 'glow': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 4, 3 |
| elif style == 'stroke': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 3, 0 |
| elif style == 'plain': |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 0 |
| else: |
| border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 2, 1 |
|
|
| header = ( |
| '[Script Info]\n' |
| 'ScriptType: v4.00+\n' |
| 'Collisions: Normal\n' |
| 'WrapStyle: 0\n' |
| f'PlayResX: {play_res_x}\n' |
| f'PlayResY: {play_res_y}\n' |
| 'Timer: 100.0000\n' |
| '\n' |
| '[V4+ Styles]\n' |
| 'Format: Name,Fontname,Fontsize,PrimaryColour,SecondaryColour,OutlineColour,BackColour,' |
| 'Bold,Italic,Underline,StrikeOut,ScaleX,ScaleY,Spacing,Angle,BorderStyle,Outline,Shadow,' |
| 'Alignment,MarginL,MarginR,MarginV,Encoding\n' |
| ) |
| style_line = ( |
| f'Style: Default,Noto Sans Myanmar,{fontsize},{col_bgr},&H000000FF,&H00000000,{back_col},' |
| f'0,0,0,0,100,100,0,0,{border_style},{outline_w},{shadow_w},' |
| f'{an_tag},0,0,0,1\n' |
| ) |
| events_header = ( |
| '\n[Events]\n' |
| 'Format: Layer,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text\n' |
| ) |
|
|
| def _srt_tc_to_ass(tc): |
| |
| |
| tc = _norm_digits(tc.strip()).replace(',', '.') |
| parts = tc.split(':') |
| try: |
| if len(parts) == 3: |
| h, m, s = parts[0], parts[1], parts[2] |
| elif len(parts) == 2: |
| h, m, s = '0', parts[0], parts[1] |
| else: |
| return '0:00:00.00' |
| s_parts = s.split('.') |
| sec = s_parts[0].zfill(2) |
| ms = s_parts[1][:2] if len(s_parts) > 1 else '00' |
| return f'{int(h)}:{m.zfill(2)}:{sec}.{ms}' |
| except Exception: |
| return '0:00:00.00' |
|
|
| dialogue_lines = [] |
| for block in re.split(r'\n\s*\n', srt_text.strip()): |
| lines = [l for l in block.strip().split('\n') if l.strip()] |
| if len(lines) < 2: |
| continue |
| |
| tc_line_idx = None |
| for i, l in enumerate(lines): |
| if '-->' in l: |
| tc_line_idx = i |
| break |
| if tc_line_idx is None: |
| continue |
| tc_parts = lines[tc_line_idx].strip().split(' --> ') |
| if len(tc_parts) != 2: |
| continue |
| |
| |
| def _fix_tc_typo(tc_str): |
| tc_n = _norm_digits(tc_str.strip()).replace(',', '.') |
| try: |
| parts = tc_n.split(':') |
| if len(parts) == 3: |
| h, m, s = int(parts[0]), int(parts[1]), float(parts[2]) |
| total = h * 3600 + m * 60 + s |
| if total > 3600: |
| return f'{h:02d}:00:{parts[2]}' |
| except Exception: |
| pass |
| return tc_str |
| t0 = _srt_tc_to_ass(_fix_tc_typo(tc_parts[0])) |
| t1 = _srt_tc_to_ass(_fix_tc_typo(tc_parts[1])) |
| txt = '\\N'.join(_strip_emoji(l) for l in lines[tc_line_idx+1:] if l.strip()) |
| if txt: |
| dialogue_lines.append(f'Dialogue: 0,{t0},{t1},Default,,0,0,0,,{{\\an{an_tag}\\pos({center_x},{pos_y})}}{txt}') |
|
|
| with open(ass_path, 'w', encoding='utf-8') as f: |
| f.write(header + style_line + events_header + '\n'.join(dialogue_lines) + '\n') |
|
|
|
|
| def _burn_srt_direct(video_path, srt_text, out_path, position=85, |
| fontsize=80, color='white', style='outline', tmp_dir='/tmp', |
| play_res_x=720, play_res_y=1280): |
| """Convert SRTโASS (80% width bounded) then burn via ffmpeg ass= filter.""" |
| import shutil |
| if not srt_text or '-->' not in srt_text: |
| shutil.copy(video_path, out_path) |
| return |
|
|
| ass_path = os.path.join(tmp_dir, 'sub_srt.ass') |
| _srt_to_ass(srt_text, ass_path, fontsize=fontsize, color=color, |
| style=style, position=position, |
| play_res_x=play_res_x, play_res_y=play_res_y) |
|
|
| env = os.environ.copy() |
|
|
| ass_esc = ass_path.replace('\\', '/').replace(':', '\\:') |
| vf = f"ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar" |
|
|
| cmd = ( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{video_path}" ' |
| f'-vf "{vf}" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy ' |
| f'"{out_path}"' |
| ) |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=env, timeout=600) |
| if result.returncode != 0: |
| raise Exception(f'ffmpeg srt burn failed: {result.stderr[-300:]}') |
|
|
|
|
| def _burn_subtitles(video_path, sentences, out_path, position=85, |
| fontsize=80, color='white', style='outline', tmp_dir='/tmp', |
| sentence_durs=None, sentence_timings=None): |
| """Burn Myanmar subtitles via ASS file + ffmpeg ass= filter.""" |
| if not sentences: |
| import shutil; shutil.copy(video_path, out_path); return |
|
|
| probe = subprocess.run( |
| 'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "%s"' % video_path, |
| shell=True, capture_output=True, text=True) |
| try: total_dur = float(probe.stdout.strip()) |
| except: total_dur = 60.0 |
|
|
| ass_path = os.path.join(tmp_dir, 'sub.ass') |
| _make_ass(sentences, total_dur, ass_path, |
| position=position, fontsize=fontsize, |
| color=color, style=style, |
| sentence_durs=sentence_durs, |
| sentence_timings=sentence_timings) |
|
|
| env = os.environ.copy() |
|
|
| |
| ass_esc = ass_path.replace('\\', '/').replace(':', '\\:') |
| vf = f"ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar" |
|
|
| cmd = ('ffmpeg -y -hide_banner -loglevel error ' |
| '-i "%s" ' |
| '-vf "%s" ' |
| '-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| '-c:a copy ' |
| '"%s"') % (video_path, vf, out_path) |
|
|
| result = subprocess.run( |
| cmd, shell=True, check=True, |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| timeout=600, env=env) |
| return result |
|
|
| |
| |
| |
| def _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file, |
| logo_path=None, logo_x=10, logo_y=10, logo_w=80, |
| blur_enabled=False, blur_x=0, blur_y=0, blur_w=0, blur_h=0, |
| blur_orig_w=None, blur_orig_h=None, |
| wmk_x=None, wmk_y=None, wmk_fontsize=35, free_trial=False, |
| logo_orig_w=None, logo_orig_h=None, |
| wmk_orig_w=None, wmk_orig_h=None, |
| sub_ass_path=None): |
| """ |
| Build final video via ffmpeg filter_complex. |
| FIXED: blur, logo, watermark are all applied on original video BEFORE cropping. |
| Filter chain order: original video โ base transforms โ blur โ logo โ watermark โ crop โ free_trial โ subtitles โ final |
| """ |
| |
| sync_r = ad / (vd - 0.14) |
|
|
| base_filters = [ |
| 'scale=trunc(iw/2)*2:trunc(ih/2)*2', |
| f'setpts={sync_r:.6f}*PTS', |
| f'trim=duration={ad:.6f}', |
| 'setpts=PTS-STARTPTS', |
| ] |
| if flip: base_filters.append('hflip') |
| if col: base_filters.append('eq=brightness=0.06:contrast=1.2:saturation=1.4') |
| base_filters.append('format=yuv420p') |
| base_str = ','.join(base_filters) |
|
|
| |
| filter_parts = [f'[0:v]{base_str}[v_base]'] |
| current_label = '[v_base]' |
|
|
| |
| if blur_enabled and blur_w > 0 and blur_h > 0: |
| |
| if blur_orig_w and blur_orig_h: |
| orig_w, orig_h = blur_orig_w, blur_orig_h |
| else: |
| |
| try: |
| probe = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' |
| f'-show_entries stream=width,height ' |
| f'-of csv=s=x:p=0 "{vpath}"', |
| shell=True, capture_output=True, text=True, timeout=10 |
| ) |
| orig_w, orig_h = map(int, probe.stdout.strip().split('x')) |
| except: |
| orig_w, orig_h = 1920, 1080 |
|
|
| |
| bx = max(0, min(blur_x, orig_w - 10)) |
| by = max(0, min(blur_y, orig_h - 10)) |
| bw = max(10, min(blur_w, orig_w - bx)) |
| bh = max(10, min(blur_h, orig_h - by)) |
|
|
| print(f'[blur] orig={orig_w}x{orig_h}, box=({bx},{by}) {bw}x{bh}') |
|
|
| _br = max(1, min(10, bw // 4, bh // 4)) |
| filter_parts.append( |
| f'{current_label}split[_bA][_bB];' |
| f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];' |
| f'[_bA][_bBl]overlay={bx}:{by}[v_blurred]' |
| ) |
| current_label = '[v_blurred]' |
|
|
| |
| logo_idx = 2 if not mpath else 3 |
| logo_exists = logo_path and os.path.exists(logo_path) |
| extra_segs = [] |
|
|
| if logo_exists: |
| |
| _low = logo_orig_w or blur_orig_w or 1920 |
| _loh = logo_orig_h or blur_orig_h or 1080 |
| lx = logo_x if (logo_x is not None and logo_x != 10) else (_low - logo_w - 20) |
| ly = logo_y if (logo_y is not None and logo_y != 10) else 20 |
| |
| print(f'[logo] orig={_low}x{_loh}, pos=({lx},{ly}) w={logo_w}') |
| extra_segs.append( |
| f'[{logo_idx}:v]scale={max(20,int(logo_w))}:-2[_lg];' |
| f'{current_label}[_lg]overlay={lx}:{ly}[v_logo]' |
| ) |
| current_label = '[v_logo]' |
|
|
| |
| if wmk: |
| fs = max(16, int(wmk_fontsize)) |
| txt = wmk.replace("'", "").replace(":", "").replace("\\", "") |
| |
| _wow = wmk_orig_w or blur_orig_w or 1920 |
| _woh = wmk_orig_h or blur_orig_h or 1080 |
| wx = wmk_x if wmk_x is not None else (_wow - 220) |
| wy = wmk_y if wmk_y is not None else (_woh - 80) |
| print(f'[watermark] orig={_wow}x{_woh}, pos=({wx},{wy})') |
| extra_segs.append( |
| f'{current_label}drawtext=text=\'{txt}\':x={wx}:y={wy}:' |
| f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2[v_wmk]' |
| ) |
| current_label = '[v_wmk]' |
|
|
| |
| if crop == '9:16': |
| tw, th = 720, 1280 |
| filter_parts.append( |
| f'{current_label}split[_s1][_s2];' |
| f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,' |
| f'crop={tw}:{th},boxblur=10:8[_bg];' |
| f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];' |
| f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]' |
| ) |
| elif crop == '16:9': |
| tw, th = 1280, 720 |
| filter_parts.append( |
| f'{current_label}split[_s1][_s2];' |
| f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,' |
| f'crop={tw}:{th},boxblur=10:8[_bg];' |
| f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];' |
| f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]' |
| ) |
| elif crop == '1:1': |
| tw, th = 720, 720 |
| filter_parts.append( |
| f'{current_label}split[_s1][_s2];' |
| f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,' |
| f'crop={tw}:{th},boxblur=10:8[_bg];' |
| f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];' |
| f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]' |
| ) |
| elif crop == 'original': |
| |
| try: |
| _probe = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' + |
| f'-show_entries stream=width,height ' + |
| f'-of csv=s=x:p=0 "{vpath}"', |
| shell=True, capture_output=True, text=True, timeout=30) |
| _dim = _probe.stdout.strip() |
| _vw, _vh = (int(x) for x in _dim.split('x')) |
| except Exception: |
| _vw, _vh = 1280, 720 |
| tw = (_vw // 2) * 2 |
| th = (_vh // 2) * 2 |
| filter_parts.append( |
| f'{current_label}scale={tw}:{th}:force_original_aspect_ratio=decrease,' |
| f'pad={tw}:{th}:(ow-iw)/2:(oh-ih)/2:black[vcrop]' |
| ) |
| else: |
| tw, th = 1280, 720 |
| filter_parts.append(f'{current_label}[vcrop]') |
|
|
| current_label = '[vcrop]' |
|
|
| |
| if free_trial: |
| ft_fs = max(60, int(th * 0.07)) |
| extra_segs.append( |
| f'{current_label}drawtext=text=\'FREE TRIAL\':' |
| f'x=(w-text_w)/2:y=(h-text_h)/2:' |
| f'fontsize={ft_fs}:fontcolor=red:borderw=3:bordercolor=black:' |
| f'alpha=0.75[vft]' |
| ) |
| current_label = '[vft]' |
|
|
| |
| if sub_ass_path and os.path.exists(sub_ass_path): |
| ass_esc = sub_ass_path.replace('\\', '/').replace(':', '\\:') |
| extra_segs.append( |
| f"{current_label}ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar[v_sub]" |
| ) |
| current_label = '[v_sub]' |
|
|
| |
| extra_segs.append(f'{current_label}copy[v_final]') |
|
|
| |
| audio_seg = _build_audio_filter(mpath, ad) |
|
|
| |
| all_segs = filter_parts + extra_segs + [audio_seg] |
| filter_complex = ';'.join(all_segs) |
|
|
| |
| inp = f'-fflags +genpts+igndts -err_detect ignore_err -i "{vpath}" -i "{cmb}"' |
| if mpath: inp += f' -stream_loop -1 -i "{mpath}"' |
| if logo_exists: inp += f' -i "{logo_path}"' |
|
|
| cmd = ( |
| f'ffmpeg -y -hide_banner -loglevel error {inp} ' |
| f'-filter_complex "{filter_complex}" ' |
| f'-map "[v_final]" -map "[outa]" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-threads 0 ' |
| f'-c:a aac -ar 44100 -b:a 128k ' |
| f'-t {ad:.3f} -movflags +faststart "{out_file}"' |
| ) |
|
|
| try: |
| _run_ffmpeg(cmd, timeout=900) |
| except subprocess.CalledProcessError as e: |
| err = e.stderr.decode(errors='ignore') if e.stderr else '(no stderr)' |
| raise Exception(f'FFmpeg render failed: {err[-500:]}') |
| |
| |
| @app.route('/api/process', methods=['POST']) |
| def api_process(): |
| try: |
| u = (request.form.get('username') or '').strip() |
| video_url = (request.form.get('video_url') or '').strip() |
| sc = (request.form.get('script') or '').strip() |
| voice_id = request.form.get('voice', 'my-MM-ThihaNeural') |
| engine = request.form.get('engine', 'ms') |
| spd = int(request.form.get('speed', 30)) |
| wmk = request.form.get('watermark', '') |
| wmk_x = int(request.form.get('wmk_x', 20)) if wmk else None |
| wmk_y = int(request.form.get('wmk_y', 40)) if wmk else None |
| wmk_fontsize = int(request.form.get('wmk_fontsize', 40)) if wmk else 40 |
| crop = request.form.get('crop', '9:16') |
| flip = request.form.get('flip', '0') == '1' |
| col = request.form.get('color', '0') == '1' |
| vo_lang = request.form.get('vo_lang', 'my') |
| |
| LANG_SPD = {'th': 20, 'en': 0, 'my': 30} |
| if request.form.get('speed') is None: |
| spd = LANG_SPD.get(vo_lang, 30) |
| is_adm = (u == ADMIN_U) |
| if not is_adm and get_coins(u) < 1: |
| return jsonify(ok=False, msg='โ Not enough coins') |
|
|
| cpu_queue_wait() |
|
|
| tid = uuid.uuid4().hex[:8] |
| tmp_dir = str(BASE_DIR / f'temp_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| out_file = str(OUTPUT_DIR / f'final_{tid}.mp4') |
| vpath = None; mpath = None |
|
|
| try: |
| video_file = request.files.get('video_file') |
| if video_file and video_file.filename: |
| vpath = f'{tmp_dir}/input.mp4' |
| video_file.save(vpath) |
| elif video_url: |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| if found: vpath = found[0] |
| if not vpath: return jsonify(ok=False, msg='โ No video selected') |
|
|
| music_file = request.files.get('music_file') |
| if music_file and music_file.filename: |
| mpath = f'{tmp_dir}/music.mp3' |
| music_file.save(mpath) |
|
|
| logo_path = None |
| logo_file = request.files.get('logo_file') |
| logo_x = int(request.form.get('logo_x', 10)) |
| logo_y = int(request.form.get('logo_y', 10)) |
| logo_w = int(request.form.get('logo_w', 80)) |
| if logo_file and logo_file.filename: |
| ext = Path(logo_file.filename).suffix or '.png' |
| logo_path = f'{tmp_dir}/logo{ext}' |
| logo_file.save(logo_path) |
| blur_enabled = request.form.get('blur_enabled') == '1' |
| blur_x = int(request.form.get('blur_x', 0)) |
| blur_y = int(request.form.get('blur_y', 0)) |
| blur_w = int(request.form.get('blur_w', 0)) |
| blur_h = int(request.form.get('blur_h', 0)) |
|
|
| _t0 = time.time() |
| sentences = split_txt(sc, vo_lang) |
| rate = f'+{spd}%' |
| if engine == 'gemini': |
| parts = run_stage('tts', run_gemini_tts_sync, tid, |
| lambda p,m: None, '', '', |
| sentences, voice_id, tmp_dir, speed=spd) |
| else: |
| parts = run_stage('tts', run_tts_sync, tid, |
| lambda p,m: None, '', '', |
| sentences, voice_id, rate, tmp_dir) |
|
|
| print(f'[TIMER] TTS: {time.time()-_t0:.1f}s') |
| cmb = f'{tmp_dir}/combined.mp3' |
| lst = f'{tmp_dir}/list.txt' |
| with open(lst, 'w') as f: |
| for a in parts: f.write(f"file '{os.path.abspath(a)}'\n") |
| |
| if engine == 'gemini': |
| _af = ('highpass=f=100,lowpass=f=10000,' |
| 'dynaudnorm=f=200:g=15,' |
| 'loudnorm=I=-16:TP=-1.5:LRA=11') |
| else: |
| _af = ('silenceremove=start_periods=1:stop_periods=-1:stop_duration=0.1:stop_threshold=-45dB,' |
| 'highpass=f=100,lowpass=f=10000,' |
| 'dynaudnorm=f=200:g=15,' |
| 'loudnorm=I=-16:TP=-1.5:LRA=11') |
| _t1 = time.time() |
| _run_ffmpeg( |
| f'ffmpeg -y -f concat -safe 0 -i "{lst}" ' |
| f'-af "{_af}" ' |
| f'-c:a libmp3lame -q:a 2 "{cmb}"', timeout=120) |
|
|
| print(f'[TIMER] audio filter: {time.time()-_t1:.1f}s') |
| vd = dur(vpath); ad = dur(cmb) |
| if vd <= 0: raise Exception('Video duration read failed') |
| if ad <= 0: raise Exception('Audio duration read failed') |
|
|
| _t2 = time.time() |
| _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file, |
| logo_path=logo_path, logo_x=logo_x, logo_y=logo_y, logo_w=logo_w, |
| blur_enabled=blur_enabled, blur_x=blur_x, blur_y=blur_y, |
| blur_w=blur_w, blur_h=blur_h, |
| wmk_x=wmk_x, wmk_y=wmk_y, wmk_fontsize=wmk_fontsize) |
| rem = -1 |
| print(f'[TIMER] ffmpeg render: {time.time()-_t2:.1f}s') |
| if not is_adm: _, rem = deduct(u, 1); upd_stat(u, 'vd') |
| return jsonify(ok=True, output_url=f'/outputs/final_{tid}.mp4', coins=rem) |
|
|
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=f'โ {e}') |
| |
| @app.route('/api/progress/<tid>') |
| def api_progress(tid): |
| def generate(): |
| sent_done = False |
| for _ in range(1800): |
| p = job_progress.get(tid) |
| if p is None: |
| yield f"data: {json.dumps({'pct':0,'msg':'Please waitโฆ'})}\n\n" |
| else: |
| yield f"data: {json.dumps(p)}\n\n" |
| if p.get('done') or p.get('error'): |
| sent_done = True |
| break |
| time.sleep(0.4) |
| if not sent_done: |
| yield f"data: {json.dumps({'pct':0,'msg':'Timeout โ process took too long','error':True})}\n\n" |
| return Response(generate(), mimetype='text/event-stream', |
| headers={'Cache-Control':'no-cache','X-Accel-Buffering':'no'}) |
|
|
| @app.route('/api/process_all', methods=['POST']) |
| def api_process_all(): |
| """Non-blocking: read params, start background thread, return tid immediately.""" |
| try: |
| u = (request.form.get('username') or '').strip() |
| if not u: return jsonify(ok=False, msg='โ Not logged in') |
| is_adm = (u == ADMIN_U) |
| if not is_adm and load_db()['users'].get(u, {}).get('banned'): |
| return jsonify(ok=False, msg='โ Your account has been banned') |
| if not is_adm and get_coins(u) < 1: |
| return jsonify(ok=False, msg='โ Not enough coins (need 1)') |
| |
| _db_check = load_db() |
| is_free_trial = (not is_adm) and _db_check['users'].get(u, {}).get('free_trial', False) |
|
|
| |
| video_url = (request.form.get('video_url') or '').strip() |
| voice_id = request.form.get('voice', 'my-MM-ThihaNeural') |
| engine = request.form.get('engine', 'ms') |
| ct = request.form.get('content_type', 'Movie Recap') |
| api_model = request.form.get('ai_model', 'Gemini') |
| vo_lang = request.form.get('vo_lang', 'my') |
| wmk = request.form.get('watermark', '') |
| wmk_fontsize = int(request.form.get('wmk_fontsize', 40)) if wmk else 40 |
| crop = request.form.get('crop', '9:16') |
| flip = request.form.get('flip', '0') == '1' |
| col = request.form.get('color', '0') == '1' |
| blur_enabled = request.form.get('blur_enabled') == '1' |
| sub_enabled = request.form.get('sub_enabled') == '1' |
| sub_size = float(request.form.get('sub_size', 0.0547)) |
| sub_pos = int(request.form.get('sub_pos', 85)) |
| sub_color = request.form.get('sub_color', 'white') |
| sub_style = request.form.get('sub_style', 'outline') |
| client_tid = (request.form.get('tid') or '').strip() |
|
|
| LANG_SPD = {'th': 20, 'en': 0, 'my': 30} |
| spd = int(request.form.get('speed', LANG_SPD.get(vo_lang, 30))) |
|
|
| _CROP_DIM = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720),'original':(1280,720)} |
| FW, FH = _CROP_DIM.get(crop, (1280,720)) |
|
|
| def _pct(kp, kx, dflt, ax): |
| v = request.form.get(kp) |
| return int(float(v)*ax) if v is not None else int(request.form.get(kx, dflt)) |
|
|
| wmk_xp_raw = request.form.get('wmk_xp') |
| wmk_yp_raw = request.form.get('wmk_yp') |
| wmk_x_raw = request.form.get('wmk_x') |
| wmk_y_raw = request.form.get('wmk_y') |
| logo_xp_raw = request.form.get('logo_xp') |
| logo_yp_raw = request.form.get('logo_yp') |
| logo_x_raw = request.form.get('logo_x') |
| logo_y_raw = request.form.get('logo_y') |
| |
| |
| blur_xp = float(request.form.get('blur_xp') or 0) |
| blur_yp = float(request.form.get('blur_yp') or 0) |
| blur_wp = float(request.form.get('blur_wp') or 0) |
| blur_hp = float(request.form.get('blur_hp') or 0) |
| blur_x = int(request.form.get('blur_x', 0)) |
| blur_y = int(request.form.get('blur_y', 0)) |
| blur_w = int(request.form.get('blur_w', 0)) |
| blur_h = int(request.form.get('blur_h', 0)) |
| |
| _logo_wp = float(request.form.get('logo_wp') or 0) |
| logo_w = int(_logo_wp * FW) if _logo_wp > 0 else int(request.form.get('logo_w', int(FW*0.15))) |
| logo_w = max(20, min(logo_w, FW)) |
| _logo_wp_saved = _logo_wp |
| print(f'[OV] crop={crop} FW={FW} FH={FH} wmk="{wmk}" wmk_xp={wmk_xp_raw} logo_xp={logo_xp_raw} logo_wp={_logo_wp} logo_w={logo_w} logo_file={bool(request.files.get("logo_file"))} blur_en={blur_enabled}') |
|
|
| |
| video_bytes = None; video_fname = None |
| vf = request.files.get('video_file') |
| if vf and vf.filename: video_bytes = vf.read(); video_fname = vf.filename |
|
|
| music_bytes = None |
| mf = request.files.get('music_file') |
| if mf and mf.filename: music_bytes = mf.read() |
|
|
| logo_bytes = None; logo_fname = None |
| lf = request.files.get('logo_file') |
| if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename |
|
|
| tid = client_tid if client_tid else uuid.uuid4().hex[:8] |
| cur_coins = get_coins(u) |
| coin_msg = 'Admin' if is_adm else f'๐ช {cur_coins} coins' |
| job_progress[tid] = {'pct': 2, 'msg': f'โณ แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบโฆ {coin_msg}', 'done': False} |
|
|
| def _prog(pct, msg): |
| cur = job_progress.get(tid, {}) |
| job_progress[tid] = { |
| 'pct': pct if pct is not None else cur.get('pct', 2), |
| 'msg': msg, 'done': False |
| } |
|
|
| def _bg_job(): |
| nonlocal logo_w, blur_x, blur_y, blur_w, blur_h |
| global whisper_model |
| tmp_dir = str(BASE_DIR / f'temp_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| out_file = str(OUTPUT_DIR / f'final_{tid}.mp4') |
| vpath = None; mpath = None; logo_path = None |
| try: |
| |
| if video_bytes: |
| vpath = f'{tmp_dir}/input.mp4' |
| with open(vpath,'wb') as wf: wf.write(video_bytes) |
| _prog(10, '๐ Video file แกแแแทแบแแผแ
แบแแผแฎ') |
| elif video_url: |
| def _dl(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| result_path = run_stage('download', _dl, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ฅ Video แแฑแซแแบแธแแฏแแบแแฏแแบแแฑแแแบโฆ') |
| vpath = result_path |
|
|
| if not vpath or not os.path.exists(vpath): |
| job_progress[tid] = {'pct':0,'msg':'โ Video แแแฝแฑแทแแซ','error':True}; return |
|
|
| if music_bytes: |
| mpath = f'{tmp_dir}/music.mp3' |
| with open(mpath,'wb') as wf: wf.write(music_bytes) |
| if logo_bytes: |
| ext = Path(logo_fname).suffix if logo_fname else '.png' |
| logo_path = f'{tmp_dir}/logo{ext}' |
| with open(logo_path,'wb') as wf: wf.write(logo_bytes) |
|
|
| |
| orig_w, orig_h = 1920, 1080 |
| try: |
| _pr = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' |
| f'-show_entries stream=width,height ' |
| f'-of csv=s=x:p=0 "{vpath}"', |
| shell=True, capture_output=True, text=True, timeout=10 |
| ) |
| if _pr.returncode == 0 and _pr.stdout.strip(): |
| orig_w, orig_h = map(int, _pr.stdout.strip().split('x')) |
| print(f'[orig] video={orig_w}x{orig_h}') |
| except Exception as _e: |
| print(f'[orig] probe failed: {_e}') |
|
|
| |
| if blur_enabled and (blur_xp or blur_yp or blur_wp or blur_hp): |
| blur_x = int(blur_xp * orig_w) |
| blur_y = int(blur_yp * orig_h) |
| blur_w = int(blur_wp * orig_w) |
| blur_h = int(blur_hp * orig_h) |
| print(f'[blur] coords=({blur_x},{blur_y}) size={blur_w}x{blur_h}') |
|
|
| |
| def _pct_orig(vp, vx, dflt, ax): |
| if vp is not None: |
| return int(float(vp) * ax) |
| if vx is not None: |
| return int(vx) |
| return dflt |
|
|
| wmk_x = _pct_orig(wmk_xp_raw, wmk_x_raw, 20, orig_w) if wmk else None |
| wmk_y = _pct_orig(wmk_yp_raw, wmk_y_raw, orig_h - 80, orig_h) if wmk else None |
| logo_x = _pct_orig(logo_xp_raw, logo_x_raw, int(orig_w * 0.8), orig_w) |
| logo_y = _pct_orig(logo_yp_raw, logo_y_raw, 20, orig_h) |
| |
| if _logo_wp_saved > 0: |
| logo_w = max(20, min(int(_logo_wp_saved * orig_w), orig_w)) |
| else: |
| logo_w = max(20, min(logo_w, orig_w)) |
| print(f'[logo] orig={orig_w}x{orig_h}, pos=({logo_x},{logo_y}) w={logo_w}') |
| if wmk: |
| print(f'[wmk] orig={orig_w}x{orig_h}, pos=({wmk_x},{wmk_y})') |
|
|
| |
| if whisper is None: raise Exception('whisper not installed') |
| if whisper_model is None: |
| whisper_model = whisper.load_model('tiny', device='cpu') |
| _wm = whisper_model |
| res = run_stage('whisper', _wm.transcribe, tid, _prog, |
| 'โณ Transcript แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐๏ธ Whisper แแผแแทแบ transcript แแฏแแบแแฑแแแบโฆ', |
| vpath, fp16=False) |
| tr = res['text']; src_lang = res.get('language','en'); whisper_segments = res.get('segments', []) |
| _prog(40, f'๐๏ธ Transcript แแผแฎแธแแซแแผแฎ ({src_lang})') |
|
|
| |
| if vo_lang == 'en': |
| sc = tr.strip() |
| caption_text = sc[:60].strip() + ('โฆ' if len(sc)>60 else '') |
| hashtags = '#english #movierecap #viral #foryou #trending' |
| else: |
| sys_p = get_sys_prompt(ct, vo_lang) + '\n' + get_num_rule(vo_lang) |
| msgs = [{'role':'system','content':sys_p}, |
| {'role':'user','content':f'Language:{src_lang}\n\n{tr}'}] |
| out_txt, _ = run_stage('ai', call_api, tid, _prog, |
| 'โณ AI Script แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ค AI Script แแฑแธแแฑแแแบโฆ', |
| msgs, api=api_model, purpose='transcript') |
| sc, caption_text, hashtags = parse_out(out_txt) |
| _prog(65, '๐ค AI Script แแผแฎแธแแซแแผแฎ') |
|
|
| |
| rate = f'+{spd}%' |
| sentences = split_txt(sc, vo_lang) |
| if engine == 'gemini': |
| parts = run_stage('tts', run_gemini_tts_sync, tid, _prog, |
| 'โณ TTS แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ แกแแถ แแฏแแบแแฑแแแบโฆ', |
| sentences, voice_id, tmp_dir, speed=spd) |
| else: |
| parts = run_stage('tts', run_tts_sync, tid, _prog, |
| 'โณ TTS แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ แกแแถ แแฏแแบแแฑแแแบโฆ', |
| sentences, voice_id, rate, tmp_dir) |
|
|
| cmb = f'{tmp_dir}/combined.mp3' |
| lst = f'{tmp_dir}/list.txt' |
| with open(lst,'w') as lf2: |
| for a in parts: lf2.write(f"file '{os.path.abspath(a)}'\n") |
| subprocess.run( |
| f'ffmpeg -y -f concat -safe 0 -i "{lst}" ' |
| f'-af "silenceremove=start_periods=1:stop_periods=-1:stop_duration=0.1:stop_threshold=-50dB" ' |
| f'-c:a libmp3lame -q:a 2 "{cmb}"', shell=True, check=True) |
|
|
| |
| |
| |
| sentence_timings = None |
| sub_ass_path = None |
| if sub_enabled and sentences: |
| if engine != 'gemini': |
| try: |
| sil_dur = 0.2 |
| sent_files = parts[::2] |
| durs = [] |
| for sf in sent_files[:len(sentences)]: |
| try: |
| d = float(subprocess.run( |
| f'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "{sf}"', |
| shell=True, capture_output=True, text=True).stdout.strip()) |
| except Exception: |
| d = 0.0 |
| durs.append(max(0.1, d)) |
| t = 0.0 |
| sentence_timings = [] |
| for d in durs: |
| sentence_timings.append((t, t + d)) |
| t += d + sil_dur |
| print(f'[subtitle] per-mp3 timings n={len(sentence_timings)}: ' |
| f'{[(round(s,2),round(e,2)) for s,e in sentence_timings[:4]]}โฆ') |
| except Exception as _st_err: |
| print(f'[subtitle] per-mp3 timing failed: {_st_err} โ will use equal split') |
| |
| try: |
| sub_ass_path = f'{tmp_dir}/sub.ass' |
| _PRES = {'9:16':1280,'16:9':720,'1:1':720,'original':1280} |
| _sub_fs = max(20, round(sub_size * _PRES.get(crop, 1280))) |
| _make_ass(sentences, dur(cmb), sub_ass_path, |
| position=sub_pos, fontsize=_sub_fs, |
| color=sub_color, style=sub_style, |
| sentence_timings=sentence_timings, |
| crop=crop) |
| print(f'[subtitle] ASS file ready: {sub_ass_path}') |
| except Exception as _ae: |
| print(f'[subtitle] ASS build failed: {_ae}') |
| sub_ass_path = None |
|
|
| |
| vd = dur(vpath); ad = dur(cmb) |
| if vd <= 0: raise Exception('Video duration read failed') |
| if ad <= 0: raise Exception('Audio duration read failed') |
|
|
| def _render(): |
| _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file, |
| logo_path=logo_path, logo_x=logo_x, logo_y=logo_y, logo_w=logo_w, |
| blur_enabled=blur_enabled, blur_x=blur_x, blur_y=blur_y, |
| blur_w=blur_w, blur_h=blur_h, |
| blur_orig_w=orig_w, blur_orig_h=orig_h, |
| wmk_x=wmk_x, wmk_y=wmk_y, wmk_fontsize=wmk_fontsize, |
| free_trial=is_free_trial, |
| logo_orig_w=orig_w, logo_orig_h=orig_h, |
| wmk_orig_w=orig_w, wmk_orig_h=orig_h, |
| sub_ass_path=sub_ass_path) |
| run_stage('ffmpeg', _render, tid, _prog, |
| 'โณ Render แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ฌ Video render แแฏแแบแแฑแแแบโฆ') |
|
|
| rem = -1 |
| if not is_adm: |
| _, rem = deduct(u, 1); upd_stat(u,'tr'); upd_stat(u,'vd') |
|
|
| output_url = f'/outputs/final_{tid}.mp4' |
| job_progress[tid] = { |
| 'pct': 100, 'msg': 'โ
แแผแฎแธแแซแแผแฎ!', 'done': True, |
| 'output_url': output_url, |
| 'title': caption_text, 'caption': caption_text, |
| 'hashtags': hashtags, 'source_lang': src_lang, |
| 'coins': rem, 'tid': tid, |
| } |
| |
| try: |
| save_video_history_entry(u, { |
| 'tid': tid, |
| 'output_url': output_url, |
| 'title': caption_text or '(no title)', |
| 'source_url': video_url or '', |
| 'ts': time.time(), |
| 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'), |
| }) |
| except Exception as _he: |
| print(f'โ ๏ธ history save failed: {_he}') |
| except Exception as e: |
| import traceback; traceback.print_exc() |
| job_progress[tid] = {'pct':0,'msg':f'โ {e}','error':True} |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| threading.Thread(target=_bg_job, daemon=True).start() |
| return jsonify(ok=True, tid=tid, msg='โณ Processing started') |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=f'โ {e}') |
|
|
| |
| @app.route('/api/video_history') |
| def api_video_history(): |
| try: |
| u = (request.args.get('username') or '').strip() |
| if not u: |
| return jsonify(ok=False, msg='No username'), 400 |
| cleanup_old_history() |
| records = load_video_history(u) |
| now = time.time() |
| |
| valid = [] |
| for r in records: |
| if now - r.get('ts', 0) > VIDEO_HISTORY_TTL: |
| continue |
| fp = str(BASE_DIR) + r['output_url'] |
| if not os.path.exists(fp): |
| continue |
| r['expires_in'] = int(VIDEO_HISTORY_TTL - (now - r['ts'])) |
| valid.append(r) |
| return jsonify(ok=True, history=valid) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
| |
| @app.route('/api/admin/create_user', methods=['POST']) |
| def api_create_user(): |
| try: |
| d = request.get_json(force=True) or {} |
| msg, uname = create_user_fn(d.get('username',''), d.get('coins',10), d.get('caller','')) |
| return jsonify(ok=bool(uname), msg=msg, username=uname) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/coins', methods=['POST']) |
| def api_coins(): |
| try: |
| d = request.get_json(force=True) or {} |
| if d.get('caller') != ADMIN_U: return jsonify(ok=False, msg='โ Admin only') |
| u = d.get('username',''); n = d.get('amount', 10) |
| msg = set_coins_fn(u, n) if d.get('action') == 'set' else add_coins_fn(u, n) |
| return jsonify(ok=True, msg=msg) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/users') |
| def api_users(): |
| try: |
| if request.args.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only') |
| db = load_db() |
| users = [{'username':k,'coins':v.get('coins',0), |
| 'transcripts':v.get('total_transcripts',0), |
| 'videos':v.get('total_videos',0), |
| 'created':v.get('created_at','')[:10], |
| 'banned':v.get('banned',False)} |
| for k,v in db['users'].items()] |
| return jsonify(ok=True, users=users) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/delete_user', methods=['POST']) |
| def api_delete_user(): |
| try: |
| d = request.get_json(force=True) or {} |
| if d.get('caller') != ADMIN_U: return jsonify(ok=False, msg='โ Admin only') |
| u = d.get('username','').strip() |
| if not u: return jsonify(ok=False, msg='โ No username') |
| db = load_db() |
| if u not in db['users']: return jsonify(ok=False, msg='โ User not found') |
| del db['users'][u]; save_db(db) |
| return jsonify(ok=True, msg=f'โ
{u} deleted') |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/ban_user', methods=['POST']) |
| def api_ban_user(): |
| try: |
| d = request.get_json(force=True) or {} |
| if d.get('caller') != ADMIN_U: return jsonify(ok=False, msg='โ Admin only') |
| u = d.get('username','').strip() |
| ban = d.get('ban', True) |
| msg = ban_fn(u, ban) |
| return jsonify(ok=True, msg=msg) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/gen_username') |
| def api_gen_username(): |
| try: |
| if request.args.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only') |
| return jsonify(ok=True, username=gen_uname()) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| |
| |
| |
|
|
| PACKAGES = [ |
| {'coins': 10, 'price': '12,000 MMK', 'price_thb': 100, 'desc': 'Process 10 แแผแญแแบ'}, |
| {'coins': 20, 'price': '24,000 MMK', 'price_thb': 200, 'desc': 'Process 20 แแผแญแแบ โ Best'}, |
| {'coins': 30, 'price': '36,000 MMK', 'price_thb': 300, 'desc': 'Process 30 แแผแญแแบ'}, |
| {'coins': 60, 'price': '72,000 MMK', 'price_thb': 600, 'desc': 'Process 60 แแผแญแแบ'}, |
| ] |
|
|
| @app.route('/api/payment/packages') |
| def api_payment_packages(): |
| return jsonify( |
| ok=True, |
| packages=PACKAGES, |
| kbz_name=KBZ_NAME, |
| kbz_number=KBZ_NUMBER, |
| scb_name=SCB_NAME, |
| scb_number=SCB_NUMBER, |
| promptpay=PROMPTPAY_NUM, |
| truemoney_name=TRUEMONEY_NAME, |
| truemoney_number=TRUEMONEY_NUM, |
| truemoney_qr_url=TRUEMONEY_QR_URL, |
| kbz_qr_url=KBZ_QR_URL, |
| ) |
|
|
| @app.route('/api/payment/submit', methods=['POST']) |
| def api_payment_submit(): |
| try: |
| d = request.get_json(force=True) or {} |
| username = (d.get('username') or '').strip() |
| coins = int(d.get('coins', 0)) |
| price = d.get('price', '') |
| slip_image = d.get('slip_image', '') |
|
|
| db = load_db() |
| if username not in db['users']: |
| return jsonify(ok=False, msg='โ User not found') |
|
|
| payment_id = uuid.uuid4().hex[:10] |
| now = datetime.now().isoformat() |
|
|
| pdb = load_payments_db() |
| pdb['payments'].append({ |
| 'id': payment_id, 'username': username, |
| 'coins': coins, 'price': price, |
| 'status': 'pending', |
| 'created_at': now, 'updated_at': now, |
| 'slip_image': slip_image, 'admin_note': '', |
| }) |
| save_payments_db(pdb) |
|
|
| |
| def _notify(): |
| try: |
| if not TELEGRAM_BOT_TOKEN or not ADMIN_TELEGRAM_CHAT_ID: |
| return |
| import urllib.request as _ur |
| caption = ( |
| f'๐ฐ <b>New Payment Request</b>\n' |
| f'๐ค <code>{username}</code>\n' |
| f'๐ช {coins} Coins โ {price} MMK\n' |
| f'๐ <code>{payment_id}</code>\n' |
| f'โฐ {now[:19]}' |
| ) |
| kb = json.dumps({ |
| 'inline_keyboard': [ |
| [{'text': f'โ
Approve +{coins} coins', |
| 'callback_data': f'adm_pay|approve|{payment_id}|{username}|{coins}'}, |
| {'text': 'โ Reject', |
| 'callback_data': f'adm_pay|reject|{payment_id}|{username}'}], |
| ] |
| }) |
| |
| slip_data = slip_image or '' |
| sent = False |
| if slip_data and ',' in slip_data: |
| try: |
| import base64 as _b64, urllib.parse as _up |
| b64 = slip_data.split(',', 1)[1] |
| img_bytes = _b64.b64decode(b64) |
| bnd = b'----RecapBoundary' + payment_id.encode() |
| def _field(name, val): |
| return (b'--' + bnd + b'\r\nContent-Disposition: form-data; name="' + |
| name.encode() + b'"\r\n\r\n' + val + b'\r\n') |
| body = ( |
| _field('chat_id', str(ADMIN_TELEGRAM_CHAT_ID).encode()) + |
| _field('caption', caption.encode()) + |
| _field('parse_mode', b'HTML') + |
| _field('reply_markup', kb.encode()) + |
| b'--' + bnd + b'\r\nContent-Disposition: form-data; name="photo"; filename="slip.jpg"\r\n' |
| b'Content-Type: image/jpeg\r\n\r\n' + img_bytes + b'\r\n' + |
| b'--' + bnd + b'--\r\n' |
| ) |
| req2 = _ur.Request( |
| f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto', |
| data=body, |
| headers={'Content-Type': 'multipart/form-data; boundary=' + bnd.decode()}) |
| _ur.urlopen(req2, timeout=15) |
| sent = True |
| except Exception as img_e: |
| print(f'[notify slip] {img_e}') |
| if not sent: |
| payload = json.dumps({ |
| 'chat_id': ADMIN_TELEGRAM_CHAT_ID, |
| 'text': caption, 'parse_mode': 'HTML', |
| 'reply_markup': json.loads(kb) |
| }).encode() |
| req = _ur.Request( |
| f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage', |
| data=payload, headers={'Content-Type': 'application/json'}) |
| _ur.urlopen(req, timeout=10) |
| except Exception as e: |
| print(f'[notify] {e}') |
| threading.Thread(target=_notify, daemon=True).start() |
|
|
| return jsonify(ok=True, |
| msg='โ
Payment แแแบแแผแฎแธแแซแแผแฎแ Admin แ
แ
แบแแฑแธแแผแฎแธ Coins แแแทแบแแฑแธแแซแแแบแ', |
| payment_id=payment_id) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/coins') |
| def api_get_coins(): |
| """Lightweight endpoint for frontend coin polling.""" |
| try: |
| username = request.args.get('username', '').strip() |
| if not username: |
| return jsonify(ok=False, msg='missing username') |
| db = load_db() |
| if username not in db['users']: |
| return jsonify(ok=False, msg='not found') |
| coins = db['users'][username].get('coins', 0) |
| return jsonify(ok=True, coins=coins) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/payment/history') |
| def api_payment_history(): |
| try: |
| username = request.args.get('username', '').strip() |
| if not username: |
| return jsonify(ok=False, msg='โ Username required') |
| pdb = load_payments_db() |
| pays = [p for p in pdb['payments'] if p['username'] == username] |
| |
| clean = [] |
| for p in pays: |
| c = dict(p); c.pop('slip_image', None); clean.append(c) |
| return jsonify(ok=True, payments=clean) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/payments') |
| def api_admin_payments(): |
| try: |
| if request.args.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only') |
| status = request.args.get('status', 'pending') |
| pdb = load_payments_db() |
| pays = [p for p in pdb['payments'] if p['status'] == status] |
| clean = [] |
| for p in pays: |
| c = dict(p) |
| |
| clean.append(c) |
| return jsonify(ok=True, payments=clean) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/payment/approve', methods=['POST']) |
| def api_admin_payment_approve(): |
| try: |
| d = request.get_json(force=True) or {} |
| if d.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only') |
| payment_id = d.get('payment_id', '').strip() |
| pdb = load_payments_db() |
| pay = next((p for p in pdb['payments'] if p['id'] == payment_id), None) |
| if not pay: |
| return jsonify(ok=False, msg='โ Payment not found') |
| if pay['status'] != 'pending': |
| return jsonify(ok=False, msg=f'โ ๏ธ Already {pay["status"]}') |
| pay['status'] = 'approved' |
| pay['updated_at'] = datetime.now().isoformat() |
| save_payments_db(pdb) |
| |
| db = load_db() |
| u = pay['username'] |
| new_bal = 0 |
| if u in db['users']: |
| db['users'][u]['coins'] = db['users'][u].get('coins', 0) + pay['coins'] |
| db['users'][u]['free_trial'] = False |
| new_bal = db['users'][u]['coins'] |
| save_db(db) |
| |
| tg_chat_id = db['users'].get(u, {}).get('tg_chat_id') |
| if tg_chat_id and TELEGRAM_BOT_TOKEN: |
| def _notify_user(chat_id, coins_added, balance, pid): |
| try: |
| import requests as _req |
| _req.post( |
| f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage', |
| json={ |
| 'chat_id': chat_id, |
| 'text': ( |
| f'๐ *Coins แแแทแบแแผแฎแธแแซแแผแฎ!*\n' |
| f'๐ช *+{coins_added} Coins* แแฑแฌแแบแแผแฎ\n' |
| f'๐ฐ แแแบแแปแแบ โ *{balance} Coins*\n' |
| f'๐ `{pid}`' |
| ), |
| 'parse_mode': 'Markdown', |
| }, |
| timeout=10 |
| ) |
| except Exception as _e: |
| print(f'[notify user] {_e}') |
| threading.Thread(target=_notify_user, args=(tg_chat_id, pay['coins'], new_bal, pay['id']), daemon=True).start() |
| return jsonify(ok=True, msg=f'โ
Approved +{pay["coins"]} coins โ {u}', new_coins=new_bal) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/payment/reject', methods=['POST']) |
| def api_admin_payment_reject(): |
| try: |
| d = request.get_json(force=True) or {} |
| if d.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only') |
| payment_id = d.get('payment_id', '').strip() |
| note = d.get('note', '') |
| pdb = load_payments_db() |
| pay = next((p for p in pdb['payments'] if p['id'] == payment_id), None) |
| if not pay: |
| return jsonify(ok=False, msg='โ Payment not found') |
| pay['status'] = 'rejected' |
| pay['admin_note'] = note |
| pay['updated_at'] = datetime.now().isoformat() |
| save_payments_db(pdb) |
| return jsonify(ok=True, msg='โ Rejected') |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
|
|
| @app.route('/api/admin/payment/slip/<payment_id>') |
| def api_admin_payment_slip(payment_id): |
| try: |
| if request.args.get('caller') != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only'), 403 |
| pdb = load_payments_db() |
| pay = next((p for p in pdb['payments'] if p['id'] == payment_id), None) |
| if not pay: |
| return jsonify(ok=False, msg='Not found'), 404 |
| return jsonify(ok=True, slip_image=pay.get('slip_image','')) |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
| @app.route('/api/admin/broadcast', methods=['POST']) |
| def api_admin_broadcast(): |
| """Send broadcast message to all users via Telegram bot.""" |
| try: |
| data = request.get_json(force=True) |
| caller = data.get('caller', '') |
| if caller != ADMIN_U: |
| return jsonify(ok=False, msg='โ Admin only'), 403 |
| message = data.get('message', '').strip() |
| if not message: |
| return jsonify(ok=False, msg='โ Message แแแแทแบแแแฑแธแแซ') |
| db = load_db() |
| token = os.getenv('TELEGRAM_BOT_TOKEN', '') |
| if not token: |
| return jsonify(ok=False, msg='โ BOT_TOKEN แแแแบแแพแแบแแแฑแธแแซ') |
| import urllib.request as _ur, json as _json, threading as _th |
| sent = 0; fail = 0 |
| for uname, udata in db.get('users', {}).items(): |
| tg_id = udata.get('tg_chat_id') |
| if not tg_id: |
| continue |
| try: |
| payload = _json.dumps({ |
| 'chat_id': tg_id, |
| 'text': f'๐ข *แแผแฑแแผแฌแแปแแบ*\n\n{message}', |
| 'parse_mode': 'Markdown', |
| }).encode() |
| req = _ur.Request( |
| f'https://api.telegram.org/bot{token}/sendMessage', |
| data=payload, |
| headers={'Content-Type': 'application/json'}) |
| _ur.urlopen(req, timeout=8) |
| sent += 1 |
| except Exception as e: |
| logger.warning(f'[broadcast] {uname}: {e}') |
| fail += 1 |
| return jsonify(ok=True, sent=sent, fail=fail, |
| msg=f'โ
{sent} แแฑแฌแแบ แแญแฏแทแแผแฎแธ โ {fail} แแป') |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)) |
| @app.route('/api/payment/kbz_qr') |
| def api_kbz_qr(): |
| """ |
| Generate KBZ Pay QR as PNG โ uses real KBZ QR payloads per fixed MMK amount. |
| If KBZ_QR_URL env is set, redirect to that static image instead. |
| Query: amount=<MMK> (10000 / 18000 / 27000 / 54000) |
| """ |
| |
| _KBZ_QR_PAYLOADS = { |
| 10000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzEwMDAwLjA=F919d3807b9db=', |
| 18000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzE4MDAwLjA=FF19d38087248=', |
| 27000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzI3MDAwLjA=F419d3808eed3=', |
| 54000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzU0MDAwLjA=F919d380995a0=', |
| } |
| try: |
| if KBZ_QR_URL: |
| return redirect(KBZ_QR_URL) |
| amount_str = request.args.get('amount', '0').strip() |
| try: |
| amount = int(float(amount_str)) |
| except ValueError: |
| return jsonify(ok=False, msg='invalid amount'), 400 |
|
|
| qr_data = _KBZ_QR_PAYLOADS.get(amount) |
| if not qr_data: |
| return jsonify(ok=False, msg=f'No KBZ QR for amount {amount} MMK'), 400 |
|
|
| try: |
| import qrcode as _qr, io |
| q = _qr.QRCode( |
| version=None, |
| error_correction=_qr.constants.ERROR_CORRECT_M, |
| box_size=10, border=4, |
| ) |
| q.add_data(qr_data) |
| q.make(fit=True) |
| img = q.make_image(fill_color='black', back_color='white') |
| buf = io.BytesIO() |
| img.save(buf, format='PNG') |
| buf.seek(0) |
| return Response(buf.read(), mimetype='image/png', |
| headers={'Cache-Control': 'no-store'}) |
| except ImportError: |
| return jsonify(ok=True, payload=qr_data, |
| note='pip install qrcode[pil]') |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)), 500 |
| @app.route('/api/payment/truemoney_qr') |
| def api_truemoney_qr(): |
| """ |
| Generate TrueMoney Wallet QR as PNG using EMV QR spec (PromptPay Topup format). |
| Verified against real TrueMoney QR samples. |
| If TRUEMONEY_QR_URL env is set, redirect to that static image instead. |
| Query: amount=<THB float> |
| """ |
| try: |
| if TRUEMONEY_QR_URL: |
| return redirect(TRUEMONEY_QR_URL) |
| amount_str = request.args.get('amount', '0').strip() |
| try: |
| amount = float(amount_str) |
| except ValueError: |
| return jsonify(ok=False, msg='invalid amount'), 400 |
|
|
| |
| phone = TRUEMONEY_NUM.strip().replace('+', '').replace('-', '').replace(' ', '') |
| phone_digits = phone.lstrip('0') |
| topup_id = '140000' + phone_digits |
|
|
| def _f(tag, val): |
| return f'{tag}{len(val):02d}{val}' |
|
|
| merchant = _f('00', 'A000000677010111') + _f('03', topup_id) |
| tag29 = _f('29', merchant) |
|
|
| amt_str = f'{amount:.2f}' |
| tag54 = _f('54', amt_str) if amount > 0 else '' |
|
|
| payload = ( |
| _f('00', '01') + |
| _f('01', '12') + |
| tag29 + |
| _f('53', '764') + |
| tag54 + |
| _f('58', 'TH') + |
| '6304' |
| ) |
|
|
| def _crc16(s: str) -> str: |
| crc = 0xFFFF |
| for b in s.encode('ascii'): |
| crc ^= b << 8 |
| for _ in range(8): |
| crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1) |
| crc &= 0xFFFF |
| return format(crc, '04X') |
|
|
| payload += _crc16(payload) |
| qr_data = payload |
|
|
| try: |
| import qrcode as _qr, io |
| q = _qr.QRCode( |
| version=None, |
| error_correction=_qr.constants.ERROR_CORRECT_M, |
| box_size=10, border=4, |
| ) |
| q.add_data(qr_data) |
| q.make(fit=True) |
| img = q.make_image(fill_color='black', back_color='white') |
| buf = io.BytesIO() |
| img.save(buf, format='PNG') |
| buf.seek(0) |
| return Response(buf.read(), mimetype='image/png', |
| headers={'Cache-Control': 'no-store'}) |
| except ImportError: |
| return jsonify(ok=True, payload=qr_data, |
| note='pip install qrcode[pil]') |
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)), 500 |
| @app.route('/api/payment/promptpay_qr') |
| def api_promptpay_qr(): |
| """ |
| Generate PromptPay QR (Thai EMV QR spec) โ scannable by any Thai banking app. |
| Query: amount=<float THB> |
| """ |
| try: |
| amount_str = request.args.get('amount', '0').strip() |
| try: |
| amount = float(amount_str) |
| except ValueError: |
| return jsonify(ok=False, msg='invalid amount'), 400 |
|
|
| |
| phone = PROMPTPAY_NUM.strip().replace('+', '').replace('-', '').replace(' ', '') |
| if phone.startswith('66'): |
| phone = '0066' + phone[2:] |
| elif phone.startswith('0'): |
| phone = '0066' + phone[1:] |
| else: |
| phone = '0066' + phone |
|
|
| |
| def f(tag, val): |
| return f'{tag}{len(val):02d}{val}' |
|
|
| |
| merchant = f('00', 'A000000677010111') + f('01', phone) |
| tag29 = f('29', merchant) |
|
|
| |
| if amount > 0: |
| amt_str = f'{amount:.2f}' |
| |
| tag54 = f('54', amt_str) |
| else: |
| tag54 = '' |
|
|
| payload = ( |
| f('00', '01') + |
| f('01', '12') + |
| tag29 + |
| f('52', '0000') + |
| f('53', '764') + |
| tag54 + |
| f('58', 'TH') + |
| f('59', 'PromptPay') + |
| f('60', 'Bangkok') + |
| '6304' |
| ) |
|
|
| |
| def crc16(s: str) -> str: |
| crc = 0xFFFF |
| for b in s.encode('ascii'): |
| crc ^= b << 8 |
| for _ in range(8): |
| crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1) |
| crc &= 0xFFFF |
| return format(crc, '04X') |
|
|
| payload += crc16(payload) |
|
|
| |
| try: |
| import qrcode as _qr, io |
| q = _qr.QRCode( |
| version=None, |
| error_correction=_qr.constants.ERROR_CORRECT_M, |
| box_size=10, border=4, |
| ) |
| q.add_data(payload) |
| q.make(fit=True) |
| img = q.make_image(fill_color='black', back_color='white') |
| buf = io.BytesIO() |
| img.save(buf, format='PNG') |
| buf.seek(0) |
| return Response(buf.read(), mimetype='image/png', |
| headers={'Cache-Control': 'no-store'}) |
| except ImportError: |
| return jsonify(ok=True, payload=payload, |
| note='pip install qrcode[pil]') |
|
|
| except Exception as e: |
| return jsonify(ok=False, msg=str(e)), 500 |
| |
| |
| |
|
|
| def _norm_digits(s): |
| """Normalize non-ASCII digit-lookalikes in timecodes to ASCII 0-9. |
| Covers: |
| U+1040-U+1049 Myanmar digits แ-แ |
| U+101D Myanmar letter wa 'แ' (Gemini writes this as zero in timecodes) |
| U+0E50-U+0E59 Thai digits เน-เน |
| """ |
| mm = str.maketrans('\u1040\u1041\u1042\u1043\u1044\u1045\u1046\u1047\u1048\u1049', |
| '0123456789') |
| wa = str.maketrans('\u101D', '0') |
| th = str.maketrans('\u0E50\u0E51\u0E52\u0E53\u0E54\u0E55\u0E56\u0E57\u0E58\u0E59', |
| '0123456789') |
| return s.translate(mm).translate(wa).translate(th) |
|
|
| def _tc_to_sec(tc): |
| """HH:MM:SS,mmm or HH:MM:SS.mmm โ float seconds. Handles Myanmar/Thai digits.""" |
| tc = _norm_digits(tc).replace(',', '.') |
| parts = tc.split(':') |
| h, m = int(parts[0]), int(parts[1]) |
| s = float(parts[2]) |
| return h * 3600 + m * 60 + s |
|
|
| def _sec_to_srt_tc(secs): |
| """float seconds โ HH:MM:SS,mmm SRT timecode.""" |
| h = int(secs // 3600) |
| m = int((secs % 3600) // 60) |
| s = int(secs % 60) |
| ms = int(round((secs - int(secs)) * 1000)) |
| return f'{h:02d}:{m:02d}:{s:02d},{ms:03d}' |
|
|
| def _parse_srt(text): |
| """Parse SRT content โ list of (index, timecode_str, text). |
| Normalizes Myanmar/Thai digits to ASCII so ffmpeg can read timecodes.""" |
| blocks = [] |
| for block in re.split(r'\n\s*\n', text.strip()): |
| lines = block.strip().split('\n') |
| if len(lines) < 2: continue |
| try: |
| idx = int(_norm_digits(lines[0].strip())) |
| except ValueError: |
| continue |
| tc_raw = _norm_digits(lines[1].strip()) |
| txt = '\n'.join(lines[2:]).strip() |
| if tc_raw and txt: |
| blocks.append((idx, tc_raw, txt)) |
| return blocks |
|
|
| def _strip_emoji(text): |
| """Remove emoji and non-Myanmar/ASCII characters from subtitle text.""" |
| import unicodedata |
| result = [] |
| for ch in text: |
| cp = ord(ch) |
| |
| if (0x20 <= cp <= 0x7E or |
| 0x1000 <= cp <= 0x109F or |
| 0xA9E0 <= cp <= 0xA9FF or |
| 0xAA60 <= cp <= 0xAA7F or |
| ch in '\n\r '): |
| result.append(ch) |
| return ''.join(result).strip() |
|
|
|
|
| def _build_srt(blocks): |
| """Build SRT string from list of (index, timecode, text).""" |
| parts = [] |
| for idx, tc, txt in blocks: |
| parts.append(f'{idx}\n{tc}\n{txt}') |
| return '\n\n'.join(parts) + '\n' |
|
|
| def _gemini_video_to_myanmar_srt(vpath, prog_fn=None): |
| """ |
| Upload video to Gemini Files API โ generate Myanmar SRT with timecodes. |
| Uses google-genai SDK (ggenai). |
| Returns SRT string. |
| """ |
| if ggenai is None: |
| raise Exception('google-genai package not installed') |
|
|
| _, ordered_keys = next_gemini_key() |
| if not ordered_keys: |
| raise Exception('No Gemini API Key') |
|
|
| prompt = ( |
| "You are a professional subtitle generator with frame-accurate timing.\n\n" |
| "INPUT:\n" |
| "- One video file containing speech (may be Chinese, Thai, English, or any language).\n\n" |
| "GOAL:\n" |
| "- Produce a COMPLETE Burmese (spoken Burmese / everyday colloquial style, NOT formal/literary) " |
| ".SRT covering the ENTIRE video duration.\n" |
| "- Translate ALL dialog, narration, and speech โ do NOT omit anything.\n" |
| "- Use 100% spoken Burmese style only (แแผแฑแฌแแญแฏแแฑแฌแแฌแแฌ). Do NOT use formal/written style.\n" |
| "- Write numbers in Burmese words.\n\n" |
| "TIMING ACCURACY (CRITICAL โ most important rule):\n" |
| "- Listen to the EXACT moment each word/phrase starts and ends in the audio.\n" |
| "- The START time MUST match the exact millisecond the speaker begins that subtitle's speech.\n" |
| "- The END time MUST match the exact millisecond the speech ends (NOT when the next subtitle starts).\n" |
| "- Do NOT guess or estimate timing โ derive timecodes from the actual audio speech boundaries.\n" |
| "- Do NOT pad end times forward to the next subtitle's start โ leave a natural gap of 50โ200 ms between subtitles.\n" |
| "- Do NOT shift timecodes to make subtitles look evenly spaced โ preserve the real speech rhythm.\n" |
| "- Silence or pause gaps in the audio MUST appear as gaps between subtitle blocks, not filled with text.\n" |
| "- Each subtitle block MUST NOT overlap in time with the next block.\n\n" |
| "CRITICAL OUTPUT RULES:\n" |
| "- Output ONLY valid .SRT content (plain text).\n" |
| "- Do NOT add explanations or comments.\n" |
| "- Do NOT ask questions.\n" |
| "- Do NOT stop early due to length limits.\n" |
| "- Do NOT include emoji in subtitle text.\n" |
| "- ALWAYS use ENGLISH/ASCII digits (0-9) for timecodes.\n" |
| "- NEVER use Myanmar digits (แ-แ) or the letter 'แ'.\n" |
| "- Example: 00:00:01,240 --> 00:00:02,810 is correct.\n" |
| "- NEVER write: แแ:แแ:แแ,แแแ or 00:แแ:แแ,แแแ\n\n" |
| "LONG-OUTPUT HANDLING (MANDATORY):\n" |
| "- If the full SRT cannot fit in a single response:\n" |
| " 1) Output as much SRT as possible.\n" |
| " 2) End the response with EXACTLY this line:\n" |
| " [CONTINUE]\n" |
| " 3) Stop immediately after that line.\n" |
| "- When the user replies with \"continue\", resume from the NEXT subtitle number.\n" |
| "- Never repeat or reset subtitle numbers.\n" |
| "- Continue this process until the FINAL subtitle is output.\n" |
| "- The LAST response must NOT include [CONTINUE].\n\n" |
| "FORMAT (MANDATORY, exactly like this example โ use real measured timecodes):\n" |
| "1\n" |
| "00:00:01,240 --> 00:00:02,810\n" |
| "แแผแแบแแฌ subtitle text\n\n" |
| "2\n" |
| "00:00:03,050 --> 00:00:04,630\n" |
| "แแฑแฌแแบ subtitle text\n\n" |
| "SEGMENTATION RULES:\n" |
| "- Prefer many short subtitles over fewer long ones.\n" |
| "- Split at natural pauses, breaths, or speaker changes.\n" |
| "- Avoid subtitles longer than ~3 seconds unless a single sentence requires it.\n" |
| "- Each subtitle should display for at least 0.5 seconds.\n" |
| "- Do not leave large timing gaps unless the audio is genuinely silent.\n\n" |
| "VALIDATION:\n" |
| "- Start time must always be strictly less than end time.\n" |
| "- No two subtitle blocks may overlap in time.\n" |
| "- If any text appears outside SRT structure (except [CONTINUE]), the output is INVALID.\n\n" |
| "BEGIN OUTPUT." |
| ) |
|
|
| last_err = None |
| for api_key in ordered_keys: |
| uploaded_file = None |
| try: |
| client = ggenai.Client(api_key=api_key) |
|
|
| |
| if prog_fn: prog_fn(None, '๐ค Video Gemini แแฒ upload แแฏแแบแแฑแแแบโฆ') |
| print(f'[srt_gemini] uploading {vpath} ({os.path.getsize(vpath)//1024}KB)') |
|
|
| with open(vpath, 'rb') as f: |
| uploaded_file = client.files.upload( |
| file=f, |
| config={'mime_type': 'video/mp4', 'display_name': 'srt_input'} |
| ) |
|
|
| |
| if prog_fn: prog_fn(None, 'โณ Gemini video processingโฆ') |
| for _ in range(60): |
| finfo = client.files.get(name=uploaded_file.name) |
| if finfo.state.name == 'ACTIVE': |
| break |
| if finfo.state.name == 'FAILED': |
| raise Exception('Gemini file processing FAILED') |
| time.sleep(2) |
| else: |
| raise Exception('Gemini file processing timeout') |
|
|
| |
| if prog_fn: prog_fn(None, '๐ค Gemini Myanmar SRT แแฏแแบแแฑแแแบโฆ') |
| response = client.models.generate_content( |
| model='gemini-3-flash', |
| contents=[ |
| gtypes.Part.from_uri(file_uri=uploaded_file.uri, mime_type='video/mp4'), |
| prompt, |
| ], |
| config=gtypes.GenerateContentConfig( |
| max_output_tokens=65536, |
| thinking_config=gtypes.ThinkingConfig(thinking_level="minimal"), |
| ) |
| ) |
| raw = response.text.strip() if response.text else '' |
|
|
| |
| raw = re.sub(r'^```[a-z]*\n?', '', raw, flags=re.MULTILINE) |
| raw = re.sub(r'\n?```$', '', raw, flags=re.MULTILINE) |
| |
| raw = re.sub(r'\[CONTINUE\]\s*$', '', raw.strip()).strip() |
|
|
| if not raw: |
| raise Exception('Gemini returned empty SRT') |
|
|
| |
| |
| fixed_lines = [] |
| for line in raw.splitlines(): |
| if '-->' in line: |
| fixed_lines.append(_norm_digits(line)) |
| elif re.match(r'^[แ-แ\d]+\s*$', line.strip()): |
| |
| fixed_lines.append(_norm_digits(line)) |
| else: |
| fixed_lines.append(line) |
| raw = '\n'.join(fixed_lines) |
|
|
| |
| if '-->' not in raw: |
| raise Exception(f'Gemini output has no SRT timecodes: {raw[:200]}') |
|
|
| print(f'[srt_gemini] SRT generated, {len(raw)} chars, key=...{api_key[-6:]}') |
| return raw |
|
|
| except Exception as e: |
| last_err = e |
| print(f'[srt_gemini] key failed: {e}') |
| continue |
| finally: |
| |
| if uploaded_file: |
| try: |
| client.files.delete(name=uploaded_file.name) |
| except: |
| pass |
|
|
| raise Exception(f'โ Gemini SRT generation all keys failed: {last_err}') |
| |
| @app.route('/api/generate_srt', methods=['POST']) |
| def api_generate_srt(): |
| """ |
| Step 1: Download video (or use upload/cache) โ Gemini generates Myanmar SRT. |
| Non-blocking: returns tid immediately, use /api/progress/<tid> for updates. |
| Result in job_progress[tid]: {done, srt, total, coins, cache_key} |
| """ |
| try: |
| u = (request.form.get('username') or '').strip() |
| if not u: |
| return jsonify(ok=False, msg='โ Not logged in') |
| is_adm = (u == ADMIN_U) |
| if not is_adm and load_db()['users'].get(u, {}).get('banned'): |
| return jsonify(ok=False, msg='โ Account banned') |
| if not is_adm and get_coins(u) < 1: |
| return jsonify(ok=False, msg='โ Not enough coins (need 1)') |
|
|
| video_url = (request.form.get('video_url') or '').strip() |
| cache_key = (request.form.get('cache_key') or '').strip() |
| client_tid = (request.form.get('tid') or '').strip() |
|
|
| |
| video_bytes = None; video_fname = None |
| vf = request.files.get('video_file') |
| if vf and vf.filename: |
| video_bytes = vf.read() |
| video_fname = vf.filename |
|
|
| tid = client_tid or uuid.uuid4().hex[:8] |
| job_progress[tid] = {'pct': 2, 'msg': 'โณ แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบโฆ', 'done': False} |
|
|
| def _prog(pct, msg): |
| cur = job_progress.get(tid, {}) |
| job_progress[tid] = { |
| 'pct': pct if pct is not None else cur.get('pct', 2), |
| 'msg': msg, 'done': False |
| } |
|
|
| def _bg(): |
| tmp_dir = str(BASE_DIR / f'temp_srt_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| vpath = None |
| try: |
| |
| if video_bytes: |
| vpath = f'{tmp_dir}/input.mp4' |
| with open(vpath, 'wb') as wf: wf.write(video_bytes) |
| _prog(10, '๐ Video file แกแแแทแบแแผแ
แบแแผแฎ') |
| elif cache_key: |
| with _preview_cache_lock: |
| cached = _preview_cache.get(cache_key) |
| if cached and os.path.exists(cached['file']): |
| vpath = cached['file'] |
| _prog(10, '๐ Cached video แกแแแทแบแแผแ
แบแแผแฎ') |
| elif video_url: |
| def _dl(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| vpath = run_stage('download', _dl, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ฅ Video แแฑแซแแบแธแแฏแแบแแฏแแบแแฑแแแบโฆ') |
| elif video_url: |
| def _dl2(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| vpath = run_stage('download', _dl2, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ฅ Video แแฑแซแแบแธแแฏแแบแแฏแแบแแฑแแแบโฆ') |
|
|
| if not vpath or not os.path.exists(vpath): |
| job_progress[tid] = {'pct': 0, 'msg': 'โ Video แแแฝแฑแทแแซ', 'error': True} |
| return |
|
|
| _prog(25, '๐ค Gemini แแฒ video upload แแฏแแบแแฑแแแบโฆ') |
|
|
| |
| def _gen_srt(): |
| return _gemini_video_to_myanmar_srt(vpath, prog_fn=_prog) |
|
|
| srt_text = run_stage('ai', _gen_srt, tid, _prog, |
| 'โณ AI แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบ', '๐ค Gemini SRT แแฏแแบแแฑแแแบโฆ') |
|
|
| blocks = _parse_srt(srt_text) |
| total = len(blocks) |
| print(f'[generate_srt] {total} subtitle blocks for user={u}') |
|
|
| |
| |
| vpath_file = str(BASE_DIR / f'srt_vpath_{tid}.txt') |
| with open(vpath_file, 'w') as f: |
| f.write(vpath) |
|
|
| |
| rem = -1 |
| if not is_adm: |
| _, rem = deduct(u, 1) |
|
|
| job_progress[tid] = { |
| 'pct': 100, 'done': True, |
| 'msg': f'โ
Myanmar SRT แแผแฎแธแแซแแผแฎ! ({total} lines)', |
| 'srt': srt_text, |
| 'total': total, |
| 'coins': rem, |
| 'vpath_key': tid, |
| } |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| job_progress[tid] = {'pct': 0, 'msg': f'โ {e}', 'error': True} |
| finally: |
| |
| pass |
|
|
| threading.Thread(target=_bg, daemon=True).start() |
| return jsonify(ok=True, tid=tid) |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=f'โ {e}') |
| @app.route('/api/burn_srt', methods=['POST']) |
| def api_burn_srt(): |
| """ |
| Step 2: Burn Myanmar SRT onto video with original audio preserved. |
| Input: srt_text, vpath_key (tid from generate_srt), sub settings, crop/flip/color |
| No coin deduction โ already paid in generate_srt step. |
| Non-blocking: returns tid for /api/progress/<tid>. |
| """ |
| try: |
| u = (request.form.get('username') or '').strip() |
| if not u: |
| return jsonify(ok=False, msg='โ Not logged in') |
| is_adm = (u == ADMIN_U) |
|
|
| srt_text = (request.form.get('srt_text') or '').strip() |
| vpath_key = (request.form.get('vpath_key') or '').strip() |
| sub_pos = int(request.form.get('sub_pos', 85)) |
| sub_size = float(request.form.get('sub_size', 0.0547)) |
| sub_color = request.form.get('sub_color', 'white') |
| sub_style = request.form.get('sub_style', 'outline') |
| crop = request.form.get('crop', 'original') |
| flip = request.form.get('flip', '0') == '1' |
| col = request.form.get('color', '0') == '1' |
|
|
| |
| blur_enabled = request.form.get('blur_enabled', '0') == '1' |
| blur_xp = float(request.form.get('blur_xp') or 0) |
| blur_yp = float(request.form.get('blur_yp') or 0) |
| blur_wp = float(request.form.get('blur_wp') or 0) |
| blur_hp = float(request.form.get('blur_hp') or 0) |
|
|
| |
| zoom_enabled = request.form.get('zoom_enabled', '0') == '1' |
| zoom_factor = float(request.form.get('zoom_factor', 1.03)) |
| zoom_factor = max(1.01, min(zoom_factor, 1.30)) |
|
|
| |
| wmk = (request.form.get('watermark') or '').strip() |
| wmk_xp_raw = request.form.get('wmk_xp') |
| wmk_yp_raw = request.form.get('wmk_yp') |
| wmk_fontsize = int(request.form.get('wmk_fontsize', 28)) |
| wmk_xp = float(wmk_xp_raw) if wmk_xp_raw else None |
| wmk_yp = float(wmk_yp_raw) if wmk_yp_raw else None |
|
|
| |
| logo_bytes = None; logo_fname = None |
| lf = request.files.get('logo_file') |
| if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename |
| logo_xp_raw = request.form.get('logo_xp') |
| logo_yp_raw = request.form.get('logo_yp') |
| logo_xp = float(logo_xp_raw) if logo_xp_raw else None |
| logo_yp = float(logo_yp_raw) if logo_yp_raw else None |
| _logo_wp = float(request.form.get('logo_wp') or 0) |
|
|
| if not srt_text: |
| return jsonify(ok=False, msg='โ SRT text แแแพแญแแซ') |
|
|
| |
| video_url = (request.form.get('video_url') or '').strip() |
| video_bytes = None; video_fname = None |
| vf = request.files.get('video_file') |
| if vf and vf.filename: |
| video_bytes = vf.read() |
| video_fname = vf.filename |
|
|
| client_tid = (request.form.get('tid') or '').strip() |
| tid = client_tid or uuid.uuid4().hex[:8] |
| job_progress[tid] = {'pct': 2, 'msg': 'โณ Render แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบโฆ', 'done': False} |
|
|
| def _prog(pct, msg): |
| cur = job_progress.get(tid, {}) |
| job_progress[tid] = { |
| 'pct': pct if pct is not None else cur.get('pct', 2), |
| 'msg': msg, 'done': False |
| } |
|
|
| def _bg(): |
| tmp_dir = str(BASE_DIR / f'temp_burn_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| out_file = str(OUTPUT_DIR / f'final_{tid}.mp4') |
| vpath = None |
| try: |
| |
| |
| if vpath_key: |
| vpath_file = str(BASE_DIR / f'srt_vpath_{vpath_key}.txt') |
| if os.path.exists(vpath_file): |
| with open(vpath_file) as f: |
| cached_vpath = f.read().strip() |
| if os.path.exists(cached_vpath): |
| vpath = cached_vpath |
| _prog(10, '๐ Video cached โ แกแแแทแบแแผแ
แบแแผแฎ') |
| try: os.remove(vpath_file) |
| except: pass |
|
|
| if not vpath: |
| if video_bytes: |
| vpath = f'{tmp_dir}/input.mp4' |
| with open(vpath, 'wb') as wf: wf.write(video_bytes) |
| _prog(10, '๐ Video file แกแแแทแบแแผแ
แบแแผแฎ') |
| elif video_url: |
| def _dl(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| vpath = run_stage('download', _dl, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎ', '๐ฅ Video แแฑแซแแบแธแแฏแแบโฆ') |
|
|
| if not vpath or not os.path.exists(vpath): |
| job_progress[tid] = {'pct': 0, 'msg': 'โ Video แแแฝแฑแทแแซ', 'error': True} |
| return |
|
|
| |
| if '-->' not in srt_text: |
| job_progress[tid] = {'pct': 0, 'msg': 'โ SRT parse แแแแซ', 'error': True} |
| return |
|
|
| vd = dur(vpath) |
| if vd <= 0: |
| job_progress[tid] = {'pct': 0, 'msg': 'โ Video duration read failed', 'error': True} |
| return |
|
|
| _prog(20, '๐ฌ Video แแผแแบแแแบแแฑแแแบโฆ') |
|
|
| |
| try: |
| _probe2 = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' |
| f'-show_entries stream=width,height ' |
| f'-of csv=s=x:p=0 "{vpath}"', |
| shell=True, capture_output=True, text=True, timeout=15) |
| orig_w, orig_h = map(int, _probe2.stdout.strip().split('x')) |
| except Exception: |
| orig_w, orig_h = 1920, 1080 |
|
|
| |
| blur_x = int(blur_xp * orig_w) if blur_enabled and blur_wp > 0 else 0 |
| blur_y = int(blur_yp * orig_h) if blur_enabled and blur_hp > 0 else 0 |
| blur_w = int(blur_wp * orig_w) if blur_enabled and blur_wp > 0 else 0 |
| blur_h = int(blur_hp * orig_h) if blur_enabled and blur_hp > 0 else 0 |
|
|
| |
| pre_out = f'{tmp_dir}/pre.mp4' |
| base_vf = ['setpts=PTS-STARTPTS'] |
| if flip: base_vf.append('hflip') |
| if col: base_vf.append('eq=brightness=0.06:contrast=1.2:saturation=1.4') |
|
|
| if zoom_enabled: |
| zf = zoom_factor |
| base_vf.append( |
| f'scale=iw*{zf:.3f}:ih*{zf:.3f},' |
| f'crop=iw/{zf:.3f}:ih/{zf:.3f}' |
| f':(iw-iw/{zf:.3f})/2:(ih-ih/{zf:.3f})/2' |
| ) |
|
|
| base_vf.append('format=yuv420p') |
|
|
| |
| if blur_enabled and blur_w > 0 and blur_h > 0: |
| eff_w = orig_w / zoom_factor if zoom_enabled else orig_w |
| eff_h = orig_h / zoom_factor if zoom_enabled else orig_h |
| bx = max(0, min(blur_x, eff_w - 10)) |
| by = max(0, min(blur_y, eff_h - 10)) |
| bw = max(10, min(blur_w, eff_w - bx)) |
| bh = max(10, min(blur_h, eff_h - by)) |
| _br = max(1, min(10, bw // 4, bh // 4)) |
| base_vf.append( |
| f'split[_bA][_bB];' |
| f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];' |
| f'[_bA][_bBl]overlay={bx}:{by}' |
| ) |
|
|
| if crop == '9:16': |
| base_vf.append('scale=720:1280:force_original_aspect_ratio=increase,crop=720:1280') |
| elif crop == '16:9': |
| base_vf.append('scale=1280:720:force_original_aspect_ratio=increase,crop=1280:720') |
| elif crop == '1:1': |
| base_vf.append('scale=720:720:force_original_aspect_ratio=increase,crop=720:720') |
| |
|
|
| vf_chain = ','.join(base_vf) |
|
|
| def _render(): |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{vpath}" ' |
| f'-vf "{vf_chain}" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy -t {vd:.3f} "{pre_out}"', |
| timeout=600 |
| ) |
| run_stage('ffmpeg', _render, tid, _prog, |
| 'โณ Render แแแบแธแ
แฎ', '๐ฌ Video render แแฏแแบแแฑแแแบโฆ') |
|
|
| |
| logo_path = None |
| if logo_bytes and logo_fname: |
| ext = Path(logo_fname).suffix or '.png' |
| logo_path = f'{tmp_dir}/logo{ext}' |
| with open(logo_path, 'wb') as wf: wf.write(logo_bytes) |
|
|
| logo_w_px = int(_logo_wp * orig_w) if _logo_wp > 0 else int(orig_w * 0.12) |
| logo_w_px = max(20, min(logo_w_px, orig_w)) |
| logo_x = int(logo_xp * orig_w) if logo_xp is not None else (orig_w - logo_w_px - 20) |
| logo_y = int(logo_yp * orig_h) if logo_yp is not None else 20 |
| wmk_x = int(wmk_xp * orig_w) if wmk_xp is not None else None |
| wmk_y = int(wmk_yp * orig_h) if wmk_yp is not None else None |
|
|
| |
| if logo_path and os.path.exists(logo_path): |
| logo_out = f'{tmp_dir}/logo_out.mp4' |
| lx = max(0, logo_x) |
| ly = max(0, logo_y) |
| lw = max(20, logo_w_px) |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{pre_out}" -i "{logo_path}" ' |
| f'-filter_complex "[1:v]scale={lw}:-2[_lg];[0:v][_lg]overlay={lx}:{ly}[v_out]" ' |
| f'-map "[v_out]" -map "0:a" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy "{logo_out}"', |
| timeout=300 |
| ) |
| os.replace(logo_out, pre_out) |
|
|
| |
| if wmk: |
| wmk_out = f'{tmp_dir}/wmk_out.mp4' |
| fs = max(16, int(wmk_fontsize)) |
| txt = wmk.replace("'", "").replace(":", "").replace("\\", "") |
| wx = wmk_x if wmk_x is not None else (orig_w - 220) |
| wy = wmk_y if wmk_y is not None else (orig_h - 80) |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{pre_out}" ' |
| f'-vf "drawtext=text=\'{txt}\':x={wx}:y={wy}:' |
| f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy "{wmk_out}"', |
| timeout=300 |
| ) |
| os.replace(wmk_out, pre_out) |
|
|
| |
| _prog(80, '๐ค Subtitle burn แแฏแแบแแฑแแแบโฆ') |
| _CROP_RES_MAP = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720)} |
| _prx, _pry = _CROP_RES_MAP.get(crop, (0, 0)) |
| if _prx == 0: |
| try: |
| import json as _json |
| _pr = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of json "{pre_out}"', |
| shell=True, capture_output=True, text=True, timeout=30) |
| _vinfo = _json.loads(_pr.stdout) |
| _s = _vinfo['streams'][0] |
| _prx, _pry = int(_s['width']), int(_s['height']) |
| except Exception: |
| _prx, _pry = 720, 1280 |
| _sub_fs2 = max(20, round(sub_size * _pry)) |
| _burn_srt_direct(pre_out, srt_text, out_file, |
| position=sub_pos, fontsize=_sub_fs2, |
| color=sub_color, style=sub_style, |
| tmp_dir=tmp_dir, |
| play_res_x=_prx, play_res_y=_pry) |
|
|
| output_url = f'/outputs/final_{tid}.mp4' |
| job_progress[tid] = { |
| 'pct': 100, 'done': True, |
| 'msg': 'โ
แแผแฎแธแแซแแผแฎ!', |
| 'output_url': output_url, |
| } |
|
|
| |
| try: |
| save_video_history_entry(u, { |
| 'tid': tid, |
| 'output_url': output_url, |
| 'title': '(Translate SRT)', |
| 'source_url': video_url or '', |
| 'ts': time.time(), |
| 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'), |
| }) |
| except Exception as _he: |
| print(f'โ ๏ธ history save failed: {_he}') |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| job_progress[tid] = {'pct': 0, 'msg': f'โ {e}', 'error': True} |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| threading.Thread(target=_bg, daemon=True).start() |
| return jsonify(ok=True, tid=tid) |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=f'โ {e}') |
| |
| @app.route('/api/process_srt', methods=['POST']) |
| def api_process_srt(): |
| """ |
| ONE-CLICK SRT mode: Download video โ Gemini Myanmar SRT โ Render with subtitles. |
| Supports: zoom, audio boost, blur box, logo, watermark, crop/flip/color. |
| Stages: 'download' โ 'ai' โ 'ffmpeg' |
| """ |
| try: |
| u = (request.form.get('username') or '').strip() |
| if not u: |
| return jsonify(ok=False, msg='โ Not logged in') |
| is_adm = (u == ADMIN_U) |
| if not is_adm and load_db()['users'].get(u, {}).get('banned'): |
| return jsonify(ok=False, msg='โ Account banned') |
| if not is_adm and get_coins(u) < 1: |
| return jsonify(ok=False, msg='โ Not enough coins (need 1)') |
|
|
| video_url = (request.form.get('video_url') or '').strip() |
| cache_key = (request.form.get('cache_key') or '').strip() |
|
|
| |
| sub_pos = int(request.form.get('sub_pos', 85)) |
| sub_size = float(request.form.get('sub_size', 0.0547)) |
| sub_color = request.form.get('sub_color', 'white') |
| sub_style = request.form.get('sub_style', 'outline') |
|
|
| |
| crop = request.form.get('crop', 'original') |
| flip = request.form.get('flip', '0') == '1' |
| col = request.form.get('color', '0') == '1' |
|
|
| |
| zoom_enabled = request.form.get('zoom_enabled', '0') == '1' |
| zoom_factor = float(request.form.get('zoom_factor', 1.03)) |
| zoom_factor = max(1.01, min(zoom_factor, 1.30)) |
|
|
| |
| audio_boost = request.form.get('audio_boost', '0') == '1' |
|
|
| |
| blur_enabled = request.form.get('blur_enabled', '0') == '1' |
| blur_xp = float(request.form.get('blur_xp') or 0) |
| blur_yp = float(request.form.get('blur_yp') or 0) |
| blur_wp = float(request.form.get('blur_wp') or 0) |
| blur_hp = float(request.form.get('blur_hp') or 0) |
|
|
| |
| wmk = (request.form.get('watermark') or '').strip() |
| wmk_xp_raw = request.form.get('wmk_xp') |
| wmk_yp_raw = request.form.get('wmk_yp') |
| wmk_fontsize = int(request.form.get('wmk_fontsize', 35)) |
| wmk_xp = float(wmk_xp_raw) if wmk_xp_raw else None |
| wmk_yp = float(wmk_yp_raw) if wmk_yp_raw else None |
|
|
| |
| logo_bytes = None; logo_fname = None |
| lf = request.files.get('logo_file') |
| if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename |
| logo_xp_raw = request.form.get('logo_xp') |
| logo_yp_raw = request.form.get('logo_yp') |
| logo_xp = float(logo_xp_raw) if logo_xp_raw else None |
| logo_yp = float(logo_yp_raw) if logo_yp_raw else None |
| _logo_wp = float(request.form.get('logo_wp') or 0) |
|
|
| |
| video_bytes = None |
| vf = request.files.get('video_file') |
| if vf and vf.filename: video_bytes = vf.read() |
|
|
| client_tid = (request.form.get('tid') or '').strip() |
| tid = client_tid or uuid.uuid4().hex[:8] |
| job_progress[tid] = {'pct': 2, 'msg': 'โณ แแแบแธแ
แฎแ
แฑแฌแแทแบแแฑแแแบโฆ', 'done': False} |
|
|
| def _prog(pct, msg): |
| cur = job_progress.get(tid, {}) |
| job_progress[tid] = { |
| 'pct': pct if pct is not None else cur.get('pct', 2), |
| 'msg': msg, 'done': False |
| } |
|
|
| def _bg(): |
| tmp_dir = str(BASE_DIR / f'temp_psrt_{tid}') |
| os.makedirs(tmp_dir, exist_ok=True) |
| out_file = str(OUTPUT_DIR / f'final_{tid}.mp4') |
| vpath = None; logo_path = None |
| try: |
| |
| if video_bytes: |
| vpath = f'{tmp_dir}/input.mp4' |
| with open(vpath, 'wb') as wf: wf.write(video_bytes) |
| _prog(8, '๐ Video file แกแแแทแบแแผแ
แบแแผแฎ') |
| elif cache_key: |
| with _preview_cache_lock: |
| cached = _preview_cache.get(cache_key) |
| if cached and os.path.exists(cached['file']): |
| vpath = cached['file'] |
| _prog(8, '๐ Cached video แกแแแทแบแแผแ
แบแแผแฎ') |
| elif video_url: |
| def _dl(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| vpath = run_stage('download', _dl, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎ', '๐ฅ Video แแฑแซแแบแธแแฏแแบโฆ') |
| elif video_url: |
| def _dl2(): |
| out_tmpl = f'{tmp_dir}/input.%(ext)s' |
| ytdlp_download(out_tmpl, video_url) |
| found = glob.glob(f'{tmp_dir}/input.*') |
| return found[0] if found else None |
| vpath = run_stage('download', _dl2, tid, _prog, |
| 'โณ Download แแแบแธแ
แฎ', '๐ฅ Video แแฑแซแแบแธแแฏแแบโฆ') |
|
|
| if not vpath or not os.path.exists(vpath): |
| job_progress[tid] = {'pct': 0, 'msg': 'โ Video แแแฝแฑแทแแซ', 'error': True} |
| return |
|
|
| |
| try: |
| _probe = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 ' |
| f'-show_entries stream=width,height ' |
| f'-of csv=s=x:p=0 "{vpath}"', |
| shell=True, capture_output=True, text=True, timeout=15) |
| orig_w, orig_h = map(int, _probe.stdout.strip().split('x')) |
| except Exception: |
| orig_w, orig_h = 1920, 1080 |
|
|
| |
| blur_x = int(blur_xp * orig_w) if blur_enabled and blur_wp > 0 else 0 |
| blur_y = int(blur_yp * orig_h) if blur_enabled and blur_hp > 0 else 0 |
| blur_w = int(blur_wp * orig_w) if blur_enabled and blur_wp > 0 else 0 |
| blur_h = int(blur_hp * orig_h) if blur_enabled and blur_hp > 0 else 0 |
|
|
| |
| logo_w_px = int(_logo_wp * orig_w) if _logo_wp > 0 else int(orig_w * 0.12) |
| logo_w_px = max(20, min(logo_w_px, orig_w)) |
|
|
| |
| wmk_x = int(wmk_xp * orig_w) if wmk_xp is not None else None |
| wmk_y = int(wmk_yp * orig_h) if wmk_yp is not None else None |
| logo_x = int(logo_xp * orig_w) if logo_xp is not None else (orig_w - logo_w_px - 20) |
| logo_y = int(logo_yp * orig_h) if logo_yp is not None else 20 |
|
|
| |
| if logo_bytes and logo_fname: |
| ext = Path(logo_fname).suffix or '.png' |
| logo_path = f'{tmp_dir}/logo{ext}' |
| with open(logo_path, 'wb') as wf: wf.write(logo_bytes) |
|
|
| |
| def _gen_srt(): |
| return _gemini_video_to_myanmar_srt(vpath, prog_fn=_prog) |
| srt_text = run_stage('ai', _gen_srt, tid, _prog, |
| 'โณ AI แแแบแธแ
แฎ', '๐ค Gemini SRT แแฏแแบแแฑแแแบโฆ') |
|
|
| blocks = _parse_srt(srt_text) |
| if not blocks: |
| job_progress[tid] = {'pct': 0, 'msg': 'โ SRT parse แแแแซ', 'error': True} |
| return |
|
|
| vd = dur(vpath) |
| if vd <= 0: |
| job_progress[tid] = {'pct': 0, 'msg': 'โ Video duration read failed', 'error': True} |
| return |
|
|
| _prog(65, '๐ฌ Video render แแผแแบแแแบแแฑแแแบโฆ') |
|
|
| |
| pre_out = f'{tmp_dir}/pre.mp4' |
|
|
| |
| base_vf = ['setpts=PTS-STARTPTS'] |
| if flip: base_vf.append('hflip') |
| if col: base_vf.append('eq=brightness=0.06:contrast=1.2:saturation=1.4') |
|
|
| |
| if zoom_enabled: |
| zf = zoom_factor |
| |
| base_vf.append( |
| f'scale=iw*{zf:.3f}:ih*{zf:.3f},' |
| f'crop=iw/{zf:.3f}:ih/{zf:.3f}' |
| f':(iw-iw/{zf:.3f})/2:(ih-ih/{zf:.3f})/2' |
| ) |
|
|
| base_vf.append('format=yuv420p') |
|
|
| |
| if blur_enabled and blur_w > 0 and blur_h > 0: |
| |
| eff_w = orig_w / zf if zoom_enabled else orig_w |
| eff_h = orig_h / zf if zoom_enabled else orig_h |
| bx = max(0, min(blur_x, eff_w - 10)) |
| by = max(0, min(blur_y, eff_h - 10)) |
| bw = max(10, min(blur_w, eff_w - bx)) |
| bh = max(10, min(blur_h, eff_h - by)) |
| _br = max(1, min(10, bw // 4, bh // 4)) |
| base_vf.append( |
| f'split[_bA][_bB];' |
| f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];' |
| f'[_bA][_bBl]overlay={bx}:{by}' |
| ) |
|
|
| |
| if crop == '9:16': |
| base_vf.append('scale=720:1280:force_original_aspect_ratio=increase,crop=720:1280') |
| elif crop == '16:9': |
| base_vf.append('scale=1280:720:force_original_aspect_ratio=increase,crop=1280:720') |
| elif crop == '1:1': |
| base_vf.append('scale=720:720:force_original_aspect_ratio=increase,crop=720:720') |
|
|
| vf_chain = ','.join(base_vf) |
|
|
| |
| if audio_boost: |
| af_chain = ( |
| 'highpass=f=200,' |
| 'lowpass=f=8000,' |
| 'equalizer=f=3000:width_type=o:width=2:g=5,' |
| 'equalizer=f=200:width_type=o:width=1:g=-4,' |
| 'dynaudnorm=f=150:g=15,' |
| 'volume=2.2,' |
| 'loudnorm=I=-14:TP=-1.5:LRA=11' |
| ) |
| else: |
| af_chain = 'acopy' |
|
|
| def _render(): |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{vpath}" ' |
| f'-vf "{vf_chain}" ' |
| f'-af "{af_chain}" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a aac -ar 44100 -b:a 128k ' |
| f'-t {vd:.3f} "{pre_out}"', |
| timeout=600 |
| ) |
|
|
| run_stage('ffmpeg', _render, tid, _prog, |
| 'โณ Render แแแบแธแ
แฎ', '๐ฌ Video render แแฏแแบแแฑแแแบโฆ') |
|
|
| |
| if logo_path and os.path.exists(logo_path): |
| logo_out = f'{tmp_dir}/logo_out.mp4' |
| lx = max(0, logo_x) |
| ly = max(0, logo_y) |
| lw = max(20, logo_w_px) |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{pre_out}" -i "{logo_path}" ' |
| f'-filter_complex "[1:v]scale={lw}:-2[_lg];[0:v][_lg]overlay={lx}:{ly}[v_out]" ' |
| f'-map "[v_out]" -map "0:a" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy "{logo_out}"', |
| timeout=300 |
| ) |
| os.replace(logo_out, pre_out) |
|
|
| |
| if wmk: |
| wmk_out = f'{tmp_dir}/wmk_out.mp4' |
| fs = max(16, int(wmk_fontsize)) |
| txt = wmk.replace("'", "").replace(":", "").replace("\\", "") |
| wx = wmk_x if wmk_x is not None else (orig_w - 220) |
| wy = wmk_y if wmk_y is not None else (orig_h - 80) |
| _run_ffmpeg( |
| f'ffmpeg -y -hide_banner -loglevel error ' |
| f'-i "{pre_out}" ' |
| f'-vf "drawtext=text=\'{txt}\':x={wx}:y={wy}:' |
| f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2" ' |
| f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p ' |
| f'-c:a copy "{wmk_out}"', |
| timeout=300 |
| ) |
| os.replace(wmk_out, pre_out) |
|
|
| |
| _prog(88, '๐ค Subtitle burn แแฏแแบแแฑแแแบโฆ') |
| _CROP_RES_MAP2 = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720)} |
| _prx2, _pry2 = _CROP_RES_MAP2.get(crop, (0, 0)) |
| if _prx2 == 0: |
| try: |
| import json as _json2 |
| _pr2 = subprocess.run( |
| f'ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of json "{pre_out}"', |
| shell=True, capture_output=True, text=True, timeout=30) |
| _vinfo2 = _json2.loads(_pr2.stdout) |
| _s2 = _vinfo2['streams'][0] |
| _prx2, _pry2 = int(_s2['width']), int(_s2['height']) |
| except Exception: |
| _prx2, _pry2 = 720, 1280 |
| _sub_fs3 = max(20, round(sub_size * _pry2)) |
| _burn_srt_direct(pre_out, srt_text, out_file, |
| position=sub_pos, fontsize=_sub_fs3, |
| color=sub_color, style=sub_style, |
| tmp_dir=tmp_dir, |
| play_res_x=_prx2, play_res_y=_pry2) |
|
|
| |
| rem = -1 |
| if not is_adm: |
| _, rem = deduct(u, 1) |
|
|
| out_url2 = f'/outputs/final_{tid}.mp4' |
| job_progress[tid] = { |
| 'pct': 100, 'done': True, |
| 'msg': f'โ
แแผแฎแธแแซแแผแฎ! ({len(blocks)} lines)', |
| 'output_url': out_url2, |
| 'coins': rem, |
| 'total': len(blocks), |
| } |
|
|
| |
| try: |
| save_video_history_entry(u, { |
| 'tid': tid, |
| 'output_url': out_url2, |
| 'title': '(SRT Video)', |
| 'source_url': video_url or '', |
| 'ts': time.time(), |
| 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'), |
| }) |
| except Exception as _he: |
| print(f'โ ๏ธ history save failed: {_he}') |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| job_progress[tid] = {'pct': 0, 'msg': f'โ {e}', 'error': True} |
| finally: |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| threading.Thread(target=_bg, daemon=True).start() |
| return jsonify(ok=True, tid=tid) |
|
|
| except Exception as e: |
| import traceback; traceback.print_exc() |
| return jsonify(ok=False, msg=f'โ {e}') |
| if __name__ == '__main__': |
| app.run(host='0.0.0.0', port=7860, debug=False, threaded=True) |