# app.py — Movie Recap Studio Flask backend
# (Hugging Face upload-page residue removed: "ttup / app.py", uploader byline,
#  commit hash "dfda715 verified" — these were never valid Python.)
import os, json, hashlib, uuid, random, re, glob, shutil, subprocess, threading, time, struct, wave
from collections import defaultdict
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# GLOBAL STAGE QUEUE SYSTEM
# Each stage runs strictly 1 job at a time.
# Jobs queue up and show position to user.
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# One threading.Lock per stage — only one job may hold a stage at a time.
# Pattern: acquire lock -> run -> release in finally so the next waiter proceeds.
_stage_locks = {
'download': threading.Lock(),
'whisper': threading.Lock(),
'ai': threading.Lock(),
'tts': threading.Lock(),
'ffmpeg': threading.Lock(),
}
# Queue position tracking
_stage_queues = {s: [] for s in _stage_locks}
_sq_lock = threading.Lock()
def _stage_enqueue(stage, tid):
    """Register *tid* at the tail of *stage*'s wait queue (idempotent)."""
    with _sq_lock:
        queue = _stage_queues[stage]
        if tid not in queue:
            queue.append(tid)
def _stage_dequeue(stage, tid):
    """Remove *tid* from *stage*'s wait queue; silently ignore if absent."""
    with _sq_lock:
        queue = _stage_queues[stage]
        if tid in queue:
            queue.remove(tid)
def _stage_queue_len(stage):
    """Number of jobs currently queued for *stage*."""
    with _sq_lock:
        queue = _stage_queues[stage]
        return len(queue)
def run_stage(name, fn, tid, prog_fn, wait_msg, run_msg, *args, **kwargs):
    """
    Strictly sequential stage execution with queue position display.

    Uses threading.Lock — only 1 job runs per stage at a time. Waiters block
    on lock.acquire(timeout=0.8) instead of the old non-blocking spin + sleep:
    a waiter now wakes the instant the lock is released (no idle window) while
    still refreshing its queue position roughly every 0.8 s.

    name: stage key in _stage_locks; fn: the stage callable; tid: task id;
    prog_fn(percent, message): progress callback; wait_msg/run_msg: UI strings.
    Returns fn(*args, **kwargs); re-raises its exceptions after dequeueing.
    """
    _stage_enqueue(name, tid)
    lock = _stage_locks[name]
    acquired = False
    try:
        while not acquired:
            # Block up to 0.8 s; on timeout, refresh the queue position shown to the user
            acquired = lock.acquire(timeout=0.8)
            if not acquired:
                with _sq_lock:
                    q = _stage_queues[name]
                    pos = q.index(tid) if tid in q else 0
                prog_fn(None, f'{wait_msg} ({pos} แ€šแ€ฑแ€ฌแ€€แ€บ แ€กแ€›แ€„แ€บแ€†แ€ฏแ€ถแ€ธ)')
        # We own the lock — dequeue self and run
        _stage_dequeue(name, tid)
        prog_fn(None, run_msg)
        return fn(*args, **kwargs)
    except Exception:
        # fn (or prog_fn) failed — make sure we are not left in the queue
        _stage_dequeue(name, tid)
        raise
    finally:
        if acquired:
            lock.release()
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory, Response, redirect
try:
from openai import OpenAI
except ImportError:
OpenAI = None
try:
import whisper
except ImportError:
whisper = None
try:
import edge_tts, asyncio
except ImportError:
edge_tts = None
try:
from google import genai as ggenai
from google.genai import types as gtypes
except ImportError:
ggenai = None
# โ”€โ”€ APP SETUP โ”€โ”€
BASE_DIR = Path(__file__).parent  # directory containing this file (and static assets)
COOKIES_FILE = str(BASE_DIR / 'm_youtube_com_cookies.txt')  # optional yt-dlp cookie jar
app = Flask(__name__)
# #5: YouTube/TikTok/Facebook/Instagram download โ€” hard cap 720p
def ytdlp_download(out_tmpl, video_url, timeout=1200):
    """yt-dlp download โ€” hard cap 720p max, platform-aware, cookies, robust fallback.

    out_tmpl: yt-dlp output template passed to -o.
    video_url: page URL of the video to fetch.
    timeout: seconds before the subprocess is killed (default 20 min).
    Raises subprocess.CalledProcessError / TimeoutExpired on failure.
    """
    url_lower = video_url.lower()
    is_tiktok = 'tiktok.com' in url_lower
    is_facebook = 'facebook.com' in url_lower or 'fb.watch' in url_lower
    is_instagram = 'instagram.com' in url_lower
    if is_tiktok or is_facebook or is_instagram:
        # These platforms don't always have mp4+m4a splits โ€” use best available โ‰ค720p
        fmt = (
            'bestvideo[height<=720]+bestaudio'
            '/best[height<=720]'
            '/best'
        )
    else:
        # YouTube and others โ€” prefer mp4+m4a for clean merge
        fmt = (
            'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]'
            '/bestvideo[height<=720]+bestaudio'
            '/best[height<=720][ext=mp4]'
            '/best[height<=720]'
            '/best[height<=480]'
            '/best'
        )
    # List-form argv (no shell) โ€” URL and template are passed safely as-is
    cmd = [
        'yt-dlp', '--no-playlist',
        '-f', fmt,
        '--merge-output-format', 'mp4',
        '--no-check-certificates',
    ]
    if os.path.exists(COOKIES_FILE):
        cmd += ['--cookies', COOKIES_FILE]
    cmd += ['-o', out_tmpl, video_url]
    print(f'[ytdlp] Running: {" ".join(cmd)}')
    subprocess.run(cmd, check=True, timeout=timeout)
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024  # 500 MB request body cap
OUTPUT_DIR = BASE_DIR / 'outputs'  # rendered final_*.mp4 files are served from here
OUTPUT_DIR.mkdir(exist_ok=True)
VIDEO_HISTORY_DIR = BASE_DIR / 'video_history'  # one <username>.json per user
VIDEO_HISTORY_DIR.mkdir(exist_ok=True)
VIDEO_HISTORY_TTL = 5 * 3600  # 5 hours
_vh_lock = threading.Lock()  # serializes read-modify-write of history files
def _vh_file(username):
    """Path of the per-user history JSON, with the username sanitised."""
    sanitised = re.sub(r'[^a-zA-Z0-9_\-]', '_', username)
    return VIDEO_HISTORY_DIR / (sanitised + '.json')
def load_video_history(username):
    """Return *username*'s history entries; [] on missing or corrupt file."""
    f = _vh_file(username)
    try:
        with open(f, encoding='utf-8') as fh:
            return json.load(fh)
    # narrowed from bare `except:` โ€” still best-effort for missing/corrupt
    # files, but no longer masks unrelated bugs or KeyboardInterrupt
    except (OSError, ValueError):
        return []
def save_video_history_entry(username, entry):
    """Add entry to user's video history, enforce 5-hour TTL, keep latest 50.

    Newest entries are stored first. The whole read-modify-write is guarded
    by _vh_lock so concurrent jobs for the same user cannot lose entries.
    """
    with _vh_lock:
        f = _vh_file(username)
        try:
            with open(f, encoding='utf-8') as fh:
                records = json.load(fh)
        except (OSError, ValueError):  # narrowed from bare except: missing/corrupt โ†’ fresh list
            records = []
        now = time.time()
        # Expire old entries
        records = [r for r in records if now - r.get('ts', 0) < VIDEO_HISTORY_TTL]
        records.insert(0, entry)  # newest first
        records = records[:50]
        with open(f, 'w', encoding='utf-8') as fh:
            json.dump(records, fh, ensure_ascii=False)
def cleanup_old_history():
    """Called periodically โ€” remove expired entries from all history files."""
    now = time.time()
    for hf in VIDEO_HISTORY_DIR.glob('*.json'):
        try:
            with open(hf, encoding='utf-8') as fh:
                records = json.load(fh)
            kept = [r for r in records if now - r.get('ts', 0) < VIDEO_HISTORY_TTL]
            with open(hf, 'w', encoding='utf-8') as fh:
                json.dump(kept, fh, ensure_ascii=False)
        # was bare `except:` โ€” keep per-file best-effort behaviour, but stop
        # swallowing SystemExit/KeyboardInterrupt
        except Exception:
            pass
# โ”€โ”€ AUTO CLEANUP: delete outputs/final_*.mp4 + expired history every 30min โ”€โ”€
def _auto_cleanup_loop():
    """Background janitor thread body: forever, every 30 minutes, delete
    rendered outputs older than VIDEO_HISTORY_TTL and prune expired history."""
    while True:
        try:
            now = time.time()
            deleted = 0
            for fp in OUTPUT_DIR.glob('final_*.mp4'):
                try:
                    if now - fp.stat().st_mtime > VIDEO_HISTORY_TTL:
                        fp.unlink(missing_ok=True)
                        deleted += 1
                except:  # best-effort per file (e.g. deleted concurrently)
                    pass
            if deleted:
                print(f'๐Ÿ—‘๏ธ Auto-cleanup: deleted {deleted} old output(s)')
            cleanup_old_history()
        except Exception as e:
            # never let the janitor thread die
            print(f'โš ๏ธ cleanup error: {e}')
        time.sleep(1800)  # every 30 minutes
threading.Thread(target=_auto_cleanup_loop, daemon=True).start()
# โ”€โ”€ JOB PROGRESS (for SSE real-time updates) โ”€โ”€
job_progress = {}  # task id โ†’ latest progress payload, polled/streamed to the frontend
# โ”€โ”€ PREVIEW CACHE โ€” reuse downloaded file for process_all โ”€โ”€
_preview_cache = {}  # url_md5 โ†’ {'file': path, 'dir': tmp_dir, 'ts': float}
_preview_cache_lock = threading.Lock()  # guards _preview_cache
PREVIEW_CACHE_TTL = 1800  # 30 minutes
def _url_cache_key(url):
import hashlib
return hashlib.md5(url.encode()).hexdigest()[:12]
def _cleanup_preview_cache():
    """Drop preview-cache entries older than PREVIEW_CACHE_TTL and delete their temp dirs."""
    cutoff = time.time() - PREVIEW_CACHE_TTL
    with _preview_cache_lock:
        # snapshot expired keys first โ€” never mutate the dict while iterating it
        for key in [k for k, v in _preview_cache.items() if v['ts'] < cutoff]:
            shutil.rmtree(_preview_cache[key]['dir'], ignore_errors=True)
            del _preview_cache[key]
# โ”€โ”€ CPU QUEUE โ€” 5 second gap between jobs โ”€โ”€
def cpu_queue_wait():
    """Legacy no-op kept so existing call sites keep working."""
    pass  # removed โ€” stage queue handles ordering
# โ”€โ”€ DB CONFIG โ”€โ”€
DB_FILE = str(BASE_DIR / 'users_db.json')  # local users DB, synced to HF dataset
HF_TOKEN = os.getenv('HF_TOKEN', '')  # HuggingFace token for pull_db/push_db
HF_REPO = 'Phoe2004/MovieRecapDB'  # dataset repo that holds users_db.json
ADMIN_U = os.getenv('ADMIN_USERNAME', '')
ADMIN_P = os.getenv('ADMIN_PASSWORD', '')
ADMIN_TG = os.getenv('ADMIN_TG_USERNAME', '')
# Up to 10 Gemini keys via GEMINI_API_KEY_1..10; unset slots stay None
GEMINI_KEYS = [os.getenv(f'GEMINI_API_KEY_{i}') for i in range(1, 11)]
DEEPSEEK_KEYS = [os.getenv('DEEPSEEK_API_KEY')]
# โ”€โ”€ GOOGLE OAUTH โ”€โ”€
GOOGLE_CLIENT_ID = os.getenv('GOOGLE_CLIENT_ID', '')
GOOGLE_CLIENT_SECRET = os.getenv('GOOGLE_CLIENT_SECRET', '')
GOOGLE_REDIRECT_URI = os.getenv('GOOGLE_REDIRECT_URI', 'https://recap.psonline.shop/auth/google/callback')
ADMIN_TELEGRAM_CHAT_ID = os.getenv('ADMIN_TELEGRAM_CHAT_ID', '')
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
# Round-robin cursor over the valid Gemini keys (see next_gemini_key)
_rr_idx = 0
_rr_lock = threading.Lock()
def next_gemini_key():
    """Round-robin over the configured Gemini keys.

    Returns (primary_key, rotation_ordered_list) where the list starts at the
    primary key; (None, []) when no key is configured.
    """
    global _rr_idx
    keys = [k for k in GEMINI_KEYS if k]
    if not keys:
        return None, []
    with _rr_lock:
        start = _rr_idx % len(keys)
        _rr_idx += 1
    rotation = keys[start:] + keys[:start]
    return rotation[0], rotation
# โ”€โ”€ DB HELPERS โ”€โ”€
def pull_db():
    """Download users_db.json from the HF dataset repo into BASE_DIR.

    Best-effort: logs and continues on any failure (app still runs with the
    local copy). Removed the dead `import traceback` inside the try block and
    the needless `import shutil as _shutil` alias โ€” both modules are already
    imported at the top of the file.
    """
    if not HF_TOKEN:
        print('โš ๏ธ pull: HF_TOKEN missing')
        return
    try:
        from huggingface_hub import hf_hub_download
        path = hf_hub_download(
            repo_id=HF_REPO, filename='users_db.json', repo_type='dataset',
            token=HF_TOKEN, local_dir=str(BASE_DIR), force_download=True,
        )
        dest = str(BASE_DIR / 'users_db.json')
        # hf_hub_download may return a cache path โ€” copy it into place
        if path != dest:
            shutil.copy2(path, dest)
        print('โœ… DB pulled from HuggingFace')
    except Exception as e:
        import traceback
        print(f'โš ๏ธ pull failed: {e}')
        traceback.print_exc()
_push_lock = threading.Lock()  # serialize uploads so retry sequences don't interleave
def push_db():
    """Upload users_db.json to the HF dataset repo.

    Up to 4 attempts with linear backoff (3s, 6s, 9s). Called from a daemon
    thread by save_db so the request path never blocks on the upload.
    """
    if not HF_TOKEN:
        print('โš ๏ธ push: HF_TOKEN missing')
        return
    with _push_lock:
        for attempt in range(4):
            try:
                from huggingface_hub import HfApi
                api = HfApi(token=HF_TOKEN)
                api.upload_file(
                    path_or_fileobj=DB_FILE, path_in_repo='users_db.json',
                    repo_id=HF_REPO, repo_type='dataset',
                    commit_message=f'db {datetime.now().strftime("%Y%m%d_%H%M%S")}',
                )
                print(f'โœ… DB pushed (attempt {attempt+1})')
                return
            except Exception as e:
                print(f'โš ๏ธ push attempt {attempt+1} failed: {e}')
                if attempt < 3:
                    time.sleep(3 * (attempt + 1))  # linear backoff before retrying
        print('โŒ push_db: all retries failed')
def load_db():
    """Load the users DB from disk; empty structure on missing/corrupt file."""
    if not os.path.exists(DB_FILE):
        return {'users': {}}
    try:
        with open(DB_FILE, encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):  # narrowed from bare except โ€” same fallback
        return {'users': {}}
def save_db(db):
    """Persist the users DB to disk, then push to HuggingFace in the background."""
    with open(DB_FILE, 'w', encoding='utf-8') as f:
        json.dump(db, f, ensure_ascii=False, indent=2)
    # fire-and-forget remote backup โ€” callers never block on the upload
    threading.Thread(target=push_db, daemon=True).start()
# โ”€โ”€ PAYMENTS DB โ”€โ”€
PAYMENTS_DB_FILE = str(BASE_DIR / 'payments_db.json')  # local-only payments ledger
# Payment-channel details shown to the user; env-overridable with defaults
KBZ_NAME = os.getenv('KBZ_NAME', 'Phoe Shan')
KBZ_NUMBER = os.getenv('KBZ_NUMBER', '09679871352')
KBZ_QR_URL = os.getenv('KBZ_QR_URL', '')  # static QR image URL (optional)
SCB_NAME = os.getenv('SCB_NAME', 'Phoe Shan')
SCB_NUMBER = os.getenv('SCB_NUMBER', '3664768187')
PROMPTPAY_NUM = os.getenv('PROMPTPAY_NUMBER', '0951236012')
TRUEMONEY_NAME = os.getenv('TRUEMONEY_NAME', 'Phoe Shan')
TRUEMONEY_NUM = os.getenv('TRUEMONEY_NUMBER', '0951236012')
TRUEMONEY_QR_URL = os.getenv('TRUEMONEY_QR_URL', '')  # static TrueMoney QR image URL (optional)
def load_payments_db():
    """Load the payments ledger; empty structure on missing/corrupt file."""
    if not os.path.exists(PAYMENTS_DB_FILE):
        return {'payments': []}
    try:
        with open(PAYMENTS_DB_FILE, encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):  # narrowed from bare except โ€” same fallback
        return {'payments': []}
def save_payments_db(db):
    """Write the payments ledger to disk (local only โ€” not pushed to HF)."""
    with open(PAYMENTS_DB_FILE, 'w', encoding='utf-8') as f:
        json.dump(db, f, ensure_ascii=False, indent=2)
def hp(p):
    """SHA-256 hex digest of password *p* (UTF-8 encoded)."""
    digest = hashlib.sha256(p.encode())
    return digest.hexdigest()
# Word pools for auto-generated usernames: Adjective + Noun + 2-3 digit suffix
ADJ = ['Red','Blue','Gold','Star','Sky','Fire','Moon','Cool','Ice','Dark','Neon','Wild']
NOUN = ['Tiger','Dragon','Wolf','Hawk','Lion','Fox','Eagle','Storm','Flash','Ghost']
def gen_uname():
    """Generate an unused random username (Adj+Noun+number; UUID fallback)."""
    users = load_db()['users']
    for _ in range(60):
        candidate = random.choice(ADJ) + random.choice(NOUN) + str(random.randint(10, 999))
        if candidate not in users:
            return candidate
    # extremely unlikely: 60 collisions in a row โ€” fall back to a UUID slice
    return 'User' + str(uuid.uuid4())[:6].upper()
def login_user(u, p):
    """Authenticate username *u* with password *p*.

    Returns (ok, message, coins); coins is -1 for the admin account.
    Accounts whose stored password is empty (Google OAuth users) accept any
    password โ€” that behaviour is preserved.

    Password-hash comparison now uses hmac.compare_digest to avoid leaking
    digest-prefix timing information.
    """
    import hmac
    if u == ADMIN_U and p == ADMIN_P: return True, 'โœ… Admin', -1
    db = load_db()
    if u not in db['users']: return False, 'โŒ Username not found', 0
    stored = db['users'][u].get('password', '')
    # constant-time comparison of the two sha256 hex digests
    if stored and not hmac.compare_digest(stored, hp(p)):
        return False, 'โŒ Wrong password', 0
    db['users'][u]['last_login'] = datetime.now().isoformat()
    save_db(db)
    return True, 'โœ… Logged in', db['users'][u]['coins']
def get_coins(u):
    """Current coin balance for *u*; 0 when the user is unknown."""
    users = load_db()['users']
    return users.get(u, {}).get('coins', 0)
def deduct(u, n):
    """Subtract *n* coins from *u*. Returns (success, balance_after_attempt).

    NOTE(review): this is an unlocked read-modify-write of the DB file โ€”
    concurrent deductions could race; confirm access is effectively serialized.
    """
    db = load_db()
    users = db['users']
    if u not in users:
        return False, 0
    balance = users[u]['coins']
    if balance < n:
        return False, balance
    users[u]['coins'] = balance - n
    save_db(db)
    return True, users[u]['coins']
def add_coins_fn(u, n, source=None):
    """Credit *n* coins to *u* and clear the free-trial flag (paid/admin top-up)."""
    db = load_db()
    if u not in db['users']:
        return 'โŒ User not found'
    rec = db['users'][u]
    rec['coins'] += int(n)
    rec['free_trial'] = False  # admin/payment coins โ€” remove free trial
    save_db(db)
    return f"โœ… +{n} โ†’ {rec['coins']} ๐Ÿช™"
def set_coins_fn(u, n):
    """Set *u*'s balance to exactly *n* coins."""
    db = load_db()
    if u not in db['users']:
        return 'โŒ User not found'
    db['users'][u]['coins'] = int(n)
    save_db(db)
    return f'โœ… Coin = {n} ๐Ÿช™'
def ban_fn(u, ban=True):
    """Ban (ban=True) or unban (ban=False) user *u*."""
    db = load_db()
    if u not in db['users']:
        return 'โŒ User not found'
    db['users'][u]['banned'] = ban
    save_db(db)
    status = 'Banned' if ban else 'Unbanned'
    return f'โœ… {status}: {u}'
def upd_stat(u, t):
    """Increment a usage counter for *u*: t == 'tr' โ†’ transcripts, else videos."""
    db = load_db()
    if u not in db['users']:
        return
    counter = 'total_transcripts' if t == 'tr' else 'total_videos'
    db['users'][u][counter] = db['users'][u].get(counter, 0) + 1
    save_db(db)
def create_user_fn(uname, coins, caller):
    """Admin-only: create a user with an initial coin balance.

    Returns (message, username); username is '' on failure.
    """
    if caller != ADMIN_U:
        return 'โŒ Not admin', ''
    uname = (uname or '').strip() or gen_uname()
    db = load_db()
    if uname in db['users']:
        return f"โŒ '{uname}' already exists", ''
    db['users'][uname] = {
        'password': '', 'coins': int(coins),
        'created_at': datetime.now().isoformat(), 'last_login': None,
        'total_transcripts': 0, 'total_videos': 0,
    }
    save_db(db)
    return f"โœ… '{uname}' created", uname
# โ”€โ”€ AI โ”€โ”€
# โ”€โ”€ Language-aware system prompts โ”€โ”€
def get_sys_prompt(ct, vo_lang='my'):
    """Return the LLM system prompt for a content type and voiceover language.

    ct: content type โ€” 'Medical/Health' selects the medical-translator
        persona; any other value selects the movie-recap storyteller persona.
    vo_lang: 'my' = Myanmar (default), 'th' = Thai, 'en' = English

    All prompts instruct the model to answer in the [SCRIPT]/[TITLE]/[HASHTAGS]
    marker format that parse_out() expects.
    """
    if vo_lang == 'th':
        # Thai language prompts
        if ct == 'Medical/Health':
            return (
                "เธ„เธธเธ“เธ„เธทเธญเธœเธนเน‰เนเธ›เธฅเธ”เน‰เธฒเธ™เธเธฒเธฃเนเธžเธ—เธขเนŒเธ เธฒเธฉเธฒเน„เธ—เธข โ€” เธ เธฒเธฉเธฒเน„เธ—เธขเธ—เธตเนˆเธžเธนเธ”เนƒเธ™เธŠเธตเธงเธดเธ•เธ›เธฃเธฐเธˆเธณเธงเธฑเธ™\n"
                "Rules: 100% เธ เธฒเธฉเธฒเน„เธ—เธข | เน„เธกเนˆเนƒเธŠเน‰เธ เธฒเธฉเธฒเธ—เธฒเธ‡เธเธฒเธฃเธกเธฒเธเน€เธเธดเธ™เน„เธ› | เน€เธ™เธทเน‰เธญเธซเธฒเธ•เน‰เธ™เธ‰เธšเธฑเธšเน€เธ—เนˆเธฒเธ™เธฑเน‰เธ™\n"
                "เนƒเธŠเน‰เธ•เธฑเธงเน€เธฅเธ‚เน„เธ—เธข: 1=เธซเธ™เธถเนˆเธ‡, 2=เธชเธญเธ‡, 10=เธชเธดเธš, 100=เธฃเน‰เธญเธข, 1000=เธžเธฑเธ™\n"
                "Format EXACTLY:\n[SCRIPT](full thai script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #เธชเธธเธ‚เธ เธฒเธž #thailand #health #viral #trending)"
            )
        else:
            return (
                "เธ„เธธเธ“เธ„เธทเธญเธ™เธฑเธเน€เธ‚เธตเธขเธ™เธชเธ„เธฃเธดเธ›เธ•เนŒเธชเธฃเธธเธ›เธซเธ™เธฑเธ‡เธ เธฒเธฉเธฒเน„เธ—เธข โ€” เน€เธฅเนˆเธฒเนเธšเธšเธชเธ™เธธเธ เธ เธฒเธฉเธฒเธžเธนเธ”เธ˜เธฃเธฃเธกเธŠเธฒเธ•เธด\n"
                "Rules: 100% เธ เธฒเธฉเธฒเน„เธ—เธข | เน„เธกเนˆเนƒเธŠเน‰เธ เธฒเธฉเธฒเธ—เธฒเธ‡เธเธฒเธฃ | เน€เธ™เธทเน‰เธญเธซเธฒเธ•เน‰เธ™เธ‰เธšเธฑเธšเน€เธ—เนˆเธฒเธ™เธฑเน‰เธ™\n"
                "เนƒเธŠเน‰เธ•เธฑเธงเน€เธฅเธ‚เน„เธ—เธข: 1=เธซเธ™เธถเนˆเธ‡, 2=เธชเธญเธ‡, 10=เธชเธดเธš, 100=เธฃเน‰เธญเธข, 1000=เธžเธฑเธ™\n"
                "เนเธ›เธฅเน€เธ™เธทเน‰เธญเธซเธฒเธ•เนˆเธญเน„เธ›เธ™เธตเน‰เน€เธ›เน‡เธ™เธ เธฒเธฉเธฒเน„เธ—เธข (เธชเน„เธ•เธฅเนŒเน€เธฅเนˆเธฒเน€เธฃเธทเนˆเธญเธ‡ movie recap เธ—เธตเนˆเธชเธ™เธธเธ)\n"
                "เธ•เธญเธšเน€เธ›เน‡เธ™เธ เธฒเธฉเธฒเน„เธ—เธขเน€เธ—เนˆเธฒเธ™เธฑเน‰เธ™ เธซเน‰เธฒเธกเธกเธตเธ เธฒเธฉเธฒเธญเธฑเธ‡เธเธคเธฉเนƒเธ™เธชเธ„เธฃเธดเธ›เธ•เนŒ\n"
                "Format: [SCRIPT](script)[TITLE](title โ‰ค10 words)[HASHTAGS]#movierecap #thailand"
            )
    elif vo_lang == 'en':
        # English language prompts
        if ct == 'Medical/Health':
            return (
                "You are an English medical content translator โ€” use clear, conversational English.\n"
                "Rules: 100% English | conversational tone | original content only\n"
                "Write numbers as words: 1=one, 2=two, 10=ten, 100=one hundred\n"
                "Format EXACTLY:\n[SCRIPT](full english script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #health #medical #wellness #viral #trending)"
            )
        else:
            return (
                "You are an English movie recap script writer โ€” engaging storytelling tone, conversational.\n"
                "Rules: 100% English | conversational not formal | original content only\n"
                "Write numbers as words: 1=one, 2=two, 10=ten, 100=one hundred\n"
                "Translate and retell the following content in English (movie recap storytelling style)\n"
                "Format: [SCRIPT](script)[TITLE](title โ‰ค10 words)[HASHTAGS]#movierecap #english"
            )
    else:
        # Myanmar (default)
        if ct == 'Medical/Health':
            return (
                "แ€™แ€ผแ€”แ€บแ€™แ€ฌ แ€†แ€ฑแ€ธแ€˜แ€€แ€บ แ€˜แ€ฌแ€žแ€ฌแ€•แ€ผแ€”แ€บแ€žแ€ฐ โ€” spoken Myanmar\n"
                "Rules: 100% แ€™แ€ผแ€”แ€บแ€™แ€ฌ | แ€€แ€ปแ€ฑแ€ฌแ€„แ€บแ€ธแ€žแ€ฏแ€ถแ€ธแ€…แ€ฌแ€•แ€ฑแ€™แ€žแ€ฏแ€ถแ€ธแ€› | แ€•แ€ฏแ€’แ€บแ€™แ€แ€ญแ€ฏแ€„แ€บแ€ธ แ‹\n"
                "แ€‚แ€แ€”แ€บแ€ธแ€™แ€ปแ€ฌแ€ธแ€€แ€ญแ€ฏ แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€…แ€€แ€ฌแ€ธแ€–แ€ผแ€„แ€ทแ€บแ€žแ€ฌ แ€›แ€ฑแ€ธแ€•แ€ซ โ€” แ€ฅแ€•แ€™แ€ฌ 1=แ€แ€…แ€บ, 2=แ€”แ€พแ€…แ€บ, 10=แ€แ€…แ€บแ€†แ€šแ€บ, 12=แ€แ€…แ€บแ€†แ€šแ€ทแ€บแ€”แ€พแ€…แ€บ, 20=แ€”แ€พแ€…แ€บแ€†แ€šแ€บ, 100=แ€แ€…แ€บแ€›แ€ฌ, 1000=แ€แ€…แ€บแ€‘แ€ฑแ€ฌแ€„แ€บ โ€” Arabic digit แ€™แ€žแ€ฏแ€ถแ€ธแ€›\n"
                "English transcriptแ€กแ€แ€ญแ€ฏแ€„แ€บแ€ธ แ€กแ€•แ€ญแ€ฏแ€กแ€œแ€ญแ€ฏ 100%แ€™แ€›แ€พแ€ญแ€•แ€ฒ movie recap styleแ€˜แ€ฌแ€žแ€ฌแ€•แ€ผแ€”แ€บแ€•แ€ซ แ€•แ€ผแ€ฎแ€แ€ฑแ€ฌแ€ท -แ€แ€šแ€บ-แ€…แ€€แ€ฌแ€ธแ€œแ€ฏแ€ถแ€ธแ€€แ€ญแ€ฏ 40%แ€žแ€ฌแ€žแ€ฏแ€ถแ€ธแ€•แ€ซ\n"
                "Format EXACTLY:\n[SCRIPT](full myanmar script here)\n[TITLE](short title)\n[HASHTAGS](exactly 5 hashtags e.g. #แ€€แ€ปแ€”แ€บแ€ธแ€™แ€ฌแ€›แ€ฑแ€ธ #myanmar #health #viral #trending)"
            )
        else:
            return (
                "แ€™แ€ผแ€”แ€บแ€™แ€ฌ movie recap script แ€›แ€ฑแ€ธแ€žแ€ฌแ€ธแ€žแ€ฐ โ€” spoken Myanmar (แ€”แ€ฑแ€ทแ€…แ€‰แ€บแ€•แ€ผแ€ฑแ€ฌแ€†แ€ญแ€ฏแ€™แ€พแ€ฏแ€˜แ€ฌแ€žแ€ฌ)\n"
                "Rules: 100% แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€˜แ€ฌแ€žแ€ฌ | แ€€แ€ปแ€ฑแ€ฌแ€„แ€บแ€ธแ€žแ€ฏแ€ถแ€ธแ€…แ€ฌแ€•แ€ฑแ€™แ€žแ€ฏแ€ถแ€ธแ€› | แ€™แ€ฐแ€œcontent แ€žแ€ฌ | แ€•แ€ฏแ€’แ€บแ€™แ€แ€ญแ€ฏแ€„แ€บแ€ธ แ‹\n"
                "แ€‚แ€แ€”แ€บแ€ธแ€™แ€ปแ€ฌแ€ธแ€€แ€ญแ€ฏ แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€…แ€€แ€ฌแ€ธแ€–แ€ผแ€„แ€ทแ€บแ€žแ€ฌ แ€›แ€ฑแ€ธแ€•แ€ซ โ€” แ€ฅแ€•แ€™แ€ฌ 1=แ€แ€…แ€บ, 2=แ€”แ€พแ€…แ€บ, 10=แ€แ€…แ€บแ€†แ€šแ€บ, 12=แ€แ€…แ€บแ€†แ€šแ€ทแ€บแ€”แ€พแ€…แ€บ, 20=แ€”แ€พแ€…แ€บแ€†แ€šแ€บ, 100=แ€แ€…แ€บแ€›แ€ฌ, 1000=แ€แ€…แ€บแ€‘แ€ฑแ€ฌแ€„แ€บ โ€” Arabic digit แ€™แ€žแ€ฏแ€ถแ€ธแ€›\n"
                "Translate the following content into Burmese (storytelling tone movie recap tone and keep original content)\n"
                "แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€œแ€ญแ€ฏแ€•แ€ฒ แ€–แ€ผแ€ฑแ€•แ€ฑแ€ธแ€•แ€ซแ‹ แ€กแ€„แ€บแ€นแ€‚แ€œแ€ญแ€•แ€บแ€œแ€ญแ€ฏ แ€˜แ€ฌแ€™แ€พแ€™แ€•แ€ผแ€”แ€บแ€”แ€ฒแ€ทแ‹แ€กแ€„แ€บแ€นแ€‚แ€œแ€ญแ€•แ€บแ€…แ€€แ€ฌแ€ธแ€œแ€ฏแ€ถแ€ธแ€แ€ฝแ€ฑแ€€แ€ญแ€ฏแ€แ€ฝแ€ฑแ€ทแ€›แ€„แ€บแ€œแ€Šแ€บแ€ธ แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€œแ€ญแ€ฏแ€•แ€ฒ แ€˜แ€ฌแ€žแ€ฌแ€•แ€ผแ€”แ€บแ€•แ€ผแ€ฎแ€ธ แ€–แ€ผแ€ฑแ€•แ€ฑแ€ธแ€•แ€ซ)\n"
                "แ€€แ€ผแ€ญแ€šแ€ฌแ€†แ€ฏแ€ถแ€ธแ€žแ€แ€บ: -แ€แ€šแ€บ แ€€แ€ญแ€ฏ 50%แ€žแ€ฌแ€žแ€ฏแ€ถแ€ธแ€•แ€ซแ‹ แ€€แ€ปแ€”แ€บ 50% แ€™แ€พแ€ฌ -แ€แ€ฌแ€•แ€ฑแ€ซแ€ท / -แ€แ€ฒแ€ทแ€แ€šแ€บ / -แ€œแ€ญแ€ฏแ€€แ€บแ€แ€šแ€บ / -แ€”แ€ฑแ€แ€ฌ แ€€แ€ญแ€ฏแ€œแ€ฒ แ€›แ€ฑแ€ฌแ€žแ€ฏแ€ถแ€ธแ€•แ€ฑแ€ธแ€•แ€ซแ‹\n"
                "Format: [SCRIPT](script)[TITLE](title โ‰ค10 words)[HASHTAGS]#movierecap #แ€™แ€ผแ€”แ€บแ€™แ€ฌ"
            )
# Keep legacy constants for backward compat
SYS_MOVIE = get_sys_prompt('Movie Recap', 'my')  # default Burmese movie-recap prompt
SYS_MED = get_sys_prompt('Medical/Health', 'my')  # default Burmese medical prompt
# Burmese numeral-spelling rule appended to prompts (spell digits as Burmese words)
NUM_TO_MM_RULE = (
    "แ€‚แ€แ€”แ€บแ€ธแ€™แ€ปแ€ฌแ€ธแ€€แ€ญแ€ฏ แ€™แ€ผแ€”แ€บแ€™แ€ฌแ€…แ€€แ€ฌแ€ธแ€–แ€ผแ€„แ€ทแ€บแ€žแ€ฌ แ€›แ€ฑแ€ธแ€•แ€ซ โ€” "
    "แ€ฅแ€•แ€™แ€ฌ 1=แ€แ€…แ€บ, 2=แ€”แ€พแ€…แ€บ, 10=แ€แ€…แ€บแ€†แ€šแ€บ, 12=แ€แ€…แ€บแ€†แ€šแ€ทแ€บแ€”แ€พแ€…แ€บ, 20=แ€”แ€พแ€…แ€บแ€†แ€šแ€บ, "
    "100=แ€แ€…แ€บแ€›แ€ฌ, 1000=แ€แ€…แ€บแ€‘แ€ฑแ€ฌแ€„แ€บ โ€” Arabic digit แ€™แ€žแ€ฏแ€ถแ€ธแ€›แ‹"
)
def get_num_rule(vo_lang='my'):
    """Numeral-spelling instruction for the requested voiceover language."""
    if vo_lang == 'en':
        return "Write all numbers as English words: 1=one, 2=two, 10=ten, 20=twenty, 100=one hundred, 1000=one thousand โ€” no Arabic digits."
    if vo_lang == 'th':
        return "เนƒเธŠเน‰เธ•เธฑเธงเน€เธฅเธ‚เน„เธ—เธขเน€เธ—เนˆเธฒเธ™เธฑเน‰เธ™: 1=เธซเธ™เธถเนˆเธ‡, 2=เธชเธญเธ‡, 10=เธชเธดเธš, 20=เธขเธตเนˆเธชเธดเธš, 100=เธฃเน‰เธญเธข, 1000=เธžเธฑเธ™ เธซเน‰เธฒเธกเนƒเธŠเน‰เธ•เธฑเธงเน€เธฅเธ‚เธญเธฒเธฃเธšเธดเธ"
    # Burmese default
    return NUM_TO_MM_RULE
# โ”€โ”€ Transcript: gemini-3-flash + gemini-2.5-flash (round-robin)
GEMINI_MODELS_TRANSCRIPT = [
    'gemini-3-flash',  # newest
    'gemini-2.5-flash',  # stable
]
# โ”€โ”€ Caption: gemini-3.1-flash-lite + gemini-2.5-flash-lite (round-robin)
GEMINI_MODELS_CAPTION = [
    'gemini-3.1-flash-lite',  # newest lite
    'gemini-2.5-flash-lite',  # stable lite
]
_mdl_tr_idx = 0  # transcript model round-robin index
_mdl_cap_idx = 0  # caption model round-robin index
_mdl_lock = threading.Lock()  # guards both round-robin indices
def next_model(purpose='transcript'):
    """Round-robin model picker.

    Returns the purpose's model list rotated so the current pick comes first
    (callers then try the models in that order as fallbacks).
    """
    global _mdl_tr_idx, _mdl_cap_idx
    pool = GEMINI_MODELS_TRANSCRIPT if purpose == 'transcript' else GEMINI_MODELS_CAPTION
    with _mdl_lock:
        if purpose == 'transcript':
            start = _mdl_tr_idx % len(pool)
            _mdl_tr_idx += 1
        else:
            start = _mdl_cap_idx % len(pool)
            _mdl_cap_idx += 1
    # rotation-ordered list beginning at the freshly selected model
    return pool[start:] + pool[:start]
def call_api(msgs, api='Gemini', purpose='transcript'):
    """
    Chat-completion call with key + model fallback.

    purpose='transcript' โ†’ GEMINI_MODELS_TRANSCRIPT round-robin (gemini-3-flash โ†” gemini-2.5-flash)
    purpose='caption'    โ†’ GEMINI_MODELS_CAPTION round-robin (gemini-3.1-flash-lite โ†” gemini-2.5-flash-lite)

    msgs: OpenAI-style message list. Returns (content, status_label).
    Tries every (key, model) pair; raises when all combinations fail.
    """
    if api == 'DeepSeek':
        keys, base = DEEPSEEK_KEYS, 'https://api.deepseek.com'
        models = ['deepseek-chat']
    else:
        # Gemini is accessed through its OpenAI-compatible endpoint
        keys, base = GEMINI_KEYS, 'https://generativelanguage.googleapis.com/v1beta/openai/'
        models = next_model(purpose)  # spinner: returns rotation-ordered list
    valid = [(i+1, k) for i, k in enumerate(keys) if k]
    if not valid: raise Exception('No API Key available')
    if api == 'Gemini':
        # order keys to match the global round-robin rotation
        _, ordered = next_gemini_key()
        valid = sorted(valid, key=lambda x: ordered.index(x[1]) if x[1] in ordered else 99)
    else:
        random.shuffle(valid)
    last_err = None
    for n, k in valid:
        for mdl in models:
            try:
                r = OpenAI(api_key=k, base_url=base, timeout=300.0).chat.completions.create(
                    model=mdl, messages=msgs, max_tokens=16384)
                if r and r.choices and r.choices[0].message.content:
                    print(f'โœ… call_api key={n} model={mdl} purpose={purpose}')
                    return r.choices[0].message.content.strip(), f'โœ… Key{n} ({mdl})'
            except Exception as e:
                err = str(e); last_err = e
                # 400: bad request for this model โ†’ try next model
                if '400' in err: continue
                # 401/403: key invalid/forbidden โ†’ abandon this key entirely
                if '401' in err or '403' in err: break
                # 429: rate-limited โ†’ brief pause, then move to the next key
                if '429' in err: time.sleep(2); break
                continue
    raise Exception(f'โŒ All keys/models failed ({purpose}): {last_err}')
def parse_out(text):
    """Split an AI response into (script, title, hashtags).

    Expects [SCRIPT]...[TITLE]...[HASHTAGS]... markers. The hashtag list is
    padded with defaults when the model returned fewer than five, and is
    always emitted as exactly five space-joined tags.
    """
    script_m = re.search(r'\[SCRIPT\](.*?)\[TITLE\]', text, re.DOTALL)
    title_m = re.search(r'\[TITLE\](.*?)(\[HASHTAGS\]|$)', text, re.DOTALL)
    tags_m = re.search(r'\[HASHTAGS\](.*?)$', text, re.DOTALL)
    sc = script_m.group(1).strip() if script_m else ''
    ti = title_m.group(1).strip() if title_m else ''
    ht = tags_m.group(1).strip() if tags_m else ''
    if not sc:
        # fallback: take everything before [TITLE] with markers stripped out
        head = text.split('[TITLE]')[0]
        sc = re.sub(r'\[SCRIPT\]|\[TITLE\]|\[HASHTAGS\]', '', head).strip()
    tags = re.findall(r'#\S+', ht)
    if len(tags) < 5:
        defaults = ['#myanmar', '#viral', '#trending', '#foryou', '#entertainment']
        tags += [t for t in defaults if t not in tags]
    return sc, ti, ' '.join(tags[:5])
def split_txt(txt, vo_lang='my'):
    """Split a script into TTS-sized sentences for the given voice language.

    Always returns at least one element ([txt] itself when nothing splits).
    """
    if vo_lang == 'th':
        # Thai: split on ideographic full stop or newline
        pieces = re.split(r'[ใ€‚\n]', txt)
        cleaned = [p.strip() for p in pieces if p.strip()]
    elif vo_lang == 'en':
        # English: split after sentence-ending punctuation
        pieces = re.split(r'(?<=[.!?])\s+', txt)
        cleaned = [p.strip() for p in pieces if p.strip()]
    else:
        # Myanmar: split on the section mark and re-append it to each sentence
        cleaned = [p.strip() + 'แ‹' for p in re.split(r'[แ‹]', txt) if p.strip()]
    return cleaned or [txt]
def dur(fp):
    """Media duration of *fp* in seconds via ffprobe; 0 on any failure.

    Now invokes ffprobe with an argument list instead of a shell string, so
    paths containing quotes or shell metacharacters cannot break (or inject
    into) the command line.
    """
    try:
        r = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
             '-of', 'default=noprint_wrappers=1:nokey=1', str(fp)],
            capture_output=True, text=True)
        return float(r.stdout.strip())
    # OSError: ffprobe missing; ValueError: empty/garbage stdout (probe failed)
    except (OSError, ValueError):
        return 0
# โ”€โ”€ ASYNC HELPERS โ”€โ”€
def run_tts_sync(sentences, voice_id, rate, tmp_dir):
    """Synthesize *sentences* with Edge TTS into per-sentence MP3s.

    Returns a flat list [r000, sil, r001, sil, ...] alternating speech files
    with a 0.2 s silence spacer (ready for ffmpeg concat). Runs its own event
    loop so it can be called from plain worker threads.
    """
    async def _run():
        # 0.2 s of silence used as the gap between sentences
        sil = f'{tmp_dir}/sil.mp3'
        proc = await asyncio.create_subprocess_shell(
            f'ffmpeg -f lavfi -i anullsrc=r=24000:cl=mono -t 0.2 -c:a libmp3lame -q:a 2 "{sil}" -y',
            stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL)
        await proc.wait()
        # Parallel TTS โ€” max 5 concurrent requests to avoid Edge TTS rate limit
        sem = asyncio.Semaphore(5)
        raw_files = [f'{tmp_dir}/r{i:03d}.mp3' for i in range(len(sentences))]
        async def _one(i, s):
            # synthesize one sentence into raw_files[i], with retries + backoff
            async with sem:
                last_err = None
                for attempt in range(3):  # 1 try + 2 retries
                    try:
                        await edge_tts.Communicate(s, voice_id, rate=rate).save(raw_files[i])
                        return
                    except Exception as e:
                        last_err = e
                        print(f'[TTS] sentence {i} attempt {attempt+1} failed: {e}')
                        if attempt < 2:
                            await asyncio.sleep(1.5 * (attempt + 1))
                raise Exception(f'[TTS] sentence {i} failed after 3 attempts: {last_err}')
        await asyncio.gather(*[_one(i, s) for i, s in enumerate(sentences)])
        # โ”€โ”€ Normalize intro/outro sentences (first & last) to match middle volume โ”€โ”€
        # Edge TTS tends to generate quieter audio for sentence boundaries
        n = len(raw_files)
        _norm_af = 'loudnorm=I=-14:TP=-1.5:LRA=11'
        _boost_af = f'volume=1.15,{_norm_af}'
        _to_normalize = set()
        if n >= 1: _to_normalize.add(0)  # first
        if n >= 2: _to_normalize.add(n - 1)  # last
        if n >= 4: _to_normalize.add(1)  # second (often still quiet)
        for idx in _to_normalize:
            rf = raw_files[idx]
            tmp_rf = rf + '.norm.mp3'
            try:
                subprocess.run(
                    f'ffmpeg -y -i "{rf}" -af "{_boost_af}" '
                    f'-c:a libmp3lame -q:a 2 "{tmp_rf}"',
                    shell=True, check=True, capture_output=True)
                os.replace(tmp_rf, rf)  # atomic swap of normalized take
            except Exception as _ne:
                # best-effort: keep the un-normalized take on failure
                print(f'[TTS norm] sentence {idx} normalize failed: {_ne}')
                try: os.remove(tmp_rf)
                except: pass
        # Rebuild parts in original order: [r000, sil, r001, sil, ...]
        parts = []
        for rf in raw_files:
            parts += [rf, sil]
        return parts
    # fresh event loop per call โ€” safe from threads with no running loop
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(_run())
    finally:
        loop.close()
# Edge TTS voices that can speak multiple languages.
# NOTE(review): run_edge_preview gives these the Burmese sample text โ€”
# presumably they can render it; confirm voice coverage.
MULTILINGUAL_VOICES = {
    'en-US-AndrewMultilingualNeural',
    'en-US-AvaMultilingualNeural',
    'en-US-BrianMultilingualNeural',
    'en-US-EmmaMultilingualNeural',
    'de-DE-FlorianMultilingualNeural',
    'de-DE-SeraphinaMultilingualNeural',
    'fr-FR-RemyMultilingualNeural',
    'fr-FR-VivienneMultilingualNeural',
}
def run_edge_preview(voice_id, rate, out_path):
    """Render a short sample phrase with Edge TTS so the user can audition a voice.

    The multilingual check must come before the prefix checks โ€” those voices
    have en-/de-/fr- prefixes but should get the Burmese sample.
    """
    # Choose preview text based on voice language
    if voice_id in MULTILINGUAL_VOICES:
        text = 'แ€™แ€„แ€บแ€นแ€‚แ€œแ€ฌแ€•แ€ซแ‹ แ€€แ€ผแ€ญแ€ฏแ€†แ€ญแ€ฏแ€•แ€ซแ€แ€šแ€บแ‹'
    elif voice_id.startswith('th-'):
        text = 'เธชเธงเธฑเธชเธ”เธตเธ„เธฃเธฑเธš เธขเธดเธ™เธ”เธตเธ•เน‰เธญเธ™เธฃเธฑเธš'
    elif voice_id.startswith('en-'):
        text = 'Hello, welcome to Recap Studio.'
    else:
        text = 'แ€™แ€„แ€บแ€นแ€‚แ€œแ€ฌแ€•แ€ซแ‹ แ€€แ€ผแ€ญแ€ฏแ€†แ€ญแ€ฏแ€•แ€ซแ€แ€šแ€บแ‹'
    async def _run():
        await edge_tts.Communicate(text, voice_id, rate=rate).save(out_path)
    # fresh event loop per call โ€” callable from plain worker threads
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_run())
    finally:
        loop.close()
# โ”€โ”€ GEMINI TTS โ”€โ”€
def _get_gemini_client():
    """Return (client, shuffled_valid_keys); the client uses the first key."""
    if ggenai is None:
        raise Exception('google-genai package not installed')
    keys = [k for k in GEMINI_KEYS if k]
    if not keys:
        raise Exception('No Gemini API Key')
    random.shuffle(keys)
    return ggenai.Client(api_key=keys[0]), keys
def _save_pcm_as_wav(pcm_data, wav_path, sample_rate=24000, channels=1, sample_width=2):
with wave.open(wav_path, 'wb') as wf:
wf.setnchannels(channels)
wf.setsampwidth(sample_width)
wf.setframerate(sample_rate)
wf.writeframes(pcm_data)
def _wav_to_mp3(wav_path, mp3_path):
    """Transcode WAV โ†’ MP3 (LAME VBR -q:a 2); raises on ffmpeg failure.

    Uses an argument list instead of a shell string so paths with quotes or
    shell metacharacters cannot break or inject into the command.
    """
    subprocess.run(
        ['ffmpeg', '-y', '-i', wav_path, '-c:a', 'libmp3lame', '-q:a', '2', mp3_path],
        check=True, capture_output=True)
def _gemini_tts_one_shot(client, text, voice_name, wav_path):
    """Call Gemini TTS API once, save raw PCM as WAV. Returns wav_path.

    client: a ggenai.Client; voice_name falls back to "Kore" when falsy.
    Raises when the response contains no audio part.
    """
    response = client.models.generate_content(
        model="gemini-2.5-flash-preview-tts",
        contents=text,
        config=gtypes.GenerateContentConfig(
            response_modalities=["AUDIO"],
            speech_config=gtypes.SpeechConfig(
                voice_config=gtypes.VoiceConfig(
                    prebuilt_voice_config=gtypes.PrebuiltVoiceConfig(
                        voice_name=voice_name or "Kore"
                    )
                )
            )
        )
    )
    # take the first audio part of the first candidate
    audio_data = None
    if response.candidates:
        for part in response.candidates[0].content.parts:
            if part.inline_data and part.inline_data.mime_type.startswith('audio/'):
                audio_data = part.inline_data.data
                break
    if not audio_data:
        raise Exception('Gemini TTS: no audio data received')
    # assumes the inline data is 24 kHz 16-bit mono PCM (matches
    # _save_pcm_as_wav defaults) โ€” TODO confirm against the TTS API docs
    _save_pcm_as_wav(audio_data, wav_path)
    return wav_path
def run_gemini_tts_sync(sentences, voice_name, tmp_dir, speed=0):
    """Synthesize the whole script with Gemini TTS in one request.

    Joins *sentences* into a single text block, tries each configured key in
    round-robin order, then post-processes with ffmpeg: optional tempo change
    (speed is a percent offset, e.g. 10 โ†’ 1.10ร—, clamped to 0.5โ€“2.0ร—), a
    volume boost and loudness normalization. Returns [final_mp3_path].
    """
    if ggenai is None:
        raise Exception('google-genai package not installed')
    _, ordered_keys = next_gemini_key()
    if not ordered_keys:
        raise Exception('No Gemini API Key')
    time.sleep(2)  # NOTE(review): presumably a crude rate-limit pause โ€” confirm why
    # Single-shot: all sentences joined into one text block
    full_txt = '\n'.join(sentences)
    mp3_out = f'{tmp_dir}/gemini_final.mp3'
    last_err = None
    for api_key in ordered_keys:
        try:
            client = ggenai.Client(api_key=api_key)
            wav_out = f'{tmp_dir}/gemini_out.wav'
            mp3_raw = f'{tmp_dir}/gemini_raw.mp3'
            _gemini_tts_one_shot(client, full_txt, voice_name, wav_out)
            _wav_to_mp3(wav_out, mp3_raw)
            try: os.remove(wav_out)
            except: pass
            # โ”€โ”€ Speed + Volume + Normalize โ”€โ”€
            spd_pct = speed if isinstance(speed, (int, float)) else 0
            tempo = max(0.5, min(2.0, 1.0 + spd_pct / 100.0))
            af_filters = []
            if abs(tempo - 1.0) > 0.01:  # skip atempo when effectively 1.0x
                af_filters.append(f'atempo={tempo:.4f}')
            af_filters += ['volume=2.3', 'loudnorm=I=-14:TP=-1.5:LRA=11']
            af_str = ','.join(af_filters)
            subprocess.run(
                f'ffmpeg -y -i "{mp3_raw}" -af "{af_str}" '
                f'-c:a libmp3lame -q:a 2 "{mp3_out}"',
                shell=True, check=True, capture_output=True)
            try: os.remove(mp3_raw)
            except: pass
            print(f'โœ… Gemini TTS done (1-shot), key=...{api_key[-6:]}')
            return [mp3_out]
        except Exception as e:
            last_err = e
            print(f'โš ๏ธ Gemini TTS key failed: {e}')
            # clean up partial artifacts before trying the next key
            for _f in [f'{tmp_dir}/gemini_out.wav', f'{tmp_dir}/gemini_raw.mp3']:
                try: os.remove(_f)
                except: pass
            continue
    raise Exception(f'โŒ Gemini TTS all keys failed: {last_err}')
def run_gemini_preview(voice_name, out_path):
    """Render a short Burmese sample with Gemini TTS so the user can audition a voice.

    Tries every configured key in round-robin order; writes MP3 to out_path.
    """
    if ggenai is None:
        raise Exception('google-genai package not installed')
    _, ordered_keys = next_gemini_key()
    if not ordered_keys:
        raise Exception('No Gemini API Key')
    # intermediate WAV lives next to the final MP3
    wav_path = out_path.replace('.mp3', '.wav')
    for api_key in ordered_keys:
        try:
            client = ggenai.Client(api_key=api_key)
            response = client.models.generate_content(
                model="gemini-2.5-flash-preview-tts",
                contents="แ€™แ€„แ€บแ€นแ€‚แ€œแ€ฌแ€•แ€ซแ‹ แ€’แ€ฎแ€”แ€ฑแ€ท แ€˜แ€ฌแ€™แ€ปแ€ฌแ€ธ แ€œแ€ฏแ€•แ€บแ€™แ€œแ€ฒแ‹",
                config=gtypes.GenerateContentConfig(
                    response_modalities=["AUDIO"],
                    speech_config=gtypes.SpeechConfig(
                        voice_config=gtypes.VoiceConfig(
                            prebuilt_voice_config=gtypes.PrebuiltVoiceConfig(
                                voice_name=voice_name or "Kore"
                            )
                        )
                    )
                )
            )
            # take the first audio part of the first candidate
            audio_data = None
            if response.candidates:
                for part in response.candidates[0].content.parts:
                    if part.inline_data and part.inline_data.mime_type.startswith('audio/'):
                        audio_data = part.inline_data.data
                        break
            if not audio_data:
                raise Exception('Gemini TTS preview: no audio data')
            _save_pcm_as_wav(audio_data, wav_path)
            _wav_to_mp3(wav_path, out_path)
            try: os.remove(wav_path)
            except: pass
            return
        except Exception as e:
            # try the next key in the rotation
            print(f'โš ๏ธ Gemini preview key failed: {e}')
            continue
    raise Exception('โŒ Gemini TTS preview: all keys failed')
# โ”€โ”€ PULL DB ON START โ”€โ”€
threading.Thread(target=pull_db, daemon=True).start()  # fetch users DB without blocking startup
whisper_model = None  # lazily-loaded Whisper model; stays None until first use
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ROUTES
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
@app.route('/login')
def login_page():
    """Legacy login URL โ€” auth is handled by the SPA on the root page."""
    return redirect('/')
@app.route('/app')
def app_page():
    """Serve the single-page application shell."""
    return send_from_directory(str(BASE_DIR), 'index.html')
@app.route('/terms')
def terms_page():
    """Static terms-of-service page."""
    return send_from_directory(str(BASE_DIR), 'terms.html')
@app.route('/privacy')
def privacy_page():
    """Static privacy-policy page."""
    return send_from_directory(str(BASE_DIR), 'privacy.html')
# โ”€โ”€ GOOGLE OAUTH โ”€โ”€
@app.route('/api/auth/google_enabled')
def google_enabled():
    """Tell the frontend whether the Google sign-in button should be shown."""
    return jsonify(enabled=bool(GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET))
@app.route('/auth/google')
def google_login():
    """Start the OAuth authorization-code flow: redirect to Google's consent screen."""
    if not GOOGLE_CLIENT_ID:
        return redirect('/?auth_error=Google+OAuth+not+configured')
    from urllib.parse import urlencode
    params = urlencode({
        'client_id': GOOGLE_CLIENT_ID,
        'redirect_uri': GOOGLE_REDIRECT_URI,
        'response_type': 'code',
        'scope': 'openid email profile',
        'access_type': 'offline',      # request a refresh token
        'prompt': 'select_account',    # always show the account chooser
    })
    return redirect(f'https://accounts.google.com/o/oauth2/auth?{params}')
@app.route('/auth/google/callback')
def google_callback():
    """OAuth redirect target.

    Exchanges the authorization code for tokens, fetches the Google profile,
    finds or creates the local user (new Google accounts start with 1 coin and
    a free trial), then redirects back to the SPA with login parameters in the
    query string.
    """
    import urllib.request, json as _json
    from urllib.parse import urlencode
    code = request.args.get('code')
    error = request.args.get('error')
    if error or not code:
        return redirect(f'/?auth_error={error or "no_code"}')
    try:
        # Exchange code โ†’ tokens
        # NOTE(review): the token request body is JSON; Google documents
        # form-encoding for this endpoint โ€” it appears to work, but confirm.
        token_data = _json.dumps({
            'code': code,
            'client_id': GOOGLE_CLIENT_ID,
            'client_secret': GOOGLE_CLIENT_SECRET,
            'redirect_uri': GOOGLE_REDIRECT_URI,
            'grant_type': 'authorization_code',
        }).encode()
        req = urllib.request.Request(
            'https://oauth2.googleapis.com/token',
            data=token_data,
            headers={'Content-Type': 'application/json'})
        with urllib.request.urlopen(req, timeout=15) as resp:
            tokens = _json.loads(resp.read())
        # Get user info
        req2 = urllib.request.Request(
            'https://www.googleapis.com/oauth2/v2/userinfo',
            headers={'Authorization': f'Bearer {tokens["access_token"]}'})
        with urllib.request.urlopen(req2, timeout=15) as resp2:
            info = _json.loads(resp2.read())
        email = info.get('email', '')
        name = info.get('name', email.split('@')[0])
        gid = info.get('id', '')
        # Find or create user โ€” match on stored Google id or email-as-username
        db = load_db()
        user_key = None
        for uk, uv in db['users'].items():
            if uv.get('google_id') == gid or uk == email:
                user_key = uk; break
        if not user_key:
            user_key = email or f'g_{gid}'
            db['users'][user_key] = {
                'password': '', 'coins': 1,
                'created_at': datetime.now().isoformat(), 'last_login': None,
                'total_transcripts': 0, 'total_videos': 0,
                'google_id': gid, 'google_name': name, 'google_email': email,
                'free_trial': True,
            }
        else:
            # refresh the Google link on every login
            db['users'][user_key]['google_id'] = gid
            db['users'][user_key]['google_name'] = name
        db['users'][user_key]['last_login'] = datetime.now().isoformat()
        save_db(db)
        coins = db['users'][user_key].get('coins', 0)
        is_adm = '1' if user_key == ADMIN_U else '0'
        # hand the session details to the SPA via the query string
        params = urlencode({'auth': 'google', 'u': user_key, 'n': name, 'c': coins, 'a': is_adm})
        return redirect(f'/?{params}')
    except Exception as e:
        import traceback; traceback.print_exc()
        return redirect(f'/?auth_error={str(e)[:100]}')
@app.route('/')
def index():
    """Serve the single-page frontend shell (index.html)."""
    root = str(BASE_DIR)
    return send_from_directory(root, 'index.html')
@app.route("/manifest.json")
def manifest():
return send_from_directory(str(BASE_DIR), "manifest.json",
mimetype="application/manifest+json")
@app.route("/sw.js")
def service_worker():
return send_from_directory(str(BASE_DIR), "sw.js",
mimetype="application/javascript")
@app.route('/outputs/<path:fn>', methods=['GET', 'HEAD'])
def serve_output(fn):
    """Serve a generated output file with CORS and byte-range support.

    HEAD requests are answered with size only, so the frontend can poll
    for render completion without downloading the whole file.
    """
    from flask import make_response
    target = OUTPUT_DIR / fn
    if not target.exists():
        return jsonify(ok=False, msg='File not found'), 404
    if request.method == 'HEAD':
        # Lightweight existence + size probe for frontend polling
        head = make_response('', 200)
        head.headers['Content-Length'] = str(target.stat().st_size)
        head.headers['Content-Type'] = 'video/mp4'
        head.headers['Access-Control-Allow-Origin'] = '*'
        return head
    full = make_response(send_from_directory(
        str(OUTPUT_DIR), fn, conditional=True, max_age=0))
    full.headers['Access-Control-Allow-Origin'] = '*'
    full.headers['Accept-Ranges'] = 'bytes'
    full.headers['Cache-Control'] = 'no-cache'
    return full
@app.route('/api/config')
def api_config():
    """Expose public configuration (admin Telegram handle) to the frontend."""
    return jsonify(admin_tg=ADMIN_TG)
@app.route('/googlefd3d91bc095a2620.html')
def google_verify():
    """Google Search Console site-verification endpoint."""
    body = 'google-site-verification: googlefd3d91bc095a2620.html'
    return body, 200, {'Content-Type': 'text/html'}
@app.route('/sitemap.xml')
def sitemap():
    """Serve the hand-written sitemap for search-engine crawlers."""
    # Static XML payload — kept verbatim so crawlers see stable content.
    xml = '''<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
<url>
<loc>https://recap.psonline.shop/</loc>
<changefreq>weekly</changefreq>
<priority>1.0</priority>
</url>
<url>
<loc>https://recap.psonline.shop/privacy.html</loc>
<changefreq>monthly</changefreq>
<priority>0.5</priority>
</url>
<url>
<loc>https://recap.psonline.shop/terms.html</loc>
<changefreq>monthly</changefreq>
<priority>0.5</priority>
</url>
</urlset>'''
    headers = {'Content-Type': 'application/xml'}
    return xml, 200, headers
# โ”€โ”€ AUTH โ”€โ”€
@app.route('/api/login', methods=['POST'])
def api_login():
    """Authenticate a user; report coin balance and admin flag."""
    try:
        body = request.get_json(force=True) or {}
        uname = body.get('username', '')
        ok, msg, coins = login_user(uname, body.get('password', ''))
        return jsonify(ok=ok, msg=msg, coins=coins,
                       is_admin=(uname == ADMIN_U and ok))
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/register', methods=['POST'])
def api_register():
    """Create a new account (random username if none given) with 1 free coin."""
    try:
        body = request.get_json(force=True) or {}
        uname = (body.get('username') or '').strip() or gen_uname()
        pw = body.get('password', '')
        db = load_db()
        if uname in db['users']:
            return jsonify(ok=False, msg='❌ Already exists')
        record = {
            'password': hp(pw) if pw else '',
            'coins': 1,
            'created_at': datetime.now().isoformat(),
            'last_login': None,
            'total_transcripts': 0,
            'total_videos': 0,
            'free_trial': True,
        }
        db['users'][uname] = record
        save_db(db)
        return jsonify(ok=True, msg=f'✅ {uname} created', username=uname, coins=1)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/preview_voice', methods=['POST'])
def api_preview_voice():
    """Render a short TTS voice sample and return its /outputs URL."""
    try:
        body = request.get_json(force=True) or {}
        voice_id = body.get('voice', 'my-MM-ThihaNeural')
        speed = int(body.get('speed', 30))
        engine = body.get('engine', 'ms')
        out = str(OUTPUT_DIR / f'preview_{uuid.uuid4().hex[:8]}.mp3')
        if engine == 'gemini':
            run_gemini_preview(voice_id, out)
        else:
            # Edge TTS takes a signed percentage rate string
            run_edge_preview(voice_id, f'+{speed}%', out)
        return jsonify(ok=True, url='/outputs/' + Path(out).name)
    except Exception as e:
        import traceback; traceback.print_exc()
        return jsonify(ok=False, msg=str(e))
@app.route('/api/gemini_voices')
def api_gemini_voices():
    """List the Gemini TTS voices the frontend can offer."""
    catalog = [
        ('Charon', 'Charon (Male, Informative)'),
        ('Kore', 'Kore (Female, Firm)'),
        ('Puck', 'Puck (Male, Upbeat)'),
        ('Fenrir', 'Fenrir (Male, Excitable)'),
        ('Aoede', 'Aoede (Female, Breezy)'),
        ('Zephyr', 'Zephyr (Female, Bright)'),
        ('Orus', 'Orus (Male, Firm)'),
        ('Schedar', 'Schedar (Male, Even-keeled)'),
        ('Sulafat', 'Sulafat (Female, Warm)'),
        ('Rasalgethi', 'Rasalgethi (Male, Informative)'),
        ('Gacrux', 'Gacrux (Female, Mature)'),
    ]
    voices = [{"id": vid, "name": label} for vid, label in catalog]
    return jsonify(ok=True, voices=voices)
# โ”€โ”€ VIDEO CLIP PREVIEW โ€” full download, cache for reuse, show first 10s โ”€โ”€
@app.route('/api/preview_clip', methods=['POST'])
def api_preview_clip():
    """Download a video URL once (cached) and return a 10s preview clip URL.

    The full download is cached keyed by a hash of the URL so the later
    /api/draft call can reuse it without re-downloading. Also probes the
    original video dimensions for the frontend's overlay coordinate mapping.
    """
    try:
        d = request.get_json(force=True) or {}
        url = (d.get('url') or '').strip()
        if not url:
            return jsonify(ok=False, msg='No URL')
        cache_key = _url_cache_key(url)
        _cleanup_preview_cache()  # evict stale cached downloads first
        out_mp4 = str(OUTPUT_DIR / f'clip_{cache_key}.mp4')
        # Check if already cached (both raw file and preview clip)
        # NOTE(review): this fast path omits original_dimensions in the
        # response — presumably the frontend tolerates that; confirm.
        with _preview_cache_lock:
            cached = _preview_cache.get(cache_key)
            if cached and os.path.exists(cached['file']) and os.path.exists(out_mp4):
                cached['ts'] = time.time()  # refresh eviction timestamp
                return jsonify(ok=True, url=f'/outputs/clip_{cache_key}.mp4', cache_key=cache_key)
        # Full download (no section limit — needed for final process)
        tmp_dir = str(BASE_DIR / f'temp_prev_{cache_key}')
        os.makedirs(tmp_dir, exist_ok=True)
        raw = f'{tmp_dir}/raw.%(ext)s'
        cmd_dl = [
            'yt-dlp', '--no-playlist',
            '-f', 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=720]+bestaudio/best[height<=720]/best',
            '--no-check-certificates',
            '--merge-output-format', 'mp4',
            '-o', raw, url
        ]
        if os.path.exists(COOKIES_FILE):
            cmd_dl += ['--cookies', COOKIES_FILE]
        subprocess.run(cmd_dl, check=True, timeout=600, capture_output=True)
        found = glob.glob(f'{tmp_dir}/raw*')
        src = found[0] if found else f'{tmp_dir}/raw.mp4'
        # Get original video dimensions for the frontend (best-effort)
        orig_dim = None
        try:
            probe = subprocess.run(
                f'ffprobe -v error -select_streams v:0 '
                f'-show_entries stream=width,height '
                f'-of csv=s=x:p=0 "{src}"',
                shell=True, capture_output=True, text=True, timeout=10
            )
            if probe.returncode == 0 and probe.stdout.strip():
                w, h = map(int, probe.stdout.strip().split('x'))
                orig_dim = {'width': w, 'height': h}
                print(f'[preview_clip] original dimensions: {w}x{h}')
        except Exception as e:
            print(f'[preview_clip] failed to get dimensions: {e}')
        # Cache the full file for reuse by /api/draft
        with _preview_cache_lock:
            _preview_cache[cache_key] = {'file': src, 'dir': tmp_dir, 'ts': time.time()}
        # Make a 10-second preview clip (fast, low-quality encode)
        subprocess.run(
            f'ffmpeg -y -i "{src}" -t 10 '
            f'-c:v libx264 -crf 26 -preset ultrafast '
            f'-c:a aac -b:a 64k '
            f'-movflags +faststart "{out_mp4}"',
            shell=True, check=True, capture_output=True, timeout=60
        )
        return jsonify(ok=True, url=f'/outputs/clip_{cache_key}.mp4',
                       cache_key=cache_key,
                       original_dimensions=orig_dim)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
# โ”€โ”€ VIDEO FRAME PREVIEW (5s thumbnail) โ€” kept for fallback โ”€โ”€
@app.route('/api/thumb', methods=['POST'])
def api_thumb():
    """Grab a single frame (default 5s in) from a video URL as a JPG thumbnail."""
    try:
        body = request.get_json(force=True) or {}
        url = (body.get('url') or '').strip()
        seek = int(body.get('seek', 5))
        if not url:
            return jsonify(ok=False, msg='No URL')
        tid = uuid.uuid4().hex[:8]
        tmp_dir = str(BASE_DIR / f'temp_thumb_{tid}')
        os.makedirs(tmp_dir, exist_ok=True)
        out_jpg = str(OUTPUT_DIR / f'thumb_{tid}.jpg')
        try:
            clip = f'{tmp_dir}/clip.mp4'
            # Only the first 15 seconds at worst quality — fast enough for a thumb
            dl = [
                'yt-dlp', '--no-playlist',
                '-f', 'worst[ext=mp4]/worst',
                '--no-check-certificates',
                '--download-sections', '*0-15',
                '-o', clip, url,
            ]
            if os.path.exists(COOKIES_FILE):
                dl.extend(['--cookies', COOKIES_FILE])
            subprocess.run(dl, check=True, timeout=60, capture_output=True)
            matches = glob.glob(f'{tmp_dir}/clip*')
            src = matches[0] if matches else clip
            subprocess.run(
                f'ffmpeg -y -ss {seek} -i "{src}" -vframes 1 -q:v 2 "{out_jpg}"',
                shell=True, check=True, capture_output=True, timeout=30)
            return jsonify(ok=True, url=f'/outputs/thumb_{tid}.jpg')
        finally:
            # Always remove the temp download directory
            shutil.rmtree(tmp_dir, ignore_errors=True)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
# โ”€โ”€ DRAFT โ”€โ”€
@app.route('/api/draft', methods=['POST'])
def api_draft():
    """Transcribe a video (upload / cached preview / URL) and draft a script.

    Pipeline: obtain video → Whisper transcription (queued, one at a time) →
    AI rewrite into recap script (skipped for English voice-over) →
    deduct 1 coin (non-admin) and return script/title/hashtags.
    """
    global whisper_model
    try:
        u = (request.form.get('username') or '').strip()
        video_url = (request.form.get('video_url') or '').strip()
        ct = request.form.get('content_type', 'Movie Recap')
        api = request.form.get('ai_model', 'Gemini')
        vo_lang = request.form.get('vo_lang', 'my')  # 'my', 'th', 'en'
        video_file = request.files.get('video_file')
        cache_key = request.form.get('cache_key', '')
        if not u: return jsonify(ok=False, msg='❌ Not logged in')
        is_adm = (u == ADMIN_U)
        if not is_adm and get_coins(u) < 1:
            return jsonify(ok=False, msg='❌ Not enough coins')
        cpu_queue_wait()  # throttle concurrent CPU-heavy jobs
        tid = uuid.uuid4().hex[:8]
        tmp_dir = str(BASE_DIR / f'temp_{tid}')
        os.makedirs(tmp_dir, exist_ok=True)
        vpath = None
        try:
            # Video source priority: direct upload > preview cache > URL download
            if video_file and video_file.filename:
                vpath = f'{tmp_dir}/input.mp4'
                video_file.save(vpath)
            elif cache_key:
                # Reuse cached full download from preview — skip yt-dlp
                with _preview_cache_lock:
                    cached = _preview_cache.get(cache_key)
                if cached and os.path.exists(cached['file']):
                    vpath = cached['file']
                    job_progress[tid] = {'pct': 8, 'msg': '📥 Using cached video…', 'done': False}
                elif video_url:
                    # Cache miss — fall back to downloading the URL
                    out_tmpl = f'{tmp_dir}/input.%(ext)s'
                    ytdlp_download(out_tmpl, video_url)
                    found = glob.glob(f'{tmp_dir}/input.*')
                    if found: vpath = found[0]
            elif video_url:
                out_tmpl = f'{tmp_dir}/input.%(ext)s'
                ytdlp_download(out_tmpl, video_url)
                found = glob.glob(f'{tmp_dir}/input.*')
                if found: vpath = found[0]
            if not vpath: return jsonify(ok=False, msg='❌ No video selected')
            if whisper is None: raise Exception('whisper not installed')
            if whisper_model is None:
                # Lazy-load once per process; 'tiny' keeps CPU/RAM modest
                whisper_model = whisper.load_model('tiny', device='cpu')
            # Whisper stage runs strictly sequentially via the global stage queue
            res = run_stage('whisper', whisper_model.transcribe, tid,
                            lambda p, m: None, '', '', vpath, fp16=False)
            tr = res['text']; lang = res.get('language', 'en')
            if vo_lang == 'en':
                # English — skip AI API, return whisper transcript directly
                sc = tr.strip()
                ti = sc[:60].strip() + ('…' if len(sc) > 60 else '')
                ht = '#english #movierecap #viral #foryou #trending'
                key_n = 'Whisper Direct'
            else:
                sys_p = get_sys_prompt(ct, vo_lang)
                sys_p = sys_p + '\n' + get_num_rule(vo_lang)
                out_txt, key_n = run_stage('ai', call_api, tid,
                                           lambda p, m: None, '', '',
                                           [{'role': 'system', 'content': sys_p},
                                            {'role': 'user', 'content': f'Language:{lang}\n\n{tr}'}], api=api, purpose='transcript')
                sc, ti, ht = parse_out(out_txt)
            rem = -1  # -1 signals "balance unchanged" for admin accounts
            if not is_adm: _, rem = deduct(u, 1); upd_stat(u, 'tr')
            return jsonify(ok=True, script=sc, title=ti, hashtags=ht,
                           status=f'{key_n} · {lang}', coins=rem)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
    except Exception as e:
        return jsonify(ok=False, msg=f'❌ {e}')
# โ”€โ”€ #7: Audio filter โ€” Edge TTS voice enhancement โ”€โ”€
def _build_audio_filter(mpath, ad):
"""
Edge TTS voice enhancement chain โ€” cleaner, warmer, louder.
- highpass=f=120 : cut low rumble / breath / mic noise
- lowpass=f=9000 : cut harsh high-frequency hiss
- equalizer 250Hz -3 : reduce muddiness / boxiness
- equalizer 1500Hz +3 : boost mid presence (voice intelligibility)
- equalizer 4000Hz +4 : boost upper-mid clarity / consonant sharpness
- equalizer 8000Hz -2 : soften sibilance without losing air
- acompressor : gentle compression โ€” evens out loud/quiet parts
- dynaudnorm : dynamic loudness normalization
- volume=2.8 : overall loudness boost
- loudnorm : broadcast-level LUFS normalization (final pass)
"""
voice_chain = (
'highpass=f=120,'
'lowpass=f=9000,'
'equalizer=f=250:width_type=o:width=2:g=-3,'
'equalizer=f=1500:width_type=o:width=2:g=3,'
'equalizer=f=4000:width_type=o:width=2:g=4,'
'equalizer=f=8000:width_type=o:width=2:g=-2,'
'acompressor=threshold=-18dB:ratio=3:attack=5:release=80:makeup=3dB,'
'dynaudnorm=f=200:g=15,'
'volume=2.8,'
'loudnorm=I=-14:TP=-1.5:LRA=9'
)
if mpath:
return (f'[1:a]{voice_chain}[nar];'
f'[2:a]volume=0.09,afade=t=out:st={max(0,ad-2):.3f}:d=2[bgm];'
f'[nar][bgm]amix=inputs=2:duration=first:dropout_transition=2[outa]')
else:
return f'[1:a]{voice_chain}[outa]'
# โ”€โ”€ Mid-section Audio Sync Correction โ”€โ”€
def _get_mid_range(duration):
"""
Return (start_ratio, end_ratio) for middle section based on total duration.
"""
if duration < 180: # < 3 min
return 0.30, 0.70
elif duration < 300: # 3โ€“5 min
return 0.25, 0.75
elif duration < 600: # 5โ€“10 min
return 0.20, 0.80
else: # > 10 min
return 0.15, 0.85
def _fix_mid_sync(audio_path, video_dur, audio_dur, tmp_dir):
"""
Split audio into 3 parts: head / middle / tail.
Apply atempo correction ONLY to middle part if drift > 0.2s.
Recombine and return new audio path (or original if no fix needed).
Pitch is preserved (atempo only, no asetrate).
"""
drift = audio_dur - video_dur
if abs(drift) <= 0.2:
print(f'[sync] drift={drift:.3f}s โ‰ค 0.2s โ€” skip mid-sync')
return audio_path
s_ratio, e_ratio = _get_mid_range(audio_dur)
t_start = audio_dur * s_ratio
t_end = audio_dur * e_ratio
mid_dur = t_end - t_start
# Target mid duration after correction
# We want total audio โ‰ˆ video_dur
# head + mid_corrected + tail = video_dur
head_dur = t_start
tail_dur = audio_dur - t_end
mid_target = video_dur - head_dur - tail_dur
if mid_target <= 0:
print(f'[sync] mid_target invalid ({mid_target:.3f}s) โ€” skip')
return audio_path
tempo = mid_dur / mid_target
# atempo range: 0.5 ~ 2.0 (chain if needed)
tempo = max(0.5, min(2.0, tempo))
print(f'[sync] drift={drift:.3f}s | mid {t_start:.2f}s~{t_end:.2f}s | tempo={tempo:.4f}x')
head_f = f'{tmp_dir}/sync_head.mp3'
mid_f = f'{tmp_dir}/sync_mid.mp3'
tail_f = f'{tmp_dir}/sync_tail.mp3'
mid_fx = f'{tmp_dir}/sync_mid_fx.mp3'
out_f = f'{tmp_dir}/sync_fixed.mp3'
lst_f = f'{tmp_dir}/sync_list.txt'
try:
# Cut head
subprocess.run(
f'ffmpeg -y -i "{audio_path}" -ss 0 -t {t_start:.6f} '
f'-c:a libmp3lame -q:a 2 "{head_f}"',
shell=True, check=True, capture_output=True)
# Cut middle
subprocess.run(
f'ffmpeg -y -i "{audio_path}" -ss {t_start:.6f} -t {mid_dur:.6f} '
f'-c:a libmp3lame -q:a 2 "{mid_f}"',
shell=True, check=True, capture_output=True)
# Cut tail
subprocess.run(
f'ffmpeg -y -i "{audio_path}" -ss {t_end:.6f} '
f'-c:a libmp3lame -q:a 2 "{tail_f}"',
shell=True, check=True, capture_output=True)
# Apply atempo to middle (pitch unchanged)
subprocess.run(
f'ffmpeg -y -i "{mid_f}" -af "atempo={tempo:.6f}" '
f'-c:a libmp3lame -q:a 2 "{mid_fx}"',
shell=True, check=True, capture_output=True)
# Concat head + mid_fixed + tail
with open(lst_f, 'w') as lf:
for f in [head_f, mid_fx, tail_f]:
if os.path.exists(f) and os.path.getsize(f) > 0:
lf.write(f"file '{os.path.abspath(f)}'\n")
subprocess.run(
f'ffmpeg -y -f concat -safe 0 -i "{lst_f}" '
f'-c:a libmp3lame -q:a 2 "{out_f}"',
shell=True, check=True, capture_output=True)
print(f'[sync] mid-sync done โ†’ {out_f}')
return out_f
except Exception as e:
print(f'[sync] mid-sync failed: {e} โ€” using original audio')
return audio_path
# โ”€โ”€ #6: Video render โ€” smaller output file โ”€โ”€
def _run_ffmpeg(cmd, timeout=1200):
"""Run ffmpeg safely โ€” capture output to avoid pipe deadlock, with timeout."""
result = subprocess.run(
cmd, shell=True, check=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
timeout=timeout
)
return result
# โ”€โ”€ SUBTITLE BURN โ€” Myanmar drawtext โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _ass_time(secs):
h = int(secs // 3600)
m = int((secs % 3600) // 60)
s = int(secs % 60)
cs = int(round((secs - int(secs)) * 100))
return "%d:%02d:%02d.%02d" % (h, m, s, cs)
def get_sentence_timings_from_audio(audio_path, num_sentences):
    """Estimate per-sentence (start, end) timings from silence gaps in audio.

    Runs ffmpeg's silencedetect filter and treats each detected silence as a
    sentence boundary. The last subtitle is anchored to the total duration so
    subtitles never finish before the audio ends; if too few boundaries are
    found, the remainder is padded with equal splits.
    Returns a list of (start, end) tuples, at most ``num_sentences`` long.
    """
    try:
        # Total duration fetched once upfront (used for anchoring/padding)
        total_dur = float(subprocess.run(
            f'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "{audio_path}"',
            shell=True, capture_output=True, text=True).stdout.strip())
    except Exception:
        total_dur = 0.0
    # silencedetect logs silence_start/silence_end lines to stderr
    cmd = (f'ffmpeg -i "{audio_path}" '
           f'-af "silencedetect=noise=-30dB:d=0.08" -f null - 2>&1')
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    output = result.stdout + result.stderr
    silence_starts, silence_ends = [], []
    for line in output.split('\n'):
        m = re.search(r'silence_start: ([0-9.]+)', line)
        if m:
            silence_starts.append(float(m.group(1)))
        m = re.search(r'silence_end: ([0-9.]+)', line)
        if m:
            silence_ends.append(float(m.group(1)))
    timings = []
    last = 0.0
    # Each (speech-run) between silences becomes one sentence span
    for i, s_start in enumerate(silence_starts):
        if len(timings) >= num_sentences:
            break
        if s_start > last + 0.05:   # ignore sub-50ms slivers
            timings.append((last, s_start))
        s_end = silence_ends[i] if i < len(silence_ends) else s_start + 0.1
        last = s_end
    # Last sentence — anchor to total_dur so subtitles don't finish before video
    if len(timings) < num_sentences:
        t_end = total_dur if total_dur > last + 0.1 else last + 2.0
        timings.append((last, t_end))
    # Pad with equal splits if silencedetect found too few boundaries
    if timings and len(timings) < num_sentences:
        last_end = timings[-1][1]
        total_known = total_dur if total_dur > last_end else last_end
        remaining = num_sentences - len(timings)
        chunk = (total_known - last_end) / max(remaining, 1)
        for j in range(remaining):
            s = last_end + j * chunk
            timings.append((s, s + chunk))
    timings = timings[:num_sentences]
    # Anchor last subtitle end to total_dur (fixes early finish)
    if timings and total_dur > 0 and timings[-1][1] < total_dur - 0.5:
        timings[-1] = (timings[-1][0], total_dur)
        print(f'[silencedetect] anchored last subtitle end to {total_dur:.2f}s')
    print(f'[silencedetect] found {len(timings)} boundaries for {num_sentences} sentences')
    return timings
def _make_ass(sentences, total_dur, ass_path, position=85,
              fontsize=80, color='white', style='outline',
              sentence_durs=None, sentence_timings=None, crop='9:16'):
    """Write an ASS subtitle file for `sentences` spanning `total_dur` seconds.

    sentence_timings: list of (start, end) tuples from get_sentence_timings_from_audio().
        Used first — most accurate.
    sentence_durs: fallback list of per-sentence durations (scaled to total_dur).
    If neither provided: equal-split fallback.
    """
    # ASS colour format is &HAABBGGRR (alpha, blue, green, red)
    col_bgr = {
        'white': '&H00FFFFFF', 'yellow': '&H0000FFFF', 'cyan': '&H00FFFF00',
        'green': '&H0000FF00', 'orange': '&H000080FF', 'pink': '&H00FF80FF',
        'red': '&H000000FF', 'lime': '&H0080FF00',
        'hotpink': '&H00B469FF', 'gold': '&H0000D7FF', 'violet': '&H00EE82EE',
        'deepskyblue':'&H00FFBF00', 'coral': '&H00507FFF',
    }.get(color, '&H00FFFFFF')
    # position: 0=top, 100=bottom
    # Use \an5 (center anchor) + \pos(x,y) — subtitle center at pos_pct% of height.
    # This exactly matches the preview CSS: top:pos_pct% + translateY(-50%)
    try:
        pos_pct = int(position)
    except (ValueError, TypeError):
        pos_pct = 85
    pos_pct = max(5, min(95, pos_pct))
    align = 5      # center anchor — \pos y = center of subtitle
    margin_v = 0   # not used with \pos
    # Style presets: (BorderStyle, BackColour, Outline width, Shadow depth)
    if style == 'box':
        border_style, back_col, outline_w, shadow_w = 4, '&H99000000', 0, 0
    elif style == 'shadow':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 4
    elif style == 'glow':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 4, 3
    elif style == 'stroke':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 3, 0
    elif style == 'plain':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 0
    else:  # outline (default)
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 2, 1
    lines = []
    # PlayRes matches actual output resolution for each crop ratio
    if crop == '1:1':
        play_w, play_h = 720, 720
    elif crop == '16:9':
        play_w, play_h = 1280, 720
    else:  # 9:16 default
        play_w, play_h = 720, 1280
    pos_x = play_w // 2
    pos_y = int(pos_pct / 100 * play_h)
    lines.append('[Script Info]')
    lines.append('ScriptType: v4.00+')
    lines.append('Collisions: Normal')
    lines.append(f'PlayResX: {play_w}')
    lines.append(f'PlayResY: {play_h}')
    lines.append('Timer: 100.0000')
    lines.append('')
    lines.append('[V4+ Styles]')
    lines.append('Format: Name,Fontname,Fontsize,PrimaryColour,SecondaryColour,OutlineColour,BackColour,Bold,Italic,Underline,StrikeOut,ScaleX,ScaleY,Spacing,Angle,BorderStyle,Outline,Shadow,Alignment,MarginL,MarginR,MarginV,Encoding')
    style_line = ('Style: Default,Noto Sans Myanmar,%d,%s,&H000000FF,&H00000000,%s,'
                  '0,0,0,0,100,100,0,0,%d,%d,%d,%d,0,0,%d,1') % (
        fontsize, col_bgr, back_col,
        border_style, outline_w, shadow_w,
        align, margin_v)
    lines.append(style_line)
    lines.append('')
    lines.append('[Events]')
    lines.append('Format: Layer,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text')
    n = len(sentences)
    pos_tag = '{\\an5\\pos(%d,%d)}' % (pos_x, pos_y)
    # ── 2-line wrap: Myanmar text ~14 chars per line max ──
    def _wrap2(txt, max_chars=14):
        # Break near the midpoint at a space or any char ≥ U+1000
        # (covers the Myanmar block) so lines split on natural boundaries.
        txt = txt.replace('\n', ' ').strip()
        if len(txt) <= max_chars:
            return txt
        mid = len(txt) // 2
        best = mid
        for delta in range(0, mid):
            for pos in [mid - delta, mid + delta]:
                if 0 < pos < len(txt) and (txt[pos] == ' ' or ord(txt[pos]) > 0x1000):
                    best = pos
                    break
            else:
                continue   # no break point at this distance — widen search
            break
        return txt[:best].strip() + '\\N' + txt[best:].strip()
    # ── Priority 1: use actual timings from silencedetect (most accurate) ──
    if sentence_timings and len(sentence_timings) >= n:
        # Scale so the last timing's end lands exactly on total_dur
        raw_end = sentence_timings[n - 1][1]
        scale = (total_dur / raw_end) if raw_end > 0 else 1.0
        for i, sent in enumerate(sentences):
            txt = _wrap2(sent.strip())
            if not txt:
                continue
            t0 = _ass_time(max(0.0, sentence_timings[i][0] * scale))
            end = sentence_timings[i][1] * scale
            if i == n - 1:
                end = total_dur   # last subtitle held until the very end
            t1 = _ass_time(min(end - 0.03, total_dur))
            lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt))
    # ── Priority 2: scaled sentence_durs ──
    elif sentence_durs and len(sentence_durs) >= n:
        raw_total = sum(sentence_durs[:n])
        if raw_total > 0:
            scale = total_dur / raw_total
            scaled_durs = [d * scale for d in sentence_durs[:n]]
        else:
            scaled_durs = [total_dur / n] * n
        t = 0.0
        for i, sent in enumerate(sentences):
            txt = _wrap2(sent.strip())
            if not txt:
                t += scaled_durs[i]   # keep the clock advancing for empties
                continue
            t0 = _ass_time(t)
            t1 = _ass_time(min(t + scaled_durs[i] - 0.05, total_dur))
            lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt))
            t += scaled_durs[i]
    # ── Priority 3: equal-split fallback ──
    else:
        chunk = total_dur / n
        for i, sent in enumerate(sentences):
            txt = _wrap2(sent.strip())
            if not txt:
                continue
            t0 = _ass_time(i * chunk)
            t1 = _ass_time(min((i + 1) * chunk - 0.05, total_dur))
            lines.append('Dialogue: 0,%s,%s,Default,,0,0,0,,%s%s' % (t0, t1, pos_tag, txt))
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines) + '\n')
def _srt_to_ass(srt_text, ass_path, fontsize=80, color='white', style='outline',
                position=85, play_res_x=720, play_res_y=1280):
    """Convert SRT text to an ASS file with width constraint and correct positioning.

    PlayResX/Y must match the actual video dimensions passed in.
    libass scales fontsize proportionally to PlayResY, so fontsize values are
    always relative to whatever PlayResY is set to — the preview JS uses the
    same ratio: pxSize = sizeVal * renderedH / play_res_y.
    Keeping PlayResY = actual video height ensures 1:1 mapping between preview
    and output.
    """
    # Clamp to sane values; use caller-supplied dimensions (default 720×1280 for 9:16)
    play_res_x = max(1, int(play_res_x))
    play_res_y = max(1, int(play_res_y))
    fontsize = max(20, int(fontsize))
    # ASS colour format is &HAABBGGRR (alpha, blue, green, red)
    col_bgr = {
        'white': '&H00FFFFFF', 'yellow': '&H0000FFFF', 'cyan': '&H00FFFF00',
        'green': '&H0000FF00', 'orange': '&H000080FF', 'pink': '&H00FF80FF',
        'red': '&H000000FF', 'lime': '&H0080FF00',
        'hotpink': '&H00B469FF', 'gold': '&H0000D7FF', 'violet': '&H00EE82EE',
        'deepskyblue':'&H00FFBF00', 'coral': '&H00507FFF',
    }.get(color, '&H00FFFFFF')
    # 95% width → 2.5% margin each side in pixels
    # NOTE(review): margin_lr is computed but never used below — the style
    # line hard-codes zero margins and positioning is done via \pos instead.
    margin_lr = int(play_res_x * 0.025)
    # position 0=top 100=bottom → MarginV from the respective edge,
    # clamped to 5-95% so the subtitle never exits video bounds
    pos_pct = max(5, min(95, int(position)))
    if pos_pct >= 50:
        # bottom-anchored (\an2)
        alignment = 2
        margin_v = int((100 - pos_pct) / 100 * play_res_y)
        margin_v = max(10, margin_v)
    else:
        # top-anchored (\an8)
        alignment = 8
        margin_v = int(pos_pct / 100 * play_res_y)
        margin_v = max(10, margin_v)
    # \pos tag values — center_x anchors all \N-broken lines at horizontal center
    center_x = play_res_x // 2
    if pos_pct >= 50:
        pos_y = play_res_y - margin_v
        an_tag = 2  # bottom-center
    else:
        pos_y = margin_v
        an_tag = 8  # top-center
    # Style presets: (BorderStyle, BackColour, Outline width, Shadow depth)
    if style == 'box':
        border_style, back_col, outline_w, shadow_w = 4, '&H99000000', 0, 0
    elif style == 'shadow':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 4
    elif style == 'glow':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 4, 3
    elif style == 'stroke':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 3, 0
    elif style == 'plain':
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 0, 0
    else:  # outline (default)
        border_style, back_col, outline_w, shadow_w = 1, '&H00000000', 2, 1
    header = (
        '[Script Info]\n'
        'ScriptType: v4.00+\n'
        'Collisions: Normal\n'
        'WrapStyle: 0\n'
        f'PlayResX: {play_res_x}\n'
        f'PlayResY: {play_res_y}\n'
        'Timer: 100.0000\n'
        '\n'
        '[V4+ Styles]\n'
        'Format: Name,Fontname,Fontsize,PrimaryColour,SecondaryColour,OutlineColour,BackColour,'
        'Bold,Italic,Underline,StrikeOut,ScaleX,ScaleY,Spacing,Angle,BorderStyle,Outline,Shadow,'
        'Alignment,MarginL,MarginR,MarginV,Encoding\n'
    )
    style_line = (
        f'Style: Default,Noto Sans Myanmar,{fontsize},{col_bgr},&H000000FF,&H00000000,{back_col},'
        f'0,0,0,0,100,100,0,0,{border_style},{outline_w},{shadow_w},'
        f'{an_tag},0,0,0,1\n'
    )
    events_header = (
        '\n[Events]\n'
        'Format: Layer,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text\n'
    )
    def _srt_tc_to_ass(tc):
        # 00:00:01,000 → 0:00:01.00
        # Normalize Myanmar/Thai digits → ASCII first, then parse
        tc = _norm_digits(tc.strip()).replace(',', '.')
        parts = tc.split(':')
        try:
            if len(parts) == 3:
                h, m, s = parts[0], parts[1], parts[2]
            elif len(parts) == 2:
                # MM:SS form — assume zero hours
                h, m, s = '0', parts[0], parts[1]
            else:
                return '0:00:00.00'
            s_parts = s.split('.')
            sec = s_parts[0].zfill(2)
            ms = s_parts[1][:2] if len(s_parts) > 1 else '00'
            return f'{int(h)}:{m.zfill(2)}:{sec}.{ms}'
        except Exception:
            return '0:00:00.00'
    dialogue_lines = []
    # SRT blocks are separated by blank lines
    for block in re.split(r'\n\s*\n', srt_text.strip()):
        lines = [l for l in block.strip().split('\n') if l.strip()]
        if len(lines) < 2:
            continue
        # Find the timecode line (contains -->)
        tc_line_idx = None
        for i, l in enumerate(lines):
            if '-->' in l:
                tc_line_idx = i
                break
        if tc_line_idx is None:
            continue
        tc_parts = lines[tc_line_idx].strip().split(' --> ')
        if len(tc_parts) != 2:
            continue
        # ── Timecode typo fix: if a time exceeds 1 hour it's likely an
        # AI-generated minutes-digit typo, e.g. 00:11:12,630 intended as
        # 00:00:12,630 — clamp minutes to 0 in that case ──
        def _fix_tc_typo(tc_str):
            tc_n = _norm_digits(tc_str.strip()).replace(',', '.')
            try:
                parts = tc_n.split(':')
                if len(parts) == 3:
                    h, m, s = int(parts[0]), int(parts[1]), float(parts[2])
                    total = h * 3600 + m * 60 + s
                    if total > 3600:  # clearly a typo — clamp minutes to 0
                        return f'{h:02d}:00:{parts[2]}'
            except Exception:
                pass
            return tc_str
        t0 = _srt_tc_to_ass(_fix_tc_typo(tc_parts[0]))
        t1 = _srt_tc_to_ass(_fix_tc_typo(tc_parts[1]))
        # Join remaining lines with ASS hard line-breaks; emoji are stripped
        # because the burn font has no emoji glyphs
        txt = '\\N'.join(_strip_emoji(l) for l in lines[tc_line_idx+1:] if l.strip())
        if txt:
            dialogue_lines.append(f'Dialogue: 0,{t0},{t1},Default,,0,0,0,,{{\\an{an_tag}\\pos({center_x},{pos_y})}}{txt}')
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write(header + style_line + events_header + '\n'.join(dialogue_lines) + '\n')
def _burn_srt_direct(video_path, srt_text, out_path, position=85,
                     fontsize=80, color='white', style='outline', tmp_dir='/tmp',
                     play_res_x=720, play_res_y=1280):
    """Convert SRT → ASS (width-bounded) then hard-burn it with ffmpeg's ass filter.

    Falls back to a plain copy when srt_text contains no usable cues.
    Raises on a non-zero ffmpeg exit with the tail of stderr.
    """
    import shutil
    if not srt_text or '-->' not in srt_text:
        # Nothing to burn — pass the video through untouched
        shutil.copy(video_path, out_path)
        return
    ass_path = os.path.join(tmp_dir, 'sub_srt.ass')
    _srt_to_ass(srt_text, ass_path, fontsize=fontsize, color=color,
                style=style, position=position,
                play_res_x=play_res_x, play_res_y=play_res_y)
    env = os.environ.copy()
    # Escape for ffmpeg filter syntax; fontsdir lets libass find the Myanmar font
    ass_esc = ass_path.replace('\\', '/').replace(':', '\\:')
    vf = f"ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar"
    cmd = (
        f'ffmpeg -y -hide_banner -loglevel error '
        f'-i "{video_path}" '
        f'-vf "{vf}" '
        f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
        f'-c:a copy '
        f'"{out_path}"'
    )
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=env, timeout=600)
    if proc.returncode != 0:
        raise Exception(f'ffmpeg srt burn failed: {proc.stderr[-300:]}')
def _burn_subtitles(video_path, sentences, out_path, position=85,
                    fontsize=80, color='white', style='outline', tmp_dir='/tmp',
                    sentence_durs=None, sentence_timings=None):
    """Burn Myanmar subtitles via a generated ASS file + ffmpeg's ass filter.

    Probes the video duration, builds the ASS file with _make_ass(), then
    re-encodes the video with the subtitles rendered in. Copies the video
    unchanged when there are no sentences.

    FIX: the duration-parse fallback used a bare ``except:`` which also
    swallowed KeyboardInterrupt/SystemExit — narrowed to the exceptions
    float() can actually raise.
    """
    if not sentences:
        import shutil; shutil.copy(video_path, out_path); return
    probe = subprocess.run(
        'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "%s"' % video_path,
        shell=True, capture_output=True, text=True)
    try:
        total_dur = float(probe.stdout.strip())
    except (ValueError, TypeError):
        # ffprobe failed or emitted garbage — fall back to a sane default
        total_dur = 60.0
    ass_path = os.path.join(tmp_dir, 'sub.ass')
    _make_ass(sentences, total_dur, ass_path,
              position=position, fontsize=fontsize,
              color=color, style=style,
              sentence_durs=sentence_durs,
              sentence_timings=sentence_timings)
    env = os.environ.copy()
    # ass= with fontsdir so the Myanmar font is found by name
    ass_esc = ass_path.replace('\\', '/').replace(':', '\\:')
    vf = f"ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar"
    cmd = ('ffmpeg -y -hide_banner -loglevel error '
           '-i "%s" '
           '-vf "%s" '
           '-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
           '-c:a copy '
           '"%s"') % (video_path, vf, out_path)
    result = subprocess.run(
        cmd, shell=True, check=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        timeout=600, env=env)
    return result
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# โ”€โ”€ FIXED: _build_video() โ€” Blur applied on original video BEFORE crop โ”€โ”€
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file,
                 logo_path=None, logo_x=10, logo_y=10, logo_w=80,
                 blur_enabled=False, blur_x=0, blur_y=0, blur_w=0, blur_h=0,
                 blur_orig_w=None, blur_orig_h=None,  # original video dimensions for blur
                 wmk_x=None, wmk_y=None, wmk_fontsize=35, free_trial=False,
                 logo_orig_w=None, logo_orig_h=None,  # original dims for logo coords
                 wmk_orig_w=None, wmk_orig_h=None,  # original dims for watermark coords
                 sub_ass_path=None):  # ASS subtitle file path
    """
    Build the final video via one ffmpeg filter_complex pass.

    Blur, logo and watermark are all applied on the ORIGINAL video BEFORE
    cropping, so their coordinates are in original-frame pixels.
    Filter chain order:
        original video -> base transforms -> blur -> logo -> watermark
        -> crop -> free_trial stamp -> subtitles -> [v_final]

    Inputs: vpath (video, input 0), cmb (voice-over audio, input 1), optional
    mpath (looped background music) and logo_path.  `ad`/`vd` are the audio /
    video durations in seconds; the video is retimed to exactly cover the audio.

    Raises:
        Exception carrying the tail of ffmpeg's stderr when the render fails.
    """
    # โ”€โ”€ Step 1: Sync โ€” stretch or compress video to exactly match audio duration โ”€โ”€
    # FIX: clamp the denominator โ€” a clip shorter than 0.14s previously caused a
    # zero/negative division (was: sync_r = ad / (vd - 0.14)).
    sync_r = ad / max(vd - 0.14, 0.01)
    base_filters = [
        'scale=trunc(iw/2)*2:trunc(ih/2)*2',
        f'setpts={sync_r:.6f}*PTS',
        f'trim=duration={ad:.6f}',
        'setpts=PTS-STARTPTS',
    ]
    if flip: base_filters.append('hflip')
    if col: base_filters.append('eq=brightness=0.06:contrast=1.2:saturation=1.4')
    base_filters.append('format=yuv420p')
    base_str = ','.join(base_filters)
    # โ”€โ”€ Step 3: Build filter chain โ€” blur/logo/watermark on original FIRST โ”€โ”€
    filter_parts = [f'[0:v]{base_str}[v_base]']
    current_label = '[v_base]'
    # โ”€โ”€ Step 4: Apply blur on ORIGINAL video (before crop) โ”€โ”€
    if blur_enabled and blur_w > 0 and blur_h > 0:
        # Use original dimensions if provided, otherwise fallback to video dimensions
        if blur_orig_w and blur_orig_h:
            orig_w, orig_h = blur_orig_w, blur_orig_h
        else:
            # Fallback: try to get from video
            try:
                probe = subprocess.run(
                    f'ffprobe -v error -select_streams v:0 '
                    f'-show_entries stream=width,height '
                    f'-of csv=s=x:p=0 "{vpath}"',
                    shell=True, capture_output=True, text=True, timeout=10
                )
                orig_w, orig_h = map(int, probe.stdout.strip().split('x'))
            except Exception:
                # FIX: narrowed from a bare `except:` โ€” still best-effort fallback.
                orig_w, orig_h = 1920, 1080
        # Clamp coordinates to original dimensions
        bx = max(0, min(blur_x, orig_w - 10))
        by = max(0, min(blur_y, orig_h - 10))
        bw = max(10, min(blur_w, orig_w - bx))
        bh = max(10, min(blur_h, orig_h - by))
        print(f'[blur] orig={orig_w}x{orig_h}, box=({bx},{by}) {bw}x{bh}')
        _br = max(1, min(10, bw // 4, bh // 4))
        filter_parts.append(
            f'{current_label}split[_bA][_bB];'
            f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];'
            f'[_bA][_bBl]overlay={bx}:{by}[v_blurred]'
        )
        current_label = '[v_blurred]'
    # โ”€โ”€ Step 4b: Logo overlay on ORIGINAL video (before crop) โ”€โ”€
    logo_idx = 2 if not mpath else 3
    logo_exists = logo_path and os.path.exists(logo_path)
    extra_segs = []
    if logo_exists:
        # Use original video dimensions for logo positioning
        _low = logo_orig_w or blur_orig_w or 1920
        _loh = logo_orig_h or blur_orig_h or 1080
        lx = logo_x if (logo_x is not None and logo_x != 10) else (_low - logo_w - 20)
        ly = logo_y if (logo_y is not None and logo_y != 10) else 20
        # Scale logo_w relative to original video width (not crop canvas)
        print(f'[logo] orig={_low}x{_loh}, pos=({lx},{ly}) w={logo_w}')
        extra_segs.append(
            f'[{logo_idx}:v]scale={max(20,int(logo_w))}:-2[_lg];'
            f'{current_label}[_lg]overlay={lx}:{ly}[v_logo]'
        )
        current_label = '[v_logo]'
    # โ”€โ”€ Step 4c: Watermark drawtext on ORIGINAL video (before crop) โ”€โ”€
    if wmk:
        fs = max(16, int(wmk_fontsize))
        # Strip characters that would break the drawtext filter string.
        txt = wmk.replace("'", "").replace(":", "").replace("\\", "")
        # Use original video dimensions for watermark positioning
        _wow = wmk_orig_w or blur_orig_w or 1920
        _woh = wmk_orig_h or blur_orig_h or 1080
        wx = wmk_x if wmk_x is not None else (_wow - 220)
        wy = wmk_y if wmk_y is not None else (_woh - 80)
        print(f'[watermark] orig={_wow}x{_woh}, pos=({wx},{wy})')
        extra_segs.append(
            f'{current_label}drawtext=text=\'{txt}\':x={wx}:y={wy}:'
            f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2[v_wmk]'
        )
        current_label = '[v_wmk]'
    # โ”€โ”€ Step 5: Crop to target aspect ratio (blur/logo/watermark already applied) โ”€โ”€
    if crop == '9:16':
        tw, th = 720, 1280
        filter_parts.append(
            f'{current_label}split[_s1][_s2];'
            f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,'
            f'crop={tw}:{th},boxblur=10:8[_bg];'
            f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];'
            f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]'
        )
    elif crop == '16:9':
        tw, th = 1280, 720
        filter_parts.append(
            f'{current_label}split[_s1][_s2];'
            f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,'
            f'crop={tw}:{th},boxblur=10:8[_bg];'
            f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];'
            f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]'
        )
    elif crop == '1:1':
        tw, th = 720, 720
        filter_parts.append(
            f'{current_label}split[_s1][_s2];'
            f'[_s1]scale={tw}:{th}:force_original_aspect_ratio=increase,'
            f'crop={tw}:{th},boxblur=10:8[_bg];'
            f'[_s2]scale={tw}:{th}:force_original_aspect_ratio=decrease[_fg];'
            f'[_bg][_fg]overlay=(W-w)/2:(H-h)/2[vcrop]'
        )
    elif crop == 'original':
        # Original ratio โ€” detect video dims, pad with black bars, no cropping
        try:
            _probe = subprocess.run(
                f'ffprobe -v error -select_streams v:0 ' +
                f'-show_entries stream=width,height ' +
                f'-of csv=s=x:p=0 "{vpath}"',
                shell=True, capture_output=True, text=True, timeout=30)
            _dim = _probe.stdout.strip()
            _vw, _vh = (int(x) for x in _dim.split('x'))
        except Exception:
            _vw, _vh = 1280, 720
        tw = (_vw // 2) * 2
        th = (_vh // 2) * 2
        filter_parts.append(
            f'{current_label}scale={tw}:{th}:force_original_aspect_ratio=decrease,'
            f'pad={tw}:{th}:(ow-iw)/2:(oh-ih)/2:black[vcrop]'
        )
    else:
        tw, th = 1280, 720
        # FIX: a label-only chain ('[x][vcrop]') is invalid filtergraph syntax;
        # route through the identity `null` video filter for unknown crop values.
        filter_parts.append(f'{current_label}null[vcrop]')
    current_label = '[vcrop]'
    # โ”€โ”€ Step 6: Free Trial watermark (on cropped canvas, centered) โ”€โ”€
    if free_trial:
        ft_fs = max(60, int(th * 0.07))
        extra_segs.append(
            f'{current_label}drawtext=text=\'FREE TRIAL\':'
            f'x=(w-text_w)/2:y=(h-text_h)/2:'
            f'fontsize={ft_fs}:fontcolor=red:borderw=3:bordercolor=black:'
            f'alpha=0.75[vft]'
        )
        current_label = '[vft]'
    # โ”€โ”€ Step 7: Subtitle burn (ass= filter, same pass) โ”€โ”€
    if sub_ass_path and os.path.exists(sub_ass_path):
        ass_esc = sub_ass_path.replace('\\', '/').replace(':', '\\:')
        extra_segs.append(
            f"{current_label}ass='{ass_esc}':fontsdir=/usr/local/share/fonts/myanmar[v_sub]"
        )
        current_label = '[v_sub]'
    # โ”€โ”€ Step 8: Final label โ”€โ”€
    extra_segs.append(f'{current_label}copy[v_final]')
    # โ”€โ”€ Step 9: Audio filter โ”€โ”€
    audio_seg = _build_audio_filter(mpath, ad)
    # โ”€โ”€ Assemble filter_complex โ”€โ”€
    # ffmpeg links chains by label after parsing, so segment order is free.
    all_segs = filter_parts + extra_segs + [audio_seg]
    filter_complex = ';'.join(all_segs)
    # โ”€โ”€ Step 10: Input flags & final command โ”€โ”€
    inp = f'-fflags +genpts+igndts -err_detect ignore_err -i "{vpath}" -i "{cmb}"'
    if mpath: inp += f' -stream_loop -1 -i "{mpath}"'
    if logo_exists: inp += f' -i "{logo_path}"'
    cmd = (
        f'ffmpeg -y -hide_banner -loglevel error {inp} '
        f'-filter_complex "{filter_complex}" '
        f'-map "[v_final]" -map "[outa]" '
        f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
        f'-threads 0 '
        f'-c:a aac -ar 44100 -b:a 128k '
        f'-t {ad:.3f} -movflags +faststart "{out_file}"'
    )
    try:
        _run_ffmpeg(cmd, timeout=900)
    except subprocess.CalledProcessError as e:
        err = e.stderr.decode(errors='ignore') if e.stderr else '(no stderr)'
        raise Exception(f'FFmpeg render failed: {err[-500:]}')
    # (no pre_out to clean up)
# โ”€โ”€ PROCESS โ”€โ”€
@app.route('/api/process', methods=['POST'])
def api_process():
    """Synchronous render: the caller supplies a ready-made script (no Whisper/AI).

    Flow: resolve video (upload or URL download) -> per-sentence TTS -> concat
    + audio polish -> single-pass ffmpeg render.  Deducts 1 coin on success
    unless the caller is the admin.  Returns JSON {ok, output_url, coins} on
    success, {ok: False, msg} on failure.  Temp dir is always cleaned up.
    """
    try:
        u = (request.form.get('username') or '').strip()
        video_url = (request.form.get('video_url') or '').strip()
        sc = (request.form.get('script') or '').strip()
        voice_id = request.form.get('voice', 'my-MM-ThihaNeural')
        engine = request.form.get('engine', 'ms')
        spd = int(request.form.get('speed', 30))
        wmk = request.form.get('watermark', '')
        wmk_x = int(request.form.get('wmk_x', 20)) if wmk else None
        wmk_y = int(request.form.get('wmk_y', 40)) if wmk else None
        wmk_fontsize = int(request.form.get('wmk_fontsize', 40)) if wmk else 40
        crop = request.form.get('crop', '9:16')
        flip = request.form.get('flip', '0') == '1'
        col = request.form.get('color', '0') == '1'
        vo_lang = request.form.get('vo_lang', 'my')
        # Speed default per language (can be overridden by slider)
        LANG_SPD = {'th': 20, 'en': 0, 'my': 30}
        if request.form.get('speed') is None:
            spd = LANG_SPD.get(vo_lang, 30)
        is_adm = (u == ADMIN_U)
        if not is_adm and get_coins(u) < 1:
            return jsonify(ok=False, msg='โŒ Not enough coins')
        cpu_queue_wait()
        tid = uuid.uuid4().hex[:8]
        tmp_dir = str(BASE_DIR / f'temp_{tid}')
        os.makedirs(tmp_dir, exist_ok=True)
        out_file = str(OUTPUT_DIR / f'final_{tid}.mp4')
        vpath = None; mpath = None
        try:
            # Resolve the source video: a direct upload wins over a URL download.
            video_file = request.files.get('video_file')
            if video_file and video_file.filename:
                vpath = f'{tmp_dir}/input.mp4'
                video_file.save(vpath)
            elif video_url:
                out_tmpl = f'{tmp_dir}/input.%(ext)s'
                ytdlp_download(out_tmpl, video_url)
                found = glob.glob(f'{tmp_dir}/input.*')
                if found: vpath = found[0]
            if not vpath: return jsonify(ok=False, msg='โŒ No video selected')
            music_file = request.files.get('music_file')
            if music_file and music_file.filename:
                mpath = f'{tmp_dir}/music.mp3'
                music_file.save(mpath)
            logo_path = None
            logo_file = request.files.get('logo_file')
            logo_x = int(request.form.get('logo_x', 10))
            logo_y = int(request.form.get('logo_y', 10))
            logo_w = int(request.form.get('logo_w', 80))
            if logo_file and logo_file.filename:
                ext = Path(logo_file.filename).suffix or '.png'
                logo_path = f'{tmp_dir}/logo{ext}'
                logo_file.save(logo_path)
            blur_enabled = request.form.get('blur_enabled') == '1'
            blur_x = int(request.form.get('blur_x', 0))
            blur_y = int(request.form.get('blur_y', 0))
            blur_w = int(request.form.get('blur_w', 0))
            blur_h = int(request.form.get('blur_h', 0))
            _t0 = time.time()
            sentences = split_txt(sc, vo_lang)
            rate = f'+{spd}%'
            # TTS goes through the global stage queue (one job per stage); the
            # progress callback is a no-op because this route is synchronous.
            if engine == 'gemini':
                parts = run_stage('tts', run_gemini_tts_sync, tid,
                    lambda p,m: None, '', '',
                    sentences, voice_id, tmp_dir, speed=spd)
            else:
                parts = run_stage('tts', run_tts_sync, tid,
                    lambda p,m: None, '', '',
                    sentences, voice_id, rate, tmp_dir)
            print(f'[TIMER] TTS: {time.time()-_t0:.1f}s')
            cmb = f'{tmp_dir}/combined.mp3'
            lst = f'{tmp_dir}/list.txt'
            with open(lst, 'w') as f:
                for a in parts: f.write(f"file '{os.path.abspath(a)}'\n")
            # Pre-polish: Edge TTS โ†’ silenceremove (-45dB) + normalize; Gemini TTS โ†’ normalize only
            if engine == 'gemini':
                _af = ('highpass=f=100,lowpass=f=10000,'
                       'dynaudnorm=f=200:g=15,'
                       'loudnorm=I=-16:TP=-1.5:LRA=11')
            else:
                _af = ('silenceremove=start_periods=1:stop_periods=-1:stop_duration=0.1:stop_threshold=-45dB,'
                       'highpass=f=100,lowpass=f=10000,'
                       'dynaudnorm=f=200:g=15,'
                       'loudnorm=I=-16:TP=-1.5:LRA=11')
            _t1 = time.time()
            _run_ffmpeg(
                f'ffmpeg -y -f concat -safe 0 -i "{lst}" '
                f'-af "{_af}" '
                f'-c:a libmp3lame -q:a 2 "{cmb}"', timeout=120)
            print(f'[TIMER] audio filter: {time.time()-_t1:.1f}s')
            vd = dur(vpath); ad = dur(cmb)
            if vd <= 0: raise Exception('Video duration read failed')
            if ad <= 0: raise Exception('Audio duration read failed')
            _t2 = time.time()
            _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file,
                         logo_path=logo_path, logo_x=logo_x, logo_y=logo_y, logo_w=logo_w,
                         blur_enabled=blur_enabled, blur_x=blur_x, blur_y=blur_y,
                         blur_w=blur_w, blur_h=blur_h,
                         wmk_x=wmk_x, wmk_y=wmk_y, wmk_fontsize=wmk_fontsize)
            rem = -1
            print(f'[TIMER] ffmpeg render: {time.time()-_t2:.1f}s')
            # Coins are deducted only after a successful render; admins are free.
            if not is_adm: _, rem = deduct(u, 1); upd_stat(u, 'vd')
            return jsonify(ok=True, output_url=f'/outputs/final_{tid}.mp4', coins=rem)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
    except Exception as e:
        import traceback; traceback.print_exc()
        return jsonify(ok=False, msg=f'โŒ {e}')
# โ”€โ”€ PROCESS ALL โ”€โ”€
@app.route('/api/progress/<tid>')
def api_progress(tid):
    """Server-sent-events stream of the progress record for job *tid*.

    Emits one `data:` frame per poll tick; closes on done/error, or after a
    hard 12-minute cap with a timeout frame.
    """
    def generate():
        finished = False
        # 1800 ticks x 0.4s sleep = 12 minutes maximum stream lifetime.
        for _tick in range(1800):
            state = job_progress.get(tid)
            if state is None:
                placeholder = {'pct': 0, 'msg': 'Please waitโ€ฆ'}
                yield f"data: {json.dumps(placeholder)}\n\n"
            else:
                yield f"data: {json.dumps(state)}\n\n"
                if state.get('done') or state.get('error'):
                    finished = True
                    break
            time.sleep(0.4)
        if not finished:
            timeout_frame = {'pct': 0, 'msg': 'Timeout โ€” process took too long', 'error': True}
            yield f"data: {json.dumps(timeout_frame)}\n\n"
    sse_headers = {'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}
    return Response(generate(), mimetype='text/event-stream', headers=sse_headers)
@app.route('/api/process_all', methods=['POST'])
def api_process_all():
    """Non-blocking full pipeline: download -> Whisper -> AI script -> TTS -> render.

    Reads every form field and uploaded file while the request context is
    still alive, registers a progress record under `tid`, then runs the whole
    pipeline in a daemon thread (each heavy stage serialized via run_stage).
    Returns the tid immediately; clients poll /api/progress/<tid>.
    """
    try:
        u = (request.form.get('username') or '').strip()
        if not u: return jsonify(ok=False, msg='โŒ Not logged in')
        is_adm = (u == ADMIN_U)
        if not is_adm and load_db()['users'].get(u, {}).get('banned'):
            return jsonify(ok=False, msg='โŒ Your account has been banned')
        if not is_adm and get_coins(u) < 1:
            return jsonify(ok=False, msg='โŒ Not enough coins (need 1)')
        # Check free_trial flag โ€” system auto coins only
        _db_check = load_db()
        is_free_trial = (not is_adm) and _db_check['users'].get(u, {}).get('free_trial', False)
        # Read all params before leaving request context
        video_url = (request.form.get('video_url') or '').strip()
        voice_id = request.form.get('voice', 'my-MM-ThihaNeural')
        engine = request.form.get('engine', 'ms')
        ct = request.form.get('content_type', 'Movie Recap')
        api_model = request.form.get('ai_model', 'Gemini')
        vo_lang = request.form.get('vo_lang', 'my')
        wmk = request.form.get('watermark', '')
        wmk_fontsize = int(request.form.get('wmk_fontsize', 40)) if wmk else 40
        crop = request.form.get('crop', '9:16')
        flip = request.form.get('flip', '0') == '1'
        col = request.form.get('color', '0') == '1'
        blur_enabled = request.form.get('blur_enabled') == '1'
        sub_enabled = request.form.get('sub_enabled') == '1'
        sub_size = float(request.form.get('sub_size', 0.0547)) # fraction of play_res_y
        sub_pos = int(request.form.get('sub_pos', 85))
        sub_color = request.form.get('sub_color', 'white')
        sub_style = request.form.get('sub_style', 'outline')
        client_tid = (request.form.get('tid') or '').strip()
        LANG_SPD = {'th': 20, 'en': 0, 'my': 30}
        spd = int(request.form.get('speed', LANG_SPD.get(vo_lang, 30)))
        _CROP_DIM = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720),'original':(1280,720)}
        FW, FH = _CROP_DIM.get(crop, (1280,720))
        # FIX: removed dead local helper `_pct` โ€” it was never called; all
        # percent->pixel conversions go through `_pct_orig` inside _bg_job.
        wmk_xp_raw = request.form.get('wmk_xp')
        wmk_yp_raw = request.form.get('wmk_yp')
        wmk_x_raw = request.form.get('wmk_x')
        wmk_y_raw = request.form.get('wmk_y')
        logo_xp_raw = request.form.get('logo_xp')
        logo_yp_raw = request.form.get('logo_yp')
        logo_x_raw = request.form.get('logo_x')
        logo_y_raw = request.form.get('logo_y')
        # logo_w computed above with safe clamp
        # Blur coords relative to ORIGINAL video โ€” convert after ffprobe below
        blur_xp = float(request.form.get('blur_xp') or 0)
        blur_yp = float(request.form.get('blur_yp') or 0)
        blur_wp = float(request.form.get('blur_wp') or 0)
        blur_hp = float(request.form.get('blur_hp') or 0)
        blur_x = int(request.form.get('blur_x', 0))
        blur_y = int(request.form.get('blur_y', 0))
        blur_w = int(request.form.get('blur_w', 0))
        blur_h = int(request.form.get('blur_h', 0))
        # Logo โ€” safe width clamp (use FW as rough fallback, will re-clamp after orig probe)
        _logo_wp = float(request.form.get('logo_wp') or 0)
        logo_w = int(_logo_wp * FW) if _logo_wp > 0 else int(request.form.get('logo_w', int(FW*0.15)))
        logo_w = max(20, min(logo_w, FW))
        _logo_wp_saved = _logo_wp # save for re-computation after orig probe
        print(f'[OV] crop={crop} FW={FW} FH={FH} wmk="{wmk}" wmk_xp={wmk_xp_raw} logo_xp={logo_xp_raw} logo_wp={_logo_wp} logo_w={logo_w} logo_file={bool(request.files.get("logo_file"))} blur_en={blur_enabled}')
        # Read uploaded file bytes now (request context closes after return)
        video_bytes = None; video_fname = None
        vf = request.files.get('video_file')
        if vf and vf.filename: video_bytes = vf.read(); video_fname = vf.filename
        music_bytes = None
        mf = request.files.get('music_file')
        if mf and mf.filename: music_bytes = mf.read()
        logo_bytes = None; logo_fname = None
        lf = request.files.get('logo_file')
        if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename
        tid = client_tid if client_tid else uuid.uuid4().hex[:8]
        cur_coins = get_coins(u)
        coin_msg = 'Admin' if is_adm else f'๐Ÿช™ {cur_coins} coins'
        job_progress[tid] = {'pct': 2, 'msg': f'โณ แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ {coin_msg}', 'done': False}
        def _prog(pct, msg):
            # Progress updater shared by all stages; None pct keeps the last value.
            cur = job_progress.get(tid, {})
            job_progress[tid] = {
                'pct': pct if pct is not None else cur.get('pct', 2),
                'msg': msg, 'done': False
            }
        def _bg_job():
            # Entire pipeline runs here, outside the request context.
            nonlocal logo_w, blur_x, blur_y, blur_w, blur_h
            global whisper_model
            tmp_dir = str(BASE_DIR / f'temp_{tid}')
            os.makedirs(tmp_dir, exist_ok=True)
            out_file = str(OUTPUT_DIR / f'final_{tid}.mp4')
            vpath = None; mpath = None; logo_path = None
            try:
                # โ”€โ”€ Stage 1: Download โ”€โ”€
                if video_bytes:
                    vpath = f'{tmp_dir}/input.mp4'
                    with open(vpath,'wb') as wf: wf.write(video_bytes)
                    _prog(10, '๐Ÿ“ Video file แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
                elif video_url:
                    def _dl():
                        out_tmpl = f'{tmp_dir}/input.%(ext)s'
                        ytdlp_download(out_tmpl, video_url)
                        found = glob.glob(f'{tmp_dir}/input.*')
                        return found[0] if found else None
                    result_path = run_stage('download', _dl, tid, _prog,
                        'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บแ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                    vpath = result_path
                if not vpath or not os.path.exists(vpath):
                    job_progress[tid] = {'pct':0,'msg':'โŒ Video แ€™แ€แ€ฝแ€ฑแ€ทแ€•แ€ซ','error':True}; return
                if music_bytes:
                    mpath = f'{tmp_dir}/music.mp3'
                    with open(mpath,'wb') as wf: wf.write(music_bytes)
                if logo_bytes:
                    ext = Path(logo_fname).suffix if logo_fname else '.png'
                    logo_path = f'{tmp_dir}/logo{ext}'
                    with open(logo_path,'wb') as wf: wf.write(logo_bytes)
                # โ”€โ”€ FIXED: Get original video dimensions for ALL overlays BEFORE any processing โ”€โ”€
                orig_w, orig_h = 1920, 1080  # fallback
                try:
                    _pr = subprocess.run(
                        f'ffprobe -v error -select_streams v:0 '
                        f'-show_entries stream=width,height '
                        f'-of csv=s=x:p=0 "{vpath}"',
                        shell=True, capture_output=True, text=True, timeout=10
                    )
                    if _pr.returncode == 0 and _pr.stdout.strip():
                        orig_w, orig_h = map(int, _pr.stdout.strip().split('x'))
                        print(f'[orig] video={orig_w}x{orig_h}')
                except Exception as _e:
                    print(f'[orig] probe failed: {_e}')
                # Convert blur percentages โ†’ original video pixels
                if blur_enabled and (blur_xp or blur_yp or blur_wp or blur_hp):
                    blur_x = int(blur_xp * orig_w)
                    blur_y = int(blur_yp * orig_h)
                    blur_w = int(blur_wp * orig_w)
                    blur_h = int(blur_hp * orig_h)
                    print(f'[blur] coords=({blur_x},{blur_y}) size={blur_w}x{blur_h}')
                # Convert logo/watermark percentages โ†’ original video pixels (NOT crop canvas)
                def _pct_orig(vp, vx, dflt, ax):
                    # Percent value wins; then explicit pixel value; then default.
                    if vp is not None:
                        return int(float(vp) * ax)
                    if vx is not None:
                        return int(vx)
                    return dflt
                wmk_x = _pct_orig(wmk_xp_raw, wmk_x_raw, 20, orig_w) if wmk else None
                wmk_y = _pct_orig(wmk_yp_raw, wmk_y_raw, orig_h - 80, orig_h) if wmk else None
                logo_x = _pct_orig(logo_xp_raw, logo_x_raw, int(orig_w * 0.8), orig_w)
                logo_y = _pct_orig(logo_yp_raw, logo_y_raw, 20, orig_h)
                # Re-compute logo_w relative to original width
                if _logo_wp_saved > 0:
                    logo_w = max(20, min(int(_logo_wp_saved * orig_w), orig_w))
                else:
                    logo_w = max(20, min(logo_w, orig_w))
                print(f'[logo] orig={orig_w}x{orig_h}, pos=({logo_x},{logo_y}) w={logo_w}')
                if wmk:
                    print(f'[wmk] orig={orig_w}x{orig_h}, pos=({wmk_x},{wmk_y})')
                # โ”€โ”€ Stage 2: Whisper โ”€โ”€
                if whisper is None: raise Exception('whisper not installed')
                if whisper_model is None:
                    whisper_model = whisper.load_model('tiny', device='cpu')
                _wm = whisper_model
                res = run_stage('whisper', _wm.transcribe, tid, _prog,
                    'โณ Transcript แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐ŸŽ™๏ธ Whisper แ€–แ€ผแ€„แ€ทแ€บ transcript แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ',
                    vpath, fp16=False)
                tr = res['text']; src_lang = res.get('language','en'); whisper_segments = res.get('segments', [])
                _prog(40, f'๐ŸŽ™๏ธ Transcript แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ ({src_lang})')
                # โ”€โ”€ Stage 3: AI Script โ”€โ”€
                if vo_lang == 'en':
                    sc = tr.strip()
                    caption_text = sc[:60].strip() + ('โ€ฆ' if len(sc)>60 else '')
                    hashtags = '#english #movierecap #viral #foryou #trending'
                else:
                    sys_p = get_sys_prompt(ct, vo_lang) + '\n' + get_num_rule(vo_lang)
                    msgs = [{'role':'system','content':sys_p},
                            {'role':'user','content':f'Language:{src_lang}\n\n{tr}'}]
                    out_txt, _ = run_stage('ai', call_api, tid, _prog,
                        'โณ AI Script แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿค– AI Script แ€›แ€ฑแ€ธแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ',
                        msgs, api=api_model, purpose='transcript')
                    sc, caption_text, hashtags = parse_out(out_txt)
                _prog(65, '๐Ÿค– AI Script แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ')
                # โ”€โ”€ Stage 4: TTS โ”€โ”€
                rate = f'+{spd}%'
                sentences = split_txt(sc, vo_lang)
                if engine == 'gemini':
                    parts = run_stage('tts', run_gemini_tts_sync, tid, _prog,
                        'โณ TTS แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿ”Š แ€กแ€žแ€ถ แ€‘แ€ฏแ€แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ',
                        sentences, voice_id, tmp_dir, speed=spd)
                else:
                    parts = run_stage('tts', run_tts_sync, tid, _prog,
                        'โณ TTS แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿ”Š แ€กแ€žแ€ถ แ€‘แ€ฏแ€แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ',
                        sentences, voice_id, rate, tmp_dir)
                cmb = f'{tmp_dir}/combined.mp3'
                lst = f'{tmp_dir}/list.txt'
                with open(lst,'w') as lf2:
                    for a in parts: lf2.write(f"file '{os.path.abspath(a)}'\n")
                subprocess.run(
                    f'ffmpeg -y -f concat -safe 0 -i "{lst}" '
                    f'-af "silenceremove=start_periods=1:stop_periods=-1:stop_duration=0.1:stop_threshold=-50dB" '
                    f'-c:a libmp3lame -q:a 2 "{cmb}"', shell=True, check=True)
                # โ”€โ”€ Get sentence timings from per-sentence mp3 durations (Edge TTS) โ”€โ”€
                # parts = [r000.mp3, sil.mp3, r001.mp3, sil.mp3, ...]
                # Measure each sentence mp3 before silenceremove alters timing.
                sentence_timings = None
                sub_ass_path = None
                if sub_enabled and sentences:
                    if engine != 'gemini':
                        try:
                            sil_dur = 0.2  # matches 0.2s sil.mp3 in run_tts_sync
                            sent_files = parts[::2]  # every other file starting at 0
                            durs = []
                            for sf in sent_files[:len(sentences)]:
                                try:
                                    d = float(subprocess.run(
                                        f'ffprobe -v quiet -show_entries format=duration -of csv=p=0 "{sf}"',
                                        shell=True, capture_output=True, text=True).stdout.strip())
                                except Exception:
                                    d = 0.0
                                durs.append(max(0.1, d))
                            t = 0.0
                            sentence_timings = []
                            for d in durs:
                                sentence_timings.append((t, t + d))
                                t += d + sil_dur
                            print(f'[subtitle] per-mp3 timings n={len(sentence_timings)}: '
                                  f'{[(round(s,2),round(e,2)) for s,e in sentence_timings[:4]]}โ€ฆ')
                        except Exception as _st_err:
                            print(f'[subtitle] per-mp3 timing failed: {_st_err} โ€” will use equal split')
                    # Build ASS file โ€” works for both Edge TTS (with timings) and Gemini (equal-split)
                    try:
                        sub_ass_path = f'{tmp_dir}/sub.ass'
                        _PRES = {'9:16':1280,'16:9':720,'1:1':720,'original':1280}
                        _sub_fs = max(20, round(sub_size * _PRES.get(crop, 1280)))
                        _make_ass(sentences, dur(cmb), sub_ass_path,
                                  position=sub_pos, fontsize=_sub_fs,
                                  color=sub_color, style=sub_style,
                                  sentence_timings=sentence_timings,
                                  crop=crop)
                        print(f'[subtitle] ASS file ready: {sub_ass_path}')
                    except Exception as _ae:
                        print(f'[subtitle] ASS build failed: {_ae}')
                        sub_ass_path = None
                # โ”€โ”€ Stage 5: FFmpeg Render (subtitles burned inline) โ”€โ”€
                vd = dur(vpath); ad = dur(cmb)
                if vd <= 0: raise Exception('Video duration read failed')
                if ad <= 0: raise Exception('Audio duration read failed')
                def _render():
                    _build_video(vpath, cmb, mpath, ad, vd, crop, flip, col, wmk, out_file,
                                 logo_path=logo_path, logo_x=logo_x, logo_y=logo_y, logo_w=logo_w,
                                 blur_enabled=blur_enabled, blur_x=blur_x, blur_y=blur_y,
                                 blur_w=blur_w, blur_h=blur_h,
                                 blur_orig_w=orig_w, blur_orig_h=orig_h,
                                 wmk_x=wmk_x, wmk_y=wmk_y, wmk_fontsize=wmk_fontsize,
                                 free_trial=is_free_trial,
                                 logo_orig_w=orig_w, logo_orig_h=orig_h,
                                 wmk_orig_w=orig_w, wmk_orig_h=orig_h,
                                 sub_ass_path=sub_ass_path)
                run_stage('ffmpeg', _render, tid, _prog,
                    'โณ Render แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐ŸŽฌ Video render แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                rem = -1
                if not is_adm:
                    _, rem = deduct(u, 1); upd_stat(u,'tr'); upd_stat(u,'vd')
                output_url = f'/outputs/final_{tid}.mp4'
                job_progress[tid] = {
                    'pct': 100, 'msg': 'โœ… แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ!', 'done': True,
                    'output_url': output_url,
                    'title': caption_text, 'caption': caption_text,
                    'hashtags': hashtags, 'source_lang': src_lang,
                    'coins': rem, 'tid': tid,
                }
                # โ”€โ”€ Save video history entry โ”€โ”€
                try:
                    save_video_history_entry(u, {
                        'tid': tid,
                        'output_url': output_url,
                        'title': caption_text or '(no title)',
                        'source_url': video_url or '',
                        'ts': time.time(),
                        'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'),
                    })
                except Exception as _he:
                    print(f'โš ๏ธ history save failed: {_he}')
            except Exception as e:
                import traceback; traceback.print_exc()
                job_progress[tid] = {'pct':0,'msg':f'โŒ {e}','error':True}
            finally:
                shutil.rmtree(tmp_dir, ignore_errors=True)
        threading.Thread(target=_bg_job, daemon=True).start()
        return jsonify(ok=True, tid=tid, msg='โณ Processing started')
    except Exception as e:
        import traceback; traceback.print_exc()
        return jsonify(ok=False, msg=f'โŒ {e}')
# โ”€โ”€ VIDEO HISTORY โ”€โ”€
@app.route('/api/video_history')
def api_video_history():
    """Return the user's non-expired video history entries.

    Drops records past VIDEO_HISTORY_TTL or whose output file no longer
    exists on disk; each surviving record gains an 'expires_in' field
    (seconds remaining before expiry).
    """
    try:
        u = (request.args.get('username') or '').strip()
        if not u:
            return jsonify(ok=False, msg='No username'), 400
        cleanup_old_history()
        records = load_video_history(u)
        now = time.time()
        # Filter expired + check file still exists on disk
        valid = []
        for r in records:
            # FIX: read ts once with a default โ€” L-style records may lack 'ts',
            # and the later r['ts'] access could KeyError.
            ts = r.get('ts', 0)
            if now - ts > VIDEO_HISTORY_TTL:
                continue
            fp = str(BASE_DIR) + r['output_url']  # /outputs/final_xxx.mp4
            if not os.path.exists(fp):
                continue
            r['expires_in'] = int(VIDEO_HISTORY_TTL - (now - ts))
            valid.append(r)
        return jsonify(ok=True, history=valid)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
# โ”€โ”€ ADMIN โ”€โ”€
@app.route('/api/admin/create_user', methods=['POST'])
def api_create_user():
    """Admin endpoint: create a new user account with a starting coin balance."""
    try:
        payload = request.get_json(force=True) or {}
        msg, uname = create_user_fn(
            payload.get('username', ''),
            payload.get('coins', 10),
            payload.get('caller', ''),
        )
        return jsonify(ok=bool(uname), msg=msg, username=uname)
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/admin/coins', methods=['POST'])
def api_coins():
    """Admin endpoint: set (action='set') or add coins on a user account."""
    try:
        payload = request.get_json(force=True) or {}
        if payload.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        target = payload.get('username', '')
        amount = payload.get('amount', 10)
        if payload.get('action') == 'set':
            msg = set_coins_fn(target, amount)
        else:
            msg = add_coins_fn(target, amount)
        return jsonify(ok=True, msg=msg)
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/admin/users')
def api_users():
    """Admin endpoint: list every user with coin balance and usage counters."""
    try:
        if request.args.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        db = load_db()
        users = []
        for uname, rec in db['users'].items():
            users.append({
                'username': uname,
                'coins': rec.get('coins', 0),
                'transcripts': rec.get('total_transcripts', 0),
                'videos': rec.get('total_videos', 0),
                'created': rec.get('created_at', '')[:10],
                'banned': rec.get('banned', False),
            })
        return jsonify(ok=True, users=users)
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/admin/delete_user', methods=['POST'])
def api_delete_user():
    """Admin endpoint: permanently remove a user record from the DB."""
    try:
        payload = request.get_json(force=True) or {}
        if payload.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        uname = payload.get('username', '').strip()
        if not uname:
            return jsonify(ok=False, msg='โŒ No username')
        db = load_db()
        if uname not in db['users']:
            return jsonify(ok=False, msg='โŒ User not found')
        del db['users'][uname]
        save_db(db)
        return jsonify(ok=True, msg=f'โœ… {uname} deleted')
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/admin/ban_user', methods=['POST'])
def api_ban_user():
    """Admin endpoint: ban (ban=True, the default) or un-ban a user."""
    try:
        payload = request.get_json(force=True) or {}
        if payload.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        uname = payload.get('username', '').strip()
        should_ban = payload.get('ban', True)
        return jsonify(ok=True, msg=ban_fn(uname, should_ban))
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/admin/gen_username')
def api_gen_username():
    """Admin endpoint: propose a fresh randomly-generated username."""
    try:
        if request.args.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        return jsonify(ok=True, username=gen_uname())
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# PAYMENT ROUTES
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Coin top-up packages shown in the payment UI.
#   coins     : number of coins credited on approval
#   price     : display price in Myanmar Kyat (MMK)
#   price_thb : equivalent price in Thai Baht
#   desc      : customer-facing description
PACKAGES = [
    {'coins': 10, 'price': '12,000 MMK', 'price_thb': 100, 'desc': 'Process 10 แ€€แ€ผแ€ญแ€™แ€บ'},
    {'coins': 20, 'price': '24,000 MMK', 'price_thb': 200, 'desc': 'Process 20 แ€€แ€ผแ€ญแ€™แ€บ โ€” Best'},
    {'coins': 30, 'price': '36,000 MMK', 'price_thb': 300, 'desc': 'Process 30 แ€€แ€ผแ€ญแ€™แ€บ'},
    {'coins': 60, 'price': '72,000 MMK', 'price_thb': 600, 'desc': 'Process 60 แ€€แ€ผแ€ญแ€™แ€บ'},
]
@app.route('/api/payment/packages')
def api_payment_packages():
    """Return the coin packages plus every configured payment channel."""
    payment_info = {
        'ok': True,
        'packages': PACKAGES,
        'kbz_name': KBZ_NAME,
        'kbz_number': KBZ_NUMBER,
        'scb_name': SCB_NAME,
        'scb_number': SCB_NUMBER,
        'promptpay': PROMPTPAY_NUM,
        'truemoney_name': TRUEMONEY_NAME,
        'truemoney_number': TRUEMONEY_NUM,
        'truemoney_qr_url': TRUEMONEY_QR_URL,
        'kbz_qr_url': KBZ_QR_URL,
    }
    return jsonify(**payment_info)
@app.route('/api/payment/submit', methods=['POST'])
def api_payment_submit():
    """Record a coin-purchase request and notify the admin via Telegram.

    Stores a 'pending' payment row (including the base64 slip image), then a
    background thread posts the slip photo with approve/reject inline buttons
    to the admin chat; if the photo upload fails it falls back to a plain
    text message.  Returns {ok, msg, payment_id}.
    """
    try:
        d = request.get_json(force=True) or {}
        username = (d.get('username') or '').strip()
        coins = int(d.get('coins', 0))
        price = d.get('price', '')
        slip_image = d.get('slip_image', '')
        db = load_db()
        if username not in db['users']:
            return jsonify(ok=False, msg='โŒ User not found')
        payment_id = uuid.uuid4().hex[:10]
        now = datetime.now().isoformat()
        pdb = load_payments_db()
        pdb['payments'].append({
            'id': payment_id, 'username': username,
            'coins': coins, 'price': price,
            'status': 'pending',
            'created_at': now, 'updated_at': now,
            'slip_image': slip_image, 'admin_note': '',
        })
        save_payments_db(pdb)
        # Telegram notification (background)
        def _notify():
            try:
                if not TELEGRAM_BOT_TOKEN or not ADMIN_TELEGRAM_CHAT_ID:
                    return
                import urllib.request as _ur
                caption = (
                    f'๐Ÿ’ฐ <b>New Payment Request</b>\n'
                    f'๐Ÿ‘ค <code>{username}</code>\n'
                    f'๐Ÿช™ {coins} Coins โ€” {price} MMK\n'
                    f'๐Ÿ†” <code>{payment_id}</code>\n'
                    f'โฐ {now[:19]}'
                )
                # Inline keyboard: callback_data is parsed by the bot webhook.
                kb = json.dumps({
                    'inline_keyboard': [
                        [{'text': f'โœ… Approve +{coins} coins',
                          'callback_data': f'adm_pay|approve|{payment_id}|{username}|{coins}'},
                         {'text': 'โŒ Reject',
                          'callback_data': f'adm_pay|reject|{payment_id}|{username}'}],
                    ]
                })
                # Try to send slip photo with buttons
                slip_data = slip_image or ''
                sent = False
                if slip_data and ',' in slip_data:
                    try:
                        # slip is a data URL ("data:image/...;base64,XXXX") โ€” take the payload.
                        import base64 as _b64, urllib.parse as _up
                        b64 = slip_data.split(',', 1)[1]
                        img_bytes = _b64.b64decode(b64)
                        # Hand-built multipart/form-data body for sendPhoto.
                        bnd = b'----RecapBoundary' + payment_id.encode()
                        def _field(name, val):
                            # One multipart form field (value already bytes).
                            return (b'--' + bnd + b'\r\nContent-Disposition: form-data; name="' +
                                    name.encode() + b'"\r\n\r\n' + val + b'\r\n')
                        body = (
                            _field('chat_id', str(ADMIN_TELEGRAM_CHAT_ID).encode()) +
                            _field('caption', caption.encode()) +
                            _field('parse_mode', b'HTML') +
                            _field('reply_markup', kb.encode()) +
                            b'--' + bnd + b'\r\nContent-Disposition: form-data; name="photo"; filename="slip.jpg"\r\n'
                            b'Content-Type: image/jpeg\r\n\r\n' + img_bytes + b'\r\n' +
                            b'--' + bnd + b'--\r\n'
                        )
                        req2 = _ur.Request(
                            f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto',
                            data=body,
                            headers={'Content-Type': 'multipart/form-data; boundary=' + bnd.decode()})
                        _ur.urlopen(req2, timeout=15)
                        sent = True
                    except Exception as img_e:
                        print(f'[notify slip] {img_e}')
                if not sent:
                    # Fallback: plain text message with the same inline keyboard.
                    payload = json.dumps({
                        'chat_id': ADMIN_TELEGRAM_CHAT_ID,
                        'text': caption, 'parse_mode': 'HTML',
                        'reply_markup': json.loads(kb)
                    }).encode()
                    req = _ur.Request(
                        f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage',
                        data=payload, headers={'Content-Type': 'application/json'})
                    _ur.urlopen(req, timeout=10)
            except Exception as e:
                print(f'[notify] {e}')
        threading.Thread(target=_notify, daemon=True).start()
        return jsonify(ok=True,
                       msg='โœ… Payment แ€แ€„แ€บแ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎแ‹ Admin แ€…แ€…แ€บแ€†แ€ฑแ€ธแ€•แ€ผแ€ฎแ€ธ Coins แ€‘แ€Šแ€ทแ€บแ€•แ€ฑแ€ธแ€•แ€ซแ€™แ€Šแ€บแ‹',
                       payment_id=payment_id)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/coins')
def api_get_coins():
    """Lightweight endpoint for frontend coin polling."""
    try:
        username = request.args.get('username', '').strip()
        if not username:
            return jsonify(ok=False, msg='missing username')
        users = load_db()['users']
        if username not in users:
            return jsonify(ok=False, msg='not found')
        return jsonify(ok=True, coins=users[username].get('coins', 0))
    except Exception as exc:
        return jsonify(ok=False, msg=str(exc))
@app.route('/api/payment/history')
def api_payment_history():
    """Return one user's payment records with bulky slip images removed.

    Query: username. slip_image is a large base64 blob, so each record is
    copied and the field dropped before serialization.
    """
    try:
        username = request.args.get('username', '').strip()
        if not username:
            return jsonify(ok=False, msg='โŒ Username required')
        sanitized = []
        for rec in load_payments_db()['payments']:
            if rec['username'] != username:
                continue
            entry = dict(rec)
            entry.pop('slip_image', None)
            sanitized.append(entry)
        return jsonify(ok=True, payments=sanitized)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/admin/payments')
def api_admin_payments():
    """Admin-only: list payments filtered by status (default 'pending').

    Records are shallow-copied; slip_image is kept intact so the admin UI
    can show the uploaded slip.
    """
    try:
        if request.args.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        wanted = request.args.get('status', 'pending')
        matched = [dict(p) for p in load_payments_db()['payments']
                   if p['status'] == wanted]
        return jsonify(ok=True, payments=matched)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/admin/payment/approve', methods=['POST'])
def api_admin_payment_approve():
    """Admin-only: approve a pending payment and credit its coins.

    JSON body: {caller, payment_id}. Side effects: flips the payment to
    'approved', credits the user's coin balance, clears their free-trial
    flag, and best-effort notifies the user over Telegram from a daemon
    thread. Returns {ok, msg, new_coins}.
    """
    try:
        d = request.get_json(force=True) or {}
        if d.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        payment_id = d.get('payment_id', '').strip()
        pdb = load_payments_db()
        pay = next((p for p in pdb['payments'] if p['id'] == payment_id), None)
        if not pay:
            return jsonify(ok=False, msg='โŒ Payment not found')
        # Guard against double-crediting: only pending payments may be approved.
        if pay['status'] != 'pending':
            return jsonify(ok=False, msg=f'โš ๏ธ Already {pay["status"]}')
        pay['status'] = 'approved'
        pay['updated_at'] = datetime.now().isoformat()
        save_payments_db(pdb)
        # Add coins
        db = load_db()
        u = pay['username']
        new_bal = 0
        if u in db['users']:
            db['users'][u]['coins'] = db['users'][u].get('coins', 0) + pay['coins']
            db['users'][u]['free_trial'] = False  # paid user โ€” remove free trial
            new_bal = db['users'][u]['coins']
        save_db(db)
        # Notify user via Telegram (only when the user linked a chat and a
        # bot token is configured)
        tg_chat_id = db['users'].get(u, {}).get('tg_chat_id')
        if tg_chat_id and TELEGRAM_BOT_TOKEN:
            def _notify_user(chat_id, coins_added, balance, pid):
                # Best-effort: failures are logged, never raised to the admin.
                try:
                    import requests as _req
                    _req.post(
                        f'https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage',
                        json={
                            'chat_id': chat_id,
                            'text': (
                                f'๐ŸŽ‰ *Coins แ€‘แ€Šแ€ทแ€บแ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ!*\n'
                                f'๐Ÿช™ *+{coins_added} Coins* แ€›แ€ฑแ€ฌแ€€แ€บแ€•แ€ผแ€ฎ\n'
                                f'๐Ÿ’ฐ แ€œแ€€แ€บแ€€แ€ปแ€”แ€บ โ€” *{balance} Coins*\n'
                                f'๐Ÿ†” `{pid}`'
                            ),
                            'parse_mode': 'Markdown',
                        },
                        timeout=10
                    )
                except Exception as _e:
                    print(f'[notify user] {_e}')
            threading.Thread(target=_notify_user, args=(tg_chat_id, pay['coins'], new_bal, pay['id']), daemon=True).start()
        return jsonify(ok=True, msg=f'โœ… Approved +{pay["coins"]} coins โ†’ {u}', new_coins=new_bal)
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/admin/payment/reject', methods=['POST'])
def api_admin_payment_reject():
    """Admin-only: reject a pending payment with an optional note.

    JSON body: {caller, payment_id, note}. Marks the payment 'rejected'
    and records the admin note; no coins are touched.
    """
    try:
        d = request.get_json(force=True) or {}
        if d.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only')
        payment_id = d.get('payment_id', '').strip()
        note = d.get('note', '')
        pdb = load_payments_db()
        pay = next((p for p in pdb['payments'] if p['id'] == payment_id), None)
        if not pay:
            return jsonify(ok=False, msg='โŒ Payment not found')
        # Consistency with the approve route: only pending payments may change
        # state. Without this guard an already-approved payment could be
        # flipped to 'rejected' while its coins stayed credited to the user.
        if pay['status'] != 'pending':
            return jsonify(ok=False, msg=f'โš ๏ธ Already {pay["status"]}')
        pay['status'] = 'rejected'
        pay['admin_note'] = note
        pay['updated_at'] = datetime.now().isoformat()
        save_payments_db(pdb)
        return jsonify(ok=True, msg='โŒ Rejected')
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/admin/payment/slip/<payment_id>')
def api_admin_payment_slip(payment_id):
    """Admin-only: return the base64 slip image for a single payment."""
    try:
        if request.args.get('caller') != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only'), 403
        for record in load_payments_db()['payments']:
            if record['id'] == payment_id:
                return jsonify(ok=True, slip_image=record.get('slip_image', ''))
        return jsonify(ok=False, msg='Not found'), 404
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/admin/broadcast', methods=['POST'])
def api_admin_broadcast():
    """Send broadcast message to all users via Telegram bot.

    JSON body: {caller, message}. Sends synchronously โ€” one sendMessage
    call per user that has a linked tg_chat_id โ€” and returns sent/failed
    counts. A per-user failure is logged and counted, never fatal.
    """
    try:
        data = request.get_json(force=True)
        caller = data.get('caller', '')
        if caller != ADMIN_U:
            return jsonify(ok=False, msg='โŒ Admin only'), 403
        message = data.get('message', '').strip()
        if not message:
            return jsonify(ok=False, msg='โŒ Message แ€™แ€‘แ€Šแ€ทแ€บแ€›แ€žแ€ฑแ€ธแ€•แ€ซ')
        db = load_db()
        # NOTE(review): reads the token from the environment here while other
        # routes use the TELEGRAM_BOT_TOKEN constant โ€” confirm both agree.
        token = os.getenv('TELEGRAM_BOT_TOKEN', '')
        if not token:
            return jsonify(ok=False, msg='โŒ BOT_TOKEN แ€™แ€žแ€แ€บแ€™แ€พแ€แ€บแ€›แ€žแ€ฑแ€ธแ€•แ€ซ')
        import urllib.request as _ur, json as _json, threading as _th
        sent = 0; fail = 0
        for uname, udata in db.get('users', {}).items():
            tg_id = udata.get('tg_chat_id')
            if not tg_id:
                continue  # user never linked a Telegram chat
            try:
                payload = _json.dumps({
                    'chat_id': tg_id,
                    'text': f'๐Ÿ“ข *แ€€แ€ผแ€ฑแ€„แ€ผแ€ฌแ€แ€ปแ€€แ€บ*\n\n{message}',
                    'parse_mode': 'Markdown',
                }).encode()
                req = _ur.Request(
                    f'https://api.telegram.org/bot{token}/sendMessage',
                    data=payload,
                    headers={'Content-Type': 'application/json'})
                _ur.urlopen(req, timeout=8)
                sent += 1
            except Exception as e:
                logger.warning(f'[broadcast] {uname}: {e}')
                fail += 1
        return jsonify(ok=True, sent=sent, fail=fail,
                       msg=f'โœ… {sent} แ€šแ€ฑแ€ฌแ€€แ€บ แ€•แ€ญแ€ฏแ€ทแ€•แ€ผแ€ฎแ€ธ โ€” {fail} แ€€แ€ป')
    except Exception as e:
        return jsonify(ok=False, msg=str(e))
@app.route('/api/payment/kbz_qr')
def api_kbz_qr():
    """
    Generate KBZ Pay QR as PNG โ€” uses real KBZ QR payloads per fixed MMK amount.
    If KBZ_QR_URL env is set, redirect to that static image instead.
    Query: amount=<MMK> (10000 / 18000 / 27000 / 54000)
    """
    # Real KBZ Pay QR payloads (binary EMV format, base64+checksum string).
    # These are opaque pre-captured strings โ€” only these four fixed amounts
    # are supported; any other amount yields HTTP 400 below.
    _KBZ_QR_PAYLOADS = {
        10000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzEwMDAwLjA=F919d3807b9db=',
        18000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzE4MDAwLjA=FF19d38087248=',
        27000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzI3MDAwLjA=F419d3808eed3=',
        54000: 'hQZLQlpQYXlhRE8C8FACEFcWCWeYcTUtJgMQEB+fCAQBAZ8kBzU0MDAwLjA=F919d380995a0=',
    }
    try:
        if KBZ_QR_URL:
            return redirect(KBZ_QR_URL)
        amount_str = request.args.get('amount', '0').strip()
        try:
            # Accept "10000" or "10000.0" from the frontend.
            amount = int(float(amount_str))
        except ValueError:
            return jsonify(ok=False, msg='invalid amount'), 400
        qr_data = _KBZ_QR_PAYLOADS.get(amount)
        if not qr_data:
            return jsonify(ok=False, msg=f'No KBZ QR for amount {amount} MMK'), 400
        try:
            import qrcode as _qr, io
            q = _qr.QRCode(
                version=None,  # auto-size to the payload
                error_correction=_qr.constants.ERROR_CORRECT_M,
                box_size=10, border=4,
            )
            q.add_data(qr_data)
            q.make(fit=True)
            img = q.make_image(fill_color='black', back_color='white')
            buf = io.BytesIO()
            img.save(buf, format='PNG')
            buf.seek(0)
            return Response(buf.read(), mimetype='image/png',
                            headers={'Cache-Control': 'no-store'})
        except ImportError:
            # qrcode lib missing โ€” return the raw payload so the caller can
            # render it client-side.
            return jsonify(ok=True, payload=qr_data,
                           note='pip install qrcode[pil]')
    except Exception as e:
        return jsonify(ok=False, msg=str(e)), 500
@app.route('/api/payment/truemoney_qr')
def api_truemoney_qr():
    """
    Generate TrueMoney Wallet QR as PNG using EMV QR spec (PromptPay Topup format).
    Verified against real TrueMoney QR samples.
    If TRUEMONEY_QR_URL env is set, redirect to that static image instead.
    Query: amount=<THB float>
    """
    try:
        if TRUEMONEY_QR_URL:
            return redirect(TRUEMONEY_QR_URL)
        amount_str = request.args.get('amount', '0').strip()
        try:
            amount = float(amount_str)
        except ValueError:
            return jsonify(ok=False, msg='invalid amount'), 400
        # Normalize phone -> strip leading 0 -> build 15-digit topup ID
        phone = TRUEMONEY_NUM.strip().replace('+', '').replace('-', '').replace(' ', '')
        phone_digits = phone.lstrip('0')
        topup_id = '140000' + phone_digits  # e.g. 0951236012 -> 140000951236012
        def _f(tag, val):
            # EMV TLV field: tag + zero-padded 2-digit length + value.
            return f'{tag}{len(val):02d}{val}'
        # Tag 29 โ€” merchant account: PromptPay AID + topup target.
        merchant = _f('00', 'A000000677010111') + _f('03', topup_id)
        tag29 = _f('29', merchant)
        amt_str = f'{amount:.2f}'
        tag54 = _f('54', amt_str) if amount > 0 else ''  # amount omitted when 0
        payload = (
            _f('00', '01') +    # Payload Format Indicator
            _f('01', '12') +    # Point of Initiation Method (dynamic)
            tag29 +
            _f('53', '764') +   # Transaction Currency โ€” THB
            tag54 +
            _f('58', 'TH') +    # Country Code
            '6304'              # CRC tag; 4-hex-digit value appended below
        )
        def _crc16(s: str) -> str:
            # CRC-16/CCITT-FALSE over the ASCII payload (EMV QR checksum).
            crc = 0xFFFF
            for b in s.encode('ascii'):
                crc ^= b << 8
                for _ in range(8):
                    crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1)
                    crc &= 0xFFFF
            return format(crc, '04X')
        payload += _crc16(payload)
        qr_data = payload
        try:
            import qrcode as _qr, io
            q = _qr.QRCode(
                version=None,  # auto-size to the payload
                error_correction=_qr.constants.ERROR_CORRECT_M,
                box_size=10, border=4,
            )
            q.add_data(qr_data)
            q.make(fit=True)
            img = q.make_image(fill_color='black', back_color='white')
            buf = io.BytesIO()
            img.save(buf, format='PNG')
            buf.seek(0)
            return Response(buf.read(), mimetype='image/png',
                            headers={'Cache-Control': 'no-store'})
        except ImportError:
            # qrcode lib missing โ€” return the raw payload for client rendering.
            return jsonify(ok=True, payload=qr_data,
                           note='pip install qrcode[pil]')
    except Exception as e:
        return jsonify(ok=False, msg=str(e)), 500
@app.route('/api/payment/promptpay_qr')
def api_promptpay_qr():
    """
    Generate PromptPay QR (Thai EMV QR spec) โ€” scannable by any Thai banking app.
    Query: amount=<float THB>

    Builds the EMV TLV payload from PROMPTPAY_NUM, appends the
    CRC-16/CCITT-FALSE checksum, and renders it as PNG (or returns the raw
    payload string when the qrcode package is not installed).
    """
    try:
        amount_str = request.args.get('amount', '0').strip()
        try:
            amount = float(amount_str)
        except ValueError:
            return jsonify(ok=False, msg='invalid amount'), 400
        # โ”€โ”€ Normalize phone โ†’ 0066XXXXXXXXX โ”€โ”€
        phone = PROMPTPAY_NUM.strip().replace('+', '').replace('-', '').replace(' ', '')
        if phone.startswith('66'):
            phone = '0066' + phone[2:]
        elif phone.startswith('0'):
            phone = '0066' + phone[1:]
        else:
            phone = '0066' + phone
        # โ”€โ”€ EMV TLV helper: tag + zero-padded 2-digit length + value โ”€โ”€
        def f(tag, val):
            return f'{tag}{len(val):02d}{val}'
        # Tag 29 โ€” PromptPay merchant account (phone)
        merchant = f('00', 'A000000677010111') + f('01', phone)
        tag29 = f('29', merchant)
        # Amount string โ€” strip trailing zeros but keep 2dp if needed
        if amount > 0:
            amt_str = f'{amount:.2f}'  # e.g. "90.00"
            # Remove trailing .00 only if whole number โ€” banks accept both
            tag54 = f('54', amt_str)
        else:
            tag54 = ''  # zero amount โ†’ payer enters the amount themselves
        payload = (
            f('00', '01') +        # Payload Format Indicator
            f('01', '12') +        # Point of Initiation Method (dynamic=12)
            tag29 +                # Merchant Account Info โ€” PromptPay
            f('52', '0000') +      # Merchant Category Code
            f('53', '764') +       # Transaction Currency โ€” THB
            tag54 +                # Transaction Amount
            f('58', 'TH') +        # Country Code
            f('59', 'PromptPay') + # Merchant Name
            f('60', 'Bangkok') +   # Merchant City
            '6304'                 # CRC tag (value appended below)
        )
        # โ”€โ”€ CRC-16/CCITT-FALSE over the ASCII payload โ”€โ”€
        def crc16(s: str) -> str:
            crc = 0xFFFF
            for b in s.encode('ascii'):
                crc ^= b << 8
                for _ in range(8):
                    crc = ((crc << 1) ^ 0x1021) if (crc & 0x8000) else (crc << 1)
                    crc &= 0xFFFF
            return format(crc, '04X')
        payload += crc16(payload)
        # โ”€โ”€ Render QR PNG โ”€โ”€
        try:
            import qrcode as _qr, io
            q = _qr.QRCode(
                version=None,  # auto-size to the payload
                error_correction=_qr.constants.ERROR_CORRECT_M,
                box_size=10, border=4,
            )
            q.add_data(payload)
            q.make(fit=True)
            img = q.make_image(fill_color='black', back_color='white')
            buf = io.BytesIO()
            img.save(buf, format='PNG')
            buf.seek(0)
            return Response(buf.read(), mimetype='image/png',
                            headers={'Cache-Control': 'no-store'})
        except ImportError:
            # qrcode lib missing โ€” return the raw payload for client rendering.
            return jsonify(ok=True, payload=payload,
                           note='pip install qrcode[pil]')
    except Exception as e:
        return jsonify(ok=False, msg=str(e)), 500
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# โ”€โ”€ SRT MODE โ€” Video โ†’ Gemini โ†’ Myanmar SRT โ†’ burn onto original video โ”€โ”€
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _norm_digits(s):
"""Normalize non-ASCII digit-lookalikes in timecodes to ASCII 0-9.
Covers:
U+1040-U+1049 Myanmar digits แ€-แ‰
U+101D Myanmar letter wa 'แ€' (Gemini writes this as zero in timecodes)
U+0E50-U+0E59 Thai digits เน-เน™
"""
mm = str.maketrans('\u1040\u1041\u1042\u1043\u1044\u1045\u1046\u1047\u1048\u1049',
'0123456789')
wa = str.maketrans('\u101D', '0') # Myanmar letter wa looks like 0
th = str.maketrans('\u0E50\u0E51\u0E52\u0E53\u0E54\u0E55\u0E56\u0E57\u0E58\u0E59',
'0123456789')
return s.translate(mm).translate(wa).translate(th)
def _tc_to_sec(tc):
    """HH:MM:SS,mmm or HH:MM:SS.mmm โ†’ float seconds. Handles Myanmar/Thai digits."""
    # Normalize digits first, then unify the decimal separator for float().
    segs = _norm_digits(tc).replace(',', '.').split(':')
    hours = int(segs[0])
    minutes = int(segs[1])
    return hours * 3600 + minutes * 60 + float(segs[2])
def _sec_to_srt_tc(secs):
"""float seconds โ†’ HH:MM:SS,mmm SRT timecode."""
h = int(secs // 3600)
m = int((secs % 3600) // 60)
s = int(secs % 60)
ms = int(round((secs - int(secs)) * 1000))
return f'{h:02d}:{m:02d}:{s:02d},{ms:03d}'
def _parse_srt(text):
    """Parse SRT content โ†’ list of (index, timecode_str, text).
    Normalizes Myanmar/Thai digits to ASCII so ffmpeg can read timecodes."""
    entries = []
    # Blocks are separated by blank lines; malformed blocks are skipped.
    for chunk in re.split(r'\n\s*\n', text.strip()):
        rows = chunk.strip().split('\n')
        if len(rows) < 2:
            continue
        try:
            num = int(_norm_digits(rows[0].strip()))
        except ValueError:
            continue  # first row is not a subtitle index
        timecode = _norm_digits(rows[1].strip())
        body = '\n'.join(rows[2:]).strip()
        if timecode and body:
            entries.append((num, timecode, body))
    return entries
def _strip_emoji(text):
"""Remove emoji and non-Myanmar/ASCII characters from subtitle text."""
import unicodedata
result = []
for ch in text:
cp = ord(ch)
# Allow: ASCII printable, Myanmar (U+1000โ€“U+109F, U+A9E0โ€“U+A9FF, U+AA60โ€“U+AA7F)
if (0x20 <= cp <= 0x7E or
0x1000 <= cp <= 0x109F or
0xA9E0 <= cp <= 0xA9FF or
0xAA60 <= cp <= 0xAA7F or
ch in '\n\r '):
result.append(ch)
return ''.join(result).strip()
def _build_srt(blocks):
"""Build SRT string from list of (index, timecode, text)."""
parts = []
for idx, tc, txt in blocks:
parts.append(f'{idx}\n{tc}\n{txt}')
return '\n\n'.join(parts) + '\n'
def _gemini_video_to_myanmar_srt(vpath, prog_fn=None):
    """
    Upload video to Gemini Files API โ†’ generate Myanmar SRT with timecodes.
    Uses google-genai SDK (ggenai).
    Returns SRT string.

    Parameters:
        vpath: path of the local video file (uploaded as video/mp4).
        prog_fn: optional callable (pct, msg); called with pct=None so only
            the status message changes.

    Raises:
        Exception when the SDK is missing, no API key is configured, or
        every configured key fails (last error is included in the message).
    """
    if ggenai is None:
        raise Exception('google-genai package not installed')
    # next_gemini_key() supplies the rotation order of configured API keys.
    _, ordered_keys = next_gemini_key()
    if not ordered_keys:
        raise Exception('No Gemini API Key')
    prompt = (
        "You are a professional subtitle generator with frame-accurate timing.\n\n"
        "INPUT:\n"
        "- One video file containing speech (may be Chinese, Thai, English, or any language).\n\n"
        "GOAL:\n"
        "- Produce a COMPLETE Burmese (spoken Burmese / everyday colloquial style, NOT formal/literary) "
        ".SRT covering the ENTIRE video duration.\n"
        "- Translate ALL dialog, narration, and speech โ€” do NOT omit anything.\n"
        "- Use 100% spoken Burmese style only (แ€•แ€ผแ€ฑแ€ฌแ€†แ€ญแ€ฏแ€žแ€ฑแ€ฌแ€˜แ€ฌแ€žแ€ฌ). Do NOT use formal/written style.\n"
        "- Write numbers in Burmese words.\n\n"
        "TIMING ACCURACY (CRITICAL โ€” most important rule):\n"
        "- Listen to the EXACT moment each word/phrase starts and ends in the audio.\n"
        "- The START time MUST match the exact millisecond the speaker begins that subtitle's speech.\n"
        "- The END time MUST match the exact millisecond the speech ends (NOT when the next subtitle starts).\n"
        "- Do NOT guess or estimate timing โ€” derive timecodes from the actual audio speech boundaries.\n"
        "- Do NOT pad end times forward to the next subtitle's start โ€” leave a natural gap of 50โ€“200 ms between subtitles.\n"
        "- Do NOT shift timecodes to make subtitles look evenly spaced โ€” preserve the real speech rhythm.\n"
        "- Silence or pause gaps in the audio MUST appear as gaps between subtitle blocks, not filled with text.\n"
        "- Each subtitle block MUST NOT overlap in time with the next block.\n\n"
        "CRITICAL OUTPUT RULES:\n"
        "- Output ONLY valid .SRT content (plain text).\n"
        "- Do NOT add explanations or comments.\n"
        "- Do NOT ask questions.\n"
        "- Do NOT stop early due to length limits.\n"
        "- Do NOT include emoji in subtitle text.\n"
        "- ALWAYS use ENGLISH/ASCII digits (0-9) for timecodes.\n"
        "- NEVER use Myanmar digits (แ€-แ‰) or the letter 'แ€'.\n"
        "- Example: 00:00:01,240 --> 00:00:02,810 is correct.\n"
        "- NEVER write: แ€แ€:แ€แ€:แ€แ,แ‚แ„แ€ or 00:แ€แ€:แ€แ,แ‚แ„แ€\n\n"
        "LONG-OUTPUT HANDLING (MANDATORY):\n"
        "- If the full SRT cannot fit in a single response:\n"
        " 1) Output as much SRT as possible.\n"
        " 2) End the response with EXACTLY this line:\n"
        " [CONTINUE]\n"
        " 3) Stop immediately after that line.\n"
        "- When the user replies with \"continue\", resume from the NEXT subtitle number.\n"
        "- Never repeat or reset subtitle numbers.\n"
        "- Continue this process until the FINAL subtitle is output.\n"
        "- The LAST response must NOT include [CONTINUE].\n\n"
        "FORMAT (MANDATORY, exactly like this example โ€” use real measured timecodes):\n"
        "1\n"
        "00:00:01,240 --> 00:00:02,810\n"
        "แ€™แ€ผแ€”แ€บแ€™แ€ฌ subtitle text\n\n"
        "2\n"
        "00:00:03,050 --> 00:00:04,630\n"
        "แ€”แ€ฑแ€ฌแ€€แ€บ subtitle text\n\n"
        "SEGMENTATION RULES:\n"
        "- Prefer many short subtitles over fewer long ones.\n"
        "- Split at natural pauses, breaths, or speaker changes.\n"
        "- Avoid subtitles longer than ~3 seconds unless a single sentence requires it.\n"
        "- Each subtitle should display for at least 0.5 seconds.\n"
        "- Do not leave large timing gaps unless the audio is genuinely silent.\n\n"
        "VALIDATION:\n"
        "- Start time must always be strictly less than end time.\n"
        "- No two subtitle blocks may overlap in time.\n"
        "- If any text appears outside SRT structure (except [CONTINUE]), the output is INVALID.\n\n"
        "BEGIN OUTPUT."
    )
    last_err = None
    # Try each API key in order; the first success returns, failures fall
    # through to the next key.
    for api_key in ordered_keys:
        uploaded_file = None
        try:
            client = ggenai.Client(api_key=api_key)
            # โ”€โ”€ Upload video file โ”€โ”€
            if prog_fn: prog_fn(None, '๐Ÿ“ค Video Gemini แ€‘แ€ฒ upload แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
            print(f'[srt_gemini] uploading {vpath} ({os.path.getsize(vpath)//1024}KB)')
            with open(vpath, 'rb') as f:
                uploaded_file = client.files.upload(
                    file=f,
                    config={'mime_type': 'video/mp4', 'display_name': 'srt_input'}
                )
            # Wait for processing โ€” poll up to 60 times ร— 2 s
            if prog_fn: prog_fn(None, 'โณ Gemini video processingโ€ฆ')
            for _ in range(60):
                finfo = client.files.get(name=uploaded_file.name)
                if finfo.state.name == 'ACTIVE':
                    break
                if finfo.state.name == 'FAILED':
                    raise Exception('Gemini file processing FAILED')
                time.sleep(2)
            else:
                # for/else: loop exhausted without break โ†’ never became ACTIVE
                raise Exception('Gemini file processing timeout')
            # โ”€โ”€ Generate SRT โ”€โ”€
            if prog_fn: prog_fn(None, '๐Ÿค– Gemini Myanmar SRT แ€‘แ€ฏแ€แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
            response = client.models.generate_content(
                model='gemini-3-flash',
                contents=[
                    gtypes.Part.from_uri(file_uri=uploaded_file.uri, mime_type='video/mp4'),
                    prompt,
                ],
                config=gtypes.GenerateContentConfig(
                    max_output_tokens=65536,
                    thinking_config=gtypes.ThinkingConfig(thinking_level="minimal"),
                )
            )
            raw = response.text.strip() if response.text else ''
            # Strip any markdown fences
            raw = re.sub(r'^```[a-z]*\n?', '', raw, flags=re.MULTILINE)
            raw = re.sub(r'\n?```$', '', raw, flags=re.MULTILINE)
            # Strip [CONTINUE] marker if model included it
            raw = re.sub(r'\[CONTINUE\]\s*$', '', raw.strip()).strip()
            if not raw:
                raise Exception('Gemini returned empty SRT')
            # โ”€โ”€ Normalize Myanmar/Thai digits in timecode lines only โ”€โ”€
            # Walk line by line: normalize digits on lines that contain '-->'
            fixed_lines = []
            for line in raw.splitlines():
                if '-->' in line:
                    fixed_lines.append(_norm_digits(line))
                elif re.match(r'^[แ€-แ‰\d]+\s*$', line.strip()):
                    # subtitle index line with Myanmar digits
                    fixed_lines.append(_norm_digits(line))
                else:
                    fixed_lines.append(line)
            raw = '\n'.join(fixed_lines)
            # Validate โ€” must have at least one timecode
            if '-->' not in raw:
                raise Exception(f'Gemini output has no SRT timecodes: {raw[:200]}')
            print(f'[srt_gemini] SRT generated, {len(raw)} chars, key=...{api_key[-6:]}')
            return raw
        except Exception as e:
            last_err = e
            print(f'[srt_gemini] key failed: {e}')
            continue
        finally:
            # Clean up uploaded file (best-effort, runs for every key attempt)
            if uploaded_file:
                try:
                    client.files.delete(name=uploaded_file.name)
                except:
                    pass
    raise Exception(f'โŒ Gemini SRT generation all keys failed: {last_err}')
# โ”€โ”€ /api/generate_srt โ€” Non-blocking: download video + Gemini SRT โ”€โ”€
@app.route('/api/generate_srt', methods=['POST'])
def api_generate_srt():
    """
    Step 1: Download video (or use upload/cache) โ†’ Gemini generates Myanmar SRT.
    Non-blocking: returns tid immediately, use /api/progress/<tid> for updates.
    Result in job_progress[tid]: {done, srt, total, coins, cache_key}

    Charges 1 coin on success (admin exempt). The downloaded video path is
    persisted to srt_vpath_<tid>.txt so the later /api/burn_srt step can
    reuse it without re-downloading.
    """
    try:
        u = (request.form.get('username') or '').strip()
        if not u:
            return jsonify(ok=False, msg='โŒ Not logged in')
        is_adm = (u == ADMIN_U)  # admin bypasses ban and coin checks
        if not is_adm and load_db()['users'].get(u, {}).get('banned'):
            return jsonify(ok=False, msg='โŒ Account banned')
        if not is_adm and get_coins(u) < 1:
            return jsonify(ok=False, msg='โŒ Not enough coins (need 1)')
        video_url = (request.form.get('video_url') or '').strip()
        cache_key = (request.form.get('cache_key') or '').strip()
        client_tid = (request.form.get('tid') or '').strip()
        # Read uploaded video bytes
        video_bytes = None; video_fname = None
        vf = request.files.get('video_file')
        if vf and vf.filename:
            video_bytes = vf.read()
            video_fname = vf.filename
        tid = client_tid or uuid.uuid4().hex[:8]
        job_progress[tid] = {'pct': 2, 'msg': 'โณ แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ', 'done': False}
        def _prog(pct, msg):
            # pct=None keeps the previous percentage and only updates the message.
            cur = job_progress.get(tid, {})
            job_progress[tid] = {
                'pct': pct if pct is not None else cur.get('pct', 2),
                'msg': msg, 'done': False
            }
        def _bg():
            # Background worker: acquire video โ†’ Gemini SRT โ†’ publish result.
            tmp_dir = str(BASE_DIR / f'temp_srt_{tid}')
            os.makedirs(tmp_dir, exist_ok=True)
            vpath = None
            try:
                # โ”€โ”€ Stage 1: Get video (upload > cache > URL download) โ”€โ”€
                if video_bytes:
                    vpath = f'{tmp_dir}/input.mp4'
                    with open(vpath, 'wb') as wf: wf.write(video_bytes)
                    _prog(10, '๐Ÿ“ Video file แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
                elif cache_key:
                    with _preview_cache_lock:
                        cached = _preview_cache.get(cache_key)
                    if cached and os.path.exists(cached['file']):
                        vpath = cached['file']
                        _prog(10, '๐Ÿ“ Cached video แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
                    elif video_url:
                        # Cache miss (entry gone or file deleted) โ†’ fall back
                        # to downloading from the URL.
                        def _dl():
                            out_tmpl = f'{tmp_dir}/input.%(ext)s'
                            ytdlp_download(out_tmpl, video_url)
                            found = glob.glob(f'{tmp_dir}/input.*')
                            return found[0] if found else None
                        vpath = run_stage('download', _dl, tid, _prog,
                                          'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บแ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                elif video_url:
                    # No upload, no cache key โ†’ plain URL download.
                    def _dl2():
                        out_tmpl = f'{tmp_dir}/input.%(ext)s'
                        ytdlp_download(out_tmpl, video_url)
                        found = glob.glob(f'{tmp_dir}/input.*')
                        return found[0] if found else None
                    vpath = run_stage('download', _dl2, tid, _prog,
                                      'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บแ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                if not vpath or not os.path.exists(vpath):
                    job_progress[tid] = {'pct': 0, 'msg': 'โŒ Video แ€™แ€แ€ฝแ€ฑแ€ทแ€•แ€ซ', 'error': True}
                    return
                _prog(25, '๐Ÿ“ค Gemini แ€‘แ€ฒ video upload แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                # โ”€โ”€ Stage 2: Gemini SRT โ”€โ”€
                def _gen_srt():
                    return _gemini_video_to_myanmar_srt(vpath, prog_fn=_prog)
                srt_text = run_stage('ai', _gen_srt, tid, _prog,
                                     'โณ AI แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บ', '๐Ÿค– Gemini SRT แ€‘แ€ฏแ€แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                blocks = _parse_srt(srt_text)
                total = len(blocks)
                print(f'[generate_srt] {total} subtitle blocks for user={u}')
                # Cache video path for burn step (reuse same tmp_dir)
                # Store in a temp file so burn can find it
                vpath_file = str(BASE_DIR / f'srt_vpath_{tid}.txt')
                with open(vpath_file, 'w') as f:
                    f.write(vpath)
                # Deduct coin โ€” only after a successful SRT generation
                rem = -1  # -1 signals "no deduction" (admin) to the frontend
                if not is_adm:
                    _, rem = deduct(u, 1)
                job_progress[tid] = {
                    'pct': 100, 'done': True,
                    'msg': f'โœ… Myanmar SRT แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ! ({total} lines)',
                    'srt': srt_text,
                    'total': total,
                    'coins': rem,
                    'vpath_key': tid,  # burn step uses this
                }
            except Exception as e:
                import traceback; traceback.print_exc()
                job_progress[tid] = {'pct': 0, 'msg': f'โŒ {e}', 'error': True}
            finally:
                # Don't delete tmp_dir yet โ€” burn step needs the video
                pass
        threading.Thread(target=_bg, daemon=True).start()
        return jsonify(ok=True, tid=tid)
    except Exception as e:
        import traceback; traceback.print_exc()
        return jsonify(ok=False, msg=f'โŒ {e}')
@app.route('/api/burn_srt', methods=['POST'])
def api_burn_srt():
    """
    Step 2: Burn Myanmar SRT onto video with original audio preserved.
    Input: srt_text, vpath_key (tid from generate_srt), sub settings, crop/flip/color
    No coin deduction โ€” already paid in generate_srt step.
    Non-blocking: returns tid for /api/progress/<tid>.

    Pipeline (all in a background thread): fetch video (cached path >
    upload > URL) โ†’ one ffmpeg pass for flip/color/zoom/blur/crop โ†’
    optional logo overlay pass โ†’ optional watermark pass โ†’ SRT burn via
    _burn_srt_direct โ†’ save history entry.
    """
    try:
        u = (request.form.get('username') or '').strip()
        if not u:
            return jsonify(ok=False, msg='โŒ Not logged in')
        is_adm = (u == ADMIN_U)
        # โ”€โ”€ Subtitle styling options โ”€โ”€
        srt_text = (request.form.get('srt_text') or '').strip()
        vpath_key = (request.form.get('vpath_key') or '').strip()
        sub_pos = int(request.form.get('sub_pos', 85))
        sub_size = float(request.form.get('sub_size', 0.0547))  # fraction of play_res_y
        sub_color = request.form.get('sub_color', 'white')
        sub_style = request.form.get('sub_style', 'outline')
        crop = request.form.get('crop', 'original')
        flip = request.form.get('flip', '0') == '1'
        col = request.form.get('color', '0') == '1'
        # Blur box โ€” all coords arrive as fractions of the frame
        blur_enabled = request.form.get('blur_enabled', '0') == '1'
        blur_xp = float(request.form.get('blur_xp') or 0)
        blur_yp = float(request.form.get('blur_yp') or 0)
        blur_wp = float(request.form.get('blur_wp') or 0)
        blur_hp = float(request.form.get('blur_hp') or 0)
        # Zoom
        zoom_enabled = request.form.get('zoom_enabled', '0') == '1'
        zoom_factor = float(request.form.get('zoom_factor', 1.03))
        zoom_factor = max(1.01, min(zoom_factor, 1.30))  # clamp to a sane range
        # Watermark
        wmk = (request.form.get('watermark') or '').strip()
        wmk_xp_raw = request.form.get('wmk_xp')
        wmk_yp_raw = request.form.get('wmk_yp')
        wmk_fontsize = int(request.form.get('wmk_fontsize', 28))
        wmk_xp = float(wmk_xp_raw) if wmk_xp_raw else None
        wmk_yp = float(wmk_yp_raw) if wmk_yp_raw else None
        # Logo
        logo_bytes = None; logo_fname = None
        lf = request.files.get('logo_file')
        if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename
        logo_xp_raw = request.form.get('logo_xp')
        logo_yp_raw = request.form.get('logo_yp')
        logo_xp = float(logo_xp_raw) if logo_xp_raw else None
        logo_yp = float(logo_yp_raw) if logo_yp_raw else None
        _logo_wp = float(request.form.get('logo_wp') or 0)
        if not srt_text:
            return jsonify(ok=False, msg='โŒ SRT text แ€™แ€›แ€พแ€ญแ€•แ€ซ')
        # Read uploaded video bytes (fallback if vpath_key not available)
        video_url = (request.form.get('video_url') or '').strip()
        video_bytes = None; video_fname = None
        vf = request.files.get('video_file')
        if vf and vf.filename:
            video_bytes = vf.read()
            video_fname = vf.filename
        client_tid = (request.form.get('tid') or '').strip()
        tid = client_tid or uuid.uuid4().hex[:8]
        job_progress[tid] = {'pct': 2, 'msg': 'โณ Render แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ', 'done': False}
        def _prog(pct, msg):
            # pct=None keeps the previous percentage and only updates the message.
            cur = job_progress.get(tid, {})
            job_progress[tid] = {
                'pct': pct if pct is not None else cur.get('pct', 2),
                'msg': msg, 'done': False
            }
        def _bg():
            # Background worker: video โ†’ pre-process โ†’ overlays โ†’ SRT burn.
            tmp_dir = str(BASE_DIR / f'temp_burn_{tid}')
            os.makedirs(tmp_dir, exist_ok=True)
            out_file = str(OUTPUT_DIR / f'final_{tid}.mp4')
            vpath = None
            try:
                # โ”€โ”€ Get video โ”€โ”€
                # Try cached vpath from generate_srt step
                if vpath_key:
                    vpath_file = str(BASE_DIR / f'srt_vpath_{vpath_key}.txt')
                    if os.path.exists(vpath_file):
                        with open(vpath_file) as f:
                            cached_vpath = f.read().strip()
                        if os.path.exists(cached_vpath):
                            vpath = cached_vpath
                            _prog(10, '๐Ÿ“ Video cached โ€” แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
                            # The pointer file is one-shot โ€” consume it.
                            try: os.remove(vpath_file)
                            except: pass
                if not vpath:
                    if video_bytes:
                        vpath = f'{tmp_dir}/input.mp4'
                        with open(vpath, 'wb') as wf: wf.write(video_bytes)
                        _prog(10, '๐Ÿ“ Video file แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
                    elif video_url:
                        def _dl():
                            out_tmpl = f'{tmp_dir}/input.%(ext)s'
                            ytdlp_download(out_tmpl, video_url)
                            found = glob.glob(f'{tmp_dir}/input.*')
                            return found[0] if found else None
                        vpath = run_stage('download', _dl, tid, _prog,
                                          'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บโ€ฆ')
                if not vpath or not os.path.exists(vpath):
                    job_progress[tid] = {'pct': 0, 'msg': 'โŒ Video แ€™แ€แ€ฝแ€ฑแ€ทแ€•แ€ซ', 'error': True}
                    return
                # โ”€โ”€ Validate SRT โ”€โ”€
                if '-->' not in srt_text:
                    job_progress[tid] = {'pct': 0, 'msg': 'โŒ SRT parse แ€™แ€›แ€•แ€ซ', 'error': True}
                    return
                vd = dur(vpath)
                if vd <= 0:
                    job_progress[tid] = {'pct': 0, 'msg': 'โŒ Video duration read failed', 'error': True}
                    return
                _prog(20, '๐ŸŽฌ Video แ€•แ€ผแ€„แ€บแ€†แ€„แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                # โ”€โ”€ Probe original video dims for blur โ”€โ”€
                try:
                    _probe2 = subprocess.run(
                        f'ffprobe -v error -select_streams v:0 '
                        f'-show_entries stream=width,height '
                        f'-of csv=s=x:p=0 "{vpath}"',
                        shell=True, capture_output=True, text=True, timeout=15)
                    orig_w, orig_h = map(int, _probe2.stdout.strip().split('x'))
                except Exception:
                    orig_w, orig_h = 1920, 1080  # probe failed โ€” assume 1080p
                # โ”€โ”€ Compute blur coords (fractions โ†’ pixels) โ”€โ”€
                blur_x = int(blur_xp * orig_w) if blur_enabled and blur_wp > 0 else 0
                blur_y = int(blur_yp * orig_h) if blur_enabled and blur_hp > 0 else 0
                blur_w = int(blur_wp * orig_w) if blur_enabled and blur_wp > 0 else 0
                blur_h = int(blur_hp * orig_h) if blur_enabled and blur_hp > 0 else 0
                # โ”€โ”€ Apply crop/flip/color/zoom/blur, keep original audio โ”€โ”€
                pre_out = f'{tmp_dir}/pre.mp4'
                base_vf = ['setpts=PTS-STARTPTS']
                if flip: base_vf.append('hflip')
                if col: base_vf.append('eq=brightness=0.06:contrast=1.2:saturation=1.4')
                if zoom_enabled:
                    # Scale up then crop back to size โ†’ centred zoom.
                    zf = zoom_factor
                    base_vf.append(
                        f'scale=iw*{zf:.3f}:ih*{zf:.3f},'
                        f'crop=iw/{zf:.3f}:ih/{zf:.3f}'
                        f':(iw-iw/{zf:.3f})/2:(ih-ih/{zf:.3f})/2'
                    )
                base_vf.append('format=yuv420p')
                # Blur box (applied pre-crop, against original dims scaled by zoom)
                if blur_enabled and blur_w > 0 and blur_h > 0:
                    eff_w = orig_w / zoom_factor if zoom_enabled else orig_w
                    eff_h = orig_h / zoom_factor if zoom_enabled else orig_h
                    # Clamp the box inside the effective frame.
                    bx = max(0, min(blur_x, eff_w - 10))
                    by = max(0, min(blur_y, eff_h - 10))
                    bw = max(10, min(blur_w, eff_w - bx))
                    bh = max(10, min(blur_h, eff_h - by))
                    _br = max(1, min(10, bw // 4, bh // 4))  # blur radius bounded by box size
                    base_vf.append(
                        f'split[_bA][_bB];'
                        f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];'
                        f'[_bA][_bBl]overlay={bx}:{by}'
                    )
                if crop == '9:16':
                    base_vf.append('scale=720:1280:force_original_aspect_ratio=increase,crop=720:1280')
                elif crop == '16:9':
                    base_vf.append('scale=1280:720:force_original_aspect_ratio=increase,crop=1280:720')
                elif crop == '1:1':
                    base_vf.append('scale=720:720:force_original_aspect_ratio=increase,crop=720:720')
                # else: original โ€” no crop
                vf_chain = ','.join(base_vf)
                def _render():
                    _run_ffmpeg(
                        f'ffmpeg -y -hide_banner -loglevel error '
                        f'-i "{vpath}" '
                        f'-vf "{vf_chain}" '
                        f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
                        f'-c:a copy -t {vd:.3f} "{pre_out}"',
                        timeout=600
                    )
                run_stage('ffmpeg', _render, tid, _prog,
                          'โณ Render แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐ŸŽฌ Video render แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                # โ”€โ”€ Compute logo/watermark pixel positions โ”€โ”€
                logo_path = None
                if logo_bytes and logo_fname:
                    ext = Path(logo_fname).suffix or '.png'
                    logo_path = f'{tmp_dir}/logo{ext}'
                    with open(logo_path, 'wb') as wf: wf.write(logo_bytes)
                logo_w_px = int(_logo_wp * orig_w) if _logo_wp > 0 else int(orig_w * 0.12)
                logo_w_px = max(20, min(logo_w_px, orig_w))
                # Defaults: top-right corner for logo, bottom-right for watermark.
                logo_x = int(logo_xp * orig_w) if logo_xp is not None else (orig_w - logo_w_px - 20)
                logo_y = int(logo_yp * orig_h) if logo_yp is not None else 20
                wmk_x = int(wmk_xp * orig_w) if wmk_xp is not None else None
                wmk_y = int(wmk_yp * orig_h) if wmk_yp is not None else None
                # โ”€โ”€ Logo overlay (separate ffmpeg pass, replaces pre_out) โ”€โ”€
                if logo_path and os.path.exists(logo_path):
                    logo_out = f'{tmp_dir}/logo_out.mp4'
                    lx = max(0, logo_x)
                    ly = max(0, logo_y)
                    lw = max(20, logo_w_px)
                    _run_ffmpeg(
                        f'ffmpeg -y -hide_banner -loglevel error '
                        f'-i "{pre_out}" -i "{logo_path}" '
                        f'-filter_complex "[1:v]scale={lw}:-2[_lg];[0:v][_lg]overlay={lx}:{ly}[v_out]" '
                        f'-map "[v_out]" -map "0:a" '
                        f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
                        f'-c:a copy "{logo_out}"',
                        timeout=300
                    )
                    os.replace(logo_out, pre_out)
                # โ”€โ”€ Watermark drawtext (separate ffmpeg pass) โ”€โ”€
                if wmk:
                    wmk_out = f'{tmp_dir}/wmk_out.mp4'
                    fs = max(16, int(wmk_fontsize))
                    # Strip characters that would break the drawtext filter string.
                    txt = wmk.replace("'", "").replace(":", "").replace("\\", "")
                    wx = wmk_x if wmk_x is not None else (orig_w - 220)
                    wy = wmk_y if wmk_y is not None else (orig_h - 80)
                    _run_ffmpeg(
                        f'ffmpeg -y -hide_banner -loglevel error '
                        f'-i "{pre_out}" '
                        f'-vf "drawtext=text=\'{txt}\':x={wx}:y={wy}:'
                        f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2" '
                        f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
                        f'-c:a copy "{wmk_out}"',
                        timeout=300
                    )
                    os.replace(wmk_out, pre_out)
                # โ”€โ”€ Burn subtitles directly from SRT โ”€โ”€
                _prog(80, '๐Ÿ”ค Subtitle burn แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
                _CROP_RES_MAP = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720)}
                _prx, _pry = _CROP_RES_MAP.get(crop, (0, 0))
                if _prx == 0:
                    # 'original' crop โ†’ probe the rendered file for its real size.
                    try:
                        import json as _json
                        _pr = subprocess.run(
                            f'ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of json "{pre_out}"',
                            shell=True, capture_output=True, text=True, timeout=30)
                        _vinfo = _json.loads(_pr.stdout)
                        _s = _vinfo['streams'][0]
                        _prx, _pry = int(_s['width']), int(_s['height'])
                    except Exception:
                        _prx, _pry = 720, 1280
                _sub_fs2 = max(20, round(sub_size * _pry))  # font size scales with frame height
                _burn_srt_direct(pre_out, srt_text, out_file,
                                 position=sub_pos, fontsize=_sub_fs2,
                                 color=sub_color, style=sub_style,
                                 tmp_dir=tmp_dir,
                                 play_res_x=_prx, play_res_y=_pry)
                output_url = f'/outputs/final_{tid}.mp4'
                job_progress[tid] = {
                    'pct': 100, 'done': True,
                    'msg': 'โœ… แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ!',
                    'output_url': output_url,
                }
                # โ”€โ”€ Save video history (best-effort) โ”€โ”€
                try:
                    save_video_history_entry(u, {
                        'tid': tid,
                        'output_url': output_url,
                        'title': '(Translate SRT)',
                        'source_url': video_url or '',
                        'ts': time.time(),
                        'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'),
                    })
                except Exception as _he:
                    print(f'โš ๏ธ history save failed: {_he}')
            except Exception as e:
                import traceback; traceback.print_exc()
                job_progress[tid] = {'pct': 0, 'msg': f'โŒ {e}', 'error': True}
            finally:
                shutil.rmtree(tmp_dir, ignore_errors=True)
        threading.Thread(target=_bg, daemon=True).start()
        return jsonify(ok=True, tid=tid)
    except Exception as e:
        import traceback; traceback.print_exc()
        return jsonify(ok=False, msg=f'โŒ {e}')
# โ”€โ”€ /api/process_srt โ€” ONE-CLICK: Download โ†’ Gemini SRT โ†’ Render โ”€โ”€
@app.route('/api/process_srt', methods=['POST'])
def api_process_srt():
"""
ONE-CLICK SRT mode: Download video โ†’ Gemini Myanmar SRT โ†’ Render with subtitles.
Supports: zoom, audio boost, blur box, logo, watermark, crop/flip/color.
Stages: 'download' โ†’ 'ai' โ†’ 'ffmpeg'
"""
try:
u = (request.form.get('username') or '').strip()
if not u:
return jsonify(ok=False, msg='โŒ Not logged in')
is_adm = (u == ADMIN_U)
if not is_adm and load_db()['users'].get(u, {}).get('banned'):
return jsonify(ok=False, msg='โŒ Account banned')
if not is_adm and get_coins(u) < 1:
return jsonify(ok=False, msg='โŒ Not enough coins (need 1)')
video_url = (request.form.get('video_url') or '').strip()
cache_key = (request.form.get('cache_key') or '').strip()
# Subtitle settings
sub_pos = int(request.form.get('sub_pos', 85))
sub_size = float(request.form.get('sub_size', 0.0547)) # fraction of play_res_y
sub_color = request.form.get('sub_color', 'white')
sub_style = request.form.get('sub_style', 'outline')
# Video settings
crop = request.form.get('crop', 'original')
flip = request.form.get('flip', '0') == '1'
col = request.form.get('color', '0') == '1'
# Zoom (gentle, copyright-safe)
zoom_enabled = request.form.get('zoom_enabled', '0') == '1'
zoom_factor = float(request.form.get('zoom_factor', 1.03))
zoom_factor = max(1.01, min(zoom_factor, 1.30)) # clamp to match UI slider
# Audio boost
audio_boost = request.form.get('audio_boost', '0') == '1'
# Blur box
blur_enabled = request.form.get('blur_enabled', '0') == '1'
blur_xp = float(request.form.get('blur_xp') or 0)
blur_yp = float(request.form.get('blur_yp') or 0)
blur_wp = float(request.form.get('blur_wp') or 0)
blur_hp = float(request.form.get('blur_hp') or 0)
# Watermark
wmk = (request.form.get('watermark') or '').strip()
wmk_xp_raw = request.form.get('wmk_xp')
wmk_yp_raw = request.form.get('wmk_yp')
wmk_fontsize = int(request.form.get('wmk_fontsize', 35))
wmk_xp = float(wmk_xp_raw) if wmk_xp_raw else None
wmk_yp = float(wmk_yp_raw) if wmk_yp_raw else None
# Logo
logo_bytes = None; logo_fname = None
lf = request.files.get('logo_file')
if lf and lf.filename: logo_bytes = lf.read(); logo_fname = lf.filename
logo_xp_raw = request.form.get('logo_xp')
logo_yp_raw = request.form.get('logo_yp')
logo_xp = float(logo_xp_raw) if logo_xp_raw else None
logo_yp = float(logo_yp_raw) if logo_yp_raw else None
_logo_wp = float(request.form.get('logo_wp') or 0)
# Video file upload
video_bytes = None
vf = request.files.get('video_file')
if vf and vf.filename: video_bytes = vf.read()
client_tid = (request.form.get('tid') or '').strip()
tid = client_tid or uuid.uuid4().hex[:8]
job_progress[tid] = {'pct': 2, 'msg': 'โณ แ€แ€”แ€บแ€ธแ€…แ€ฎแ€…แ€ฑแ€ฌแ€„แ€ทแ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ', 'done': False}
def _prog(pct, msg):
cur = job_progress.get(tid, {})
job_progress[tid] = {
'pct': pct if pct is not None else cur.get('pct', 2),
'msg': msg, 'done': False
}
def _bg():
tmp_dir = str(BASE_DIR / f'temp_psrt_{tid}')
os.makedirs(tmp_dir, exist_ok=True)
out_file = str(OUTPUT_DIR / f'final_{tid}.mp4')
vpath = None; logo_path = None
try:
# โ”€โ”€ Stage 1: Get video โ”€โ”€
if video_bytes:
vpath = f'{tmp_dir}/input.mp4'
with open(vpath, 'wb') as wf: wf.write(video_bytes)
_prog(8, '๐Ÿ“ Video file แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
elif cache_key:
with _preview_cache_lock:
cached = _preview_cache.get(cache_key)
if cached and os.path.exists(cached['file']):
vpath = cached['file']
_prog(8, '๐Ÿ“ Cached video แ€กแ€žแ€„แ€ทแ€บแ€–แ€ผแ€…แ€บแ€•แ€ผแ€ฎ')
elif video_url:
def _dl():
out_tmpl = f'{tmp_dir}/input.%(ext)s'
ytdlp_download(out_tmpl, video_url)
found = glob.glob(f'{tmp_dir}/input.*')
return found[0] if found else None
vpath = run_stage('download', _dl, tid, _prog,
'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บโ€ฆ')
elif video_url:
def _dl2():
out_tmpl = f'{tmp_dir}/input.%(ext)s'
ytdlp_download(out_tmpl, video_url)
found = glob.glob(f'{tmp_dir}/input.*')
return found[0] if found else None
vpath = run_stage('download', _dl2, tid, _prog,
'โณ Download แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐Ÿ“ฅ Video แ€’แ€ฑแ€ซแ€„แ€บแ€ธแ€œแ€ฏแ€•แ€บโ€ฆ')
if not vpath or not os.path.exists(vpath):
job_progress[tid] = {'pct': 0, 'msg': 'โŒ Video แ€™แ€แ€ฝแ€ฑแ€ทแ€•แ€ซ', 'error': True}
return
# โ”€โ”€ Probe original video dims โ”€โ”€
try:
_probe = subprocess.run(
f'ffprobe -v error -select_streams v:0 '
f'-show_entries stream=width,height '
f'-of csv=s=x:p=0 "{vpath}"',
shell=True, capture_output=True, text=True, timeout=15)
orig_w, orig_h = map(int, _probe.stdout.strip().split('x'))
except Exception:
orig_w, orig_h = 1920, 1080
# โ”€โ”€ Compute blur coords from percentages โ”€โ”€
blur_x = int(blur_xp * orig_w) if blur_enabled and blur_wp > 0 else 0
blur_y = int(blur_yp * orig_h) if blur_enabled and blur_hp > 0 else 0
blur_w = int(blur_wp * orig_w) if blur_enabled and blur_wp > 0 else 0
blur_h = int(blur_hp * orig_h) if blur_enabled and blur_hp > 0 else 0
# โ”€โ”€ Compute logo size โ”€โ”€
logo_w_px = int(_logo_wp * orig_w) if _logo_wp > 0 else int(orig_w * 0.12)
logo_w_px = max(20, min(logo_w_px, orig_w))
# โ”€โ”€ Compute watermark / logo pixel positions โ”€โ”€
wmk_x = int(wmk_xp * orig_w) if wmk_xp is not None else None
wmk_y = int(wmk_yp * orig_h) if wmk_yp is not None else None
logo_x = int(logo_xp * orig_w) if logo_xp is not None else (orig_w - logo_w_px - 20)
logo_y = int(logo_yp * orig_h) if logo_yp is not None else 20
# โ”€โ”€ Save logo file โ”€โ”€
if logo_bytes and logo_fname:
ext = Path(logo_fname).suffix or '.png'
logo_path = f'{tmp_dir}/logo{ext}'
with open(logo_path, 'wb') as wf: wf.write(logo_bytes)
# โ”€โ”€ Stage 2: Gemini SRT โ”€โ”€
def _gen_srt():
return _gemini_video_to_myanmar_srt(vpath, prog_fn=_prog)
srt_text = run_stage('ai', _gen_srt, tid, _prog,
'โณ AI แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐Ÿค– Gemini SRT แ€‘แ€ฏแ€แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
blocks = _parse_srt(srt_text)
if not blocks:
job_progress[tid] = {'pct': 0, 'msg': 'โŒ SRT parse แ€™แ€›แ€•แ€ซ', 'error': True}
return
vd = dur(vpath)
if vd <= 0:
job_progress[tid] = {'pct': 0, 'msg': 'โŒ Video duration read failed', 'error': True}
return
_prog(65, '๐ŸŽฌ Video render แ€•แ€ผแ€„แ€บแ€†แ€„แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
# โ”€โ”€ Stage 3: Render โ€” build filter chain โ”€โ”€
pre_out = f'{tmp_dir}/pre.mp4'
# Video filter chain
base_vf = ['setpts=PTS-STARTPTS']
if flip: base_vf.append('hflip')
if col: base_vf.append('eq=brightness=0.06:contrast=1.2:saturation=1.4')
# Gentle zoom (subtle slow zoom, copyright-safe)
if zoom_enabled:
zf = zoom_factor
# zoompan: slow drift across video duration, max scale=zf
base_vf.append(
f'scale=iw*{zf:.3f}:ih*{zf:.3f},'
f'crop=iw/{zf:.3f}:ih/{zf:.3f}'
f':(iw-iw/{zf:.3f})/2:(ih-ih/{zf:.3f})/2'
)
base_vf.append('format=yuv420p')
# Blur box (applied after zoom crop, so coords must be scaled down by zf)
if blur_enabled and blur_w > 0 and blur_h > 0:
# After zoom, effective dimensions = orig/zf
eff_w = orig_w / zf if zoom_enabled else orig_w
eff_h = orig_h / zf if zoom_enabled else orig_h
bx = max(0, min(blur_x, eff_w - 10))
by = max(0, min(blur_y, eff_h - 10))
bw = max(10, min(blur_w, eff_w - bx))
bh = max(10, min(blur_h, eff_h - by))
_br = max(1, min(10, bw // 4, bh // 4))
base_vf.append(
f'split[_bA][_bB];'
f'[_bB]crop={bw}:{bh}:{bx}:{by},boxblur={_br}:{_br}[_bBl];'
f'[_bA][_bBl]overlay={bx}:{by}'
)
# Crop
if crop == '9:16':
base_vf.append('scale=720:1280:force_original_aspect_ratio=increase,crop=720:1280')
elif crop == '16:9':
base_vf.append('scale=1280:720:force_original_aspect_ratio=increase,crop=1280:720')
elif crop == '1:1':
base_vf.append('scale=720:720:force_original_aspect_ratio=increase,crop=720:720')
vf_chain = ','.join(base_vf)
# Audio filter
if audio_boost:
af_chain = (
'highpass=f=200,'
'lowpass=f=8000,'
'equalizer=f=3000:width_type=o:width=2:g=5,'
'equalizer=f=200:width_type=o:width=1:g=-4,'
'dynaudnorm=f=150:g=15,'
'volume=2.2,'
'loudnorm=I=-14:TP=-1.5:LRA=11'
)
else:
af_chain = 'acopy'
def _render():
_run_ffmpeg(
f'ffmpeg -y -hide_banner -loglevel error '
f'-i "{vpath}" '
f'-vf "{vf_chain}" '
f'-af "{af_chain}" '
f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
f'-c:a aac -ar 44100 -b:a 128k '
f'-t {vd:.3f} "{pre_out}"',
timeout=600
)
run_stage('ffmpeg', _render, tid, _prog,
'โณ Render แ€แ€”แ€บแ€ธแ€…แ€ฎ', '๐ŸŽฌ Video render แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
# โ”€โ”€ Logo overlay (post-render pass, simpler) โ”€โ”€
if logo_path and os.path.exists(logo_path):
logo_out = f'{tmp_dir}/logo_out.mp4'
lx = max(0, logo_x)
ly = max(0, logo_y)
lw = max(20, logo_w_px)
_run_ffmpeg(
f'ffmpeg -y -hide_banner -loglevel error '
f'-i "{pre_out}" -i "{logo_path}" '
f'-filter_complex "[1:v]scale={lw}:-2[_lg];[0:v][_lg]overlay={lx}:{ly}[v_out]" '
f'-map "[v_out]" -map "0:a" '
f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
f'-c:a copy "{logo_out}"',
timeout=300
)
os.replace(logo_out, pre_out)
# โ”€โ”€ Watermark drawtext (post-render pass) โ”€โ”€
if wmk:
wmk_out = f'{tmp_dir}/wmk_out.mp4'
fs = max(16, int(wmk_fontsize))
txt = wmk.replace("'", "").replace(":", "").replace("\\", "")
wx = wmk_x if wmk_x is not None else (orig_w - 220)
wy = wmk_y if wmk_y is not None else (orig_h - 80)
_run_ffmpeg(
f'ffmpeg -y -hide_banner -loglevel error '
f'-i "{pre_out}" '
f'-vf "drawtext=text=\'{txt}\':x={wx}:y={wy}:'
f'fontsize={fs}:fontcolor=white:shadowcolor=black:shadowx=2:shadowy=2" '
f'-c:v libx264 -crf 24 -preset ultrafast -pix_fmt yuv420p '
f'-c:a copy "{wmk_out}"',
timeout=300
)
os.replace(wmk_out, pre_out)
# โ”€โ”€ Burn subtitles directly from SRT (Gemini timings as-is) โ”€โ”€
_prog(88, '๐Ÿ”ค Subtitle burn แ€œแ€ฏแ€•แ€บแ€”แ€ฑแ€žแ€Šแ€บโ€ฆ')
_CROP_RES_MAP2 = {'9:16':(720,1280),'16:9':(1280,720),'1:1':(720,720)}
_prx2, _pry2 = _CROP_RES_MAP2.get(crop, (0, 0))
if _prx2 == 0:
try:
import json as _json2
_pr2 = subprocess.run(
f'ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of json "{pre_out}"',
shell=True, capture_output=True, text=True, timeout=30)
_vinfo2 = _json2.loads(_pr2.stdout)
_s2 = _vinfo2['streams'][0]
_prx2, _pry2 = int(_s2['width']), int(_s2['height'])
except Exception:
_prx2, _pry2 = 720, 1280
_sub_fs3 = max(20, round(sub_size * _pry2))
_burn_srt_direct(pre_out, srt_text, out_file,
position=sub_pos, fontsize=_sub_fs3,
color=sub_color, style=sub_style,
tmp_dir=tmp_dir,
play_res_x=_prx2, play_res_y=_pry2)
# โ”€โ”€ Deduct coin โ”€โ”€
rem = -1
if not is_adm:
_, rem = deduct(u, 1)
out_url2 = f'/outputs/final_{tid}.mp4'
job_progress[tid] = {
'pct': 100, 'done': True,
'msg': f'โœ… แ€•แ€ผแ€ฎแ€ธแ€•แ€ซแ€•แ€ผแ€ฎ! ({len(blocks)} lines)',
'output_url': out_url2,
'coins': rem,
'total': len(blocks),
}
# โ”€โ”€ Save video history โ”€โ”€
try:
save_video_history_entry(u, {
'tid': tid,
'output_url': out_url2,
'title': '(SRT Video)',
'source_url': video_url or '',
'ts': time.time(),
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M'),
})
except Exception as _he:
print(f'โš ๏ธ history save failed: {_he}')
except Exception as e:
import traceback; traceback.print_exc()
job_progress[tid] = {'pct': 0, 'msg': f'โŒ {e}', 'error': True}
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
threading.Thread(target=_bg, daemon=True).start()
return jsonify(ok=True, tid=tid)
except Exception as e:
import traceback; traceback.print_exc()
return jsonify(ok=False, msg=f'โŒ {e}')
# Dev entry point — in production front this with a proper WSGI server.
if __name__ == '__main__':
    # threaded=True lets Flask handle requests concurrently; the per-stage
    # locks defined at the top of the file still serialize the heavy
    # download/AI/ffmpeg work one job at a time.
    app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)