YTShortMakerArchx committed (verified)
Commit 85410b0 · 1 parent: adabf73

Create app.py

Files changed (1):
  1. app.py +1038 -0
app.py ADDED (1,038 lines):
import os
import uuid
import time
import subprocess
import base64
import hashlib
import json
import random
import asyncio
import aiofiles
import logging
import secrets

from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, List

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field

# =============================================================================
# CONFIG
# =============================================================================
HF_DATASET = os.getenv("HF_DATASET", "YTShortMakerArchx/BG_VIDS_ARCHX_YT")
HF_TOKEN = os.getenv("HF_TOKEN")
APP_KEY = os.getenv("APP_KEY", "archx_3f9d15f52n48d41h5fj8a7e2b_private")
UI_ORIGIN = os.getenv("UI_ORIGIN", "https://shortgen-archx.pages.dev")

BASE_DIR = Path("/tmp")
AUDIO_DIR = BASE_DIR / "audio"
OUTPUT_DIR = BASE_DIR / "out"
JOBS_DIR = BASE_DIR / "jobs"
CLIENT_DIR = BASE_DIR / "clients"

for d in [AUDIO_DIR, OUTPUT_DIR, JOBS_DIR, CLIENT_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# Rate limiting
RATE_LIMITS: Dict[str, list] = {}
GENERATE_LIMIT = 3     # per hour per client
STATUS_LIMIT = 1000    # per hour per job

# Job timeout
JOBS: Dict[str, Dict] = {}
JOB_TIMEOUT = 900      # 15 min hard cap

# Video listing cache
VIDEO_CACHE: Dict[str, tuple] = {}
CACHE_TIMEOUT = 300

# =============================================================================
# QUEUE SYSTEM
# =============================================================================
MAX_CONCURRENT = 1
AVG_JOB_DURATION_S = 360  # 6 minutes per job

_JOB_QUEUE: deque = deque()       # ordered list of waiting job_ids
_RUNNING_JOBS: set = set()        # job_ids currently encoding
_QUEUE_LOCK: asyncio.Lock = None  # created at startup

# =============================================================================
# CLIENT ID SYSTEM
# =============================================================================
CLIENT_TOKENS: Dict[str, Dict] = {}

def _client_file(token: str) -> Path:
    return CLIENT_DIR / f"{token[:8]}.json"

def _load_client(token: str) -> Optional[Dict]:
    if token in CLIENT_TOKENS:
        return CLIENT_TOKENS[token]
    path = _client_file(token)
    if path.exists():
        try:
            rec = json.loads(path.read_text())
            CLIENT_TOKENS[token] = rec
            return rec
        except Exception:
            pass
    return None

def _save_client(rec: Dict):
    token = rec["token"]
    CLIENT_TOKENS[token] = rec
    try:
        rec_copy = rec.copy()
        rec_copy["created_at"] = (
            rec_copy["created_at"].isoformat()
            if isinstance(rec_copy["created_at"], datetime)
            else rec_copy["created_at"]
        )
        rec_copy["last_seen"] = (
            rec_copy["last_seen"].isoformat()
            if isinstance(rec_copy["last_seen"], datetime)
            else rec_copy["last_seen"]
        )
        _client_file(token).write_text(json.dumps(rec_copy))
    except Exception as e:
        logger.warning(f"Failed to persist client {token[:8]}: {e}")

def _create_client(fingerprint: str) -> Dict:
    rec = {
        "token": secrets.token_hex(24),
        "fingerprint": fingerprint,
        "created_at": datetime.now(),
        "last_seen": datetime.now(),
        "job_count": 0,
        "jobs": [],
    }
    _save_client(rec)
    logger.info(f"🆕 New client {rec['token'][:8]}…")
    return rec

def _fingerprint(req: Request) -> str:
    data = (
        f"{req.client.host}:"
        f"{req.headers.get('user-agent', '')}:"
        f"{req.headers.get('accept', '')}"
    )
    return hashlib.sha256(data.encode()).hexdigest()

def resolve_client(req: Request) -> Dict:
    token = req.headers.get("X-Client-ID", "").strip()

    if token:
        rec = _load_client(token)
        if rec:
            rec["last_seen"] = datetime.now()
            _save_client(rec)
            return rec
        fp = _fingerprint(req)
        rec = {
            "token": token,
            "fingerprint": fp,
            "created_at": datetime.now(),
            "last_seen": datetime.now(),
            "job_count": 0,
            "jobs": [],
        }
        _save_client(rec)
        logger.info(f"♻️ Restored client {token[:8]}… from header")
        return rec

    fp = _fingerprint(req)
    for rec in CLIENT_TOKENS.values():
        if rec.get("fingerprint") == fp:
            rec["last_seen"] = datetime.now()
            _save_client(rec)
            return rec

    return _create_client(fp)

# =============================================================================
# LOGGING
# =============================================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)

# =============================================================================
# APP
# =============================================================================
app = FastAPI(
    title="ArchNemix Shorts Generator API",
    version="5.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        UI_ORIGIN,
        "http://localhost:3000",
        "http://127.0.0.1:3000",
        "http://127.0.0.1:5500",
        "http://localhost:5500",
        "http://localhost:8000",
        "https://huggingface.co",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "DELETE", "OPTIONS", "HEAD"],
    allow_headers=["*"],
    expose_headers=["X-Client-ID"],
    max_age=600,
)

# =============================================================================
# MODELS
# =============================================================================
class GenerateRequest(BaseModel):
    audio_base64: str = Field(..., min_length=10)
    subtitles_ass: str = Field(..., min_length=10)
    background: str
    duration: Optional[float] = Field(None, ge=1, le=180)
    request_id: Optional[str] = None

# =============================================================================
# SECURITY
# =============================================================================
def validate_app_key(req: Request) -> bool:
    if not APP_KEY or APP_KEY == "archnemix-secret-key-change-in-production":
        return True
    key = (
        req.headers.get("X-APP-KEY")
        or req.headers.get("Authorization", "").replace("Bearer ", "")
    )
    return key == APP_KEY

def validate_origin(req: Request) -> bool:
    origin = req.headers.get("origin") or req.headers.get("referer", "")
    if not origin:
        return True
    allowed = [
        "https://shortgen-archx.pages.dev",
        "http://localhost:3000", "http://127.0.0.1:3000",
        "http://localhost:5500", "http://127.0.0.1:5500",
        "https://huggingface.co",
    ]
    return any(origin.startswith(a) for a in allowed)

def rate_limit(key: str, limit: int, window: int = 3600) -> bool:
    now = time.time()
    RATE_LIMITS.setdefault(key, [])
    RATE_LIMITS[key] = [t for t in RATE_LIMITS[key] if now - t < window]
    if len(RATE_LIMITS[key]) >= limit:
        return False
    RATE_LIMITS[key].append(now)
    return True

# =============================================================================
# DATASET HELPERS
# =============================================================================
def get_hf_api() -> HfApi:
    if not HF_TOKEN:
        raise ValueError("HF_TOKEN not configured")
    return HfApi(token=HF_TOKEN)

async def list_videos_from_dataset(category: str = "minecraft") -> List[str]:
    cache_key = f"videos_{category}"
    if cache_key in VIDEO_CACHE:
        ts, data = VIDEO_CACHE[cache_key]
        if time.time() - ts < CACHE_TIMEOUT:
            return data
    try:
        api = get_hf_api()
        files = list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset"))
        result = [
            f.split("/")[-1].replace(".mp4", "")
            for f in files
            if f.startswith(f"{category}/") and f.endswith(".mp4")
        ]
        VIDEO_CACHE[cache_key] = (time.time(), result)
        return result
    except Exception as e:
        logger.error(f"list_videos failed: {e}")
        return [f"mc{i}" for i in range(1, 7)]

async def download_video_from_dataset(video_name: str, category: str = "minecraft") -> str:
    video_name = video_name.replace(".mp4", "")
    for path in [f"{category}/{video_name}.mp4", f"{video_name}.mp4"]:
        try:
            return hf_hub_download(
                repo_id=HF_DATASET, filename=path,
                repo_type="dataset", token=HF_TOKEN,
                cache_dir="/tmp/hf_cache",
            )
        except Exception:
            continue
    available = await list_videos_from_dataset(category)
    raise HTTPException(404, f"Video '{video_name}' not found. Available: {available}")

# =============================================================================
# FILE HELPERS
# =============================================================================
async def save_base64_audio(data: str, path: Path) -> int:
    if data.startswith("data:audio/"):
        data = data.split(",", 1)[1]
    audio_bytes = base64.b64decode(data)
    if len(audio_bytes) > 15 * 1024 * 1024:
        raise ValueError("Audio too large (max 15 MB)")
    async with aiofiles.open(path, "wb") as f:
        await f.write(audio_bytes)
    return len(audio_bytes)

async def save_subtitles(ass: str, path: Path):
    if "[Script Info]" not in ass or "[Events]" not in ass:
        raise ValueError("Invalid ASS subtitle format")
    if len(ass.encode()) > 500 * 1024:
        raise ValueError("Subtitles too large (max 500 KB)")
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(ass)

def get_media_duration(path: Path) -> float:
    try:
        r = subprocess.run(
            ["ffprobe", "-v", "error",
             "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1",
             str(path)],
            capture_output=True, text=True, check=True, timeout=10,
        )
        return float(r.stdout.strip())
    except Exception:
        try:
            return max(1.0, min(180.0, path.stat().st_size / 16000))
        except Exception:
            return 30.0

# =============================================================================
# JOB MANAGEMENT
# =============================================================================
def _make_job(job_id: str, client_token: str) -> Dict:
    return {
        "id": job_id,
        "client_token": client_token,
        "status": "queued",
        "queue_position": None,
        "progress": 0,
        "message": "Waiting in queue…",
        "created_at": datetime.now(),
        "updated_at": datetime.now(),
        "error": None,
        "output_path": None,
        "audio_path": None,
        "subs_path": None,
        "ffmpeg_frame": 0,
        "ffmpeg_fps": 0.0,
        "ffmpeg_out_time": 0.0,
        "ffmpeg_speed": 0.0,
        "total_duration": 0.0,
    }

def create_job(job_id: str, client_token: str) -> Dict:
    JOBS[job_id] = _make_job(job_id, client_token)
    _persist_job(job_id)
    return JOBS[job_id]

def update_job(job_id: str, **kwargs):
    if job_id not in JOBS:
        return
    job = JOBS[job_id]
    for k, v in kwargs.items():
        job[k] = v
    job["updated_at"] = datetime.now()
    _persist_job(job_id)

def _persist_job(job_id: str):
    if job_id not in JOBS:
        return
    try:
        data = {
            k: (v.isoformat() if isinstance(v, datetime) else v)
            for k, v in JOBS[job_id].items()
        }
        (JOBS_DIR / f"{job_id}.json").write_text(json.dumps(data))
    except Exception as e:
        logger.error(f"persist_job failed: {e}")

def cleanup_old_jobs():
    now = datetime.now()
    for job_id in list(JOBS):
        job = JOBS[job_id]
        age = (now - job["updated_at"]).total_seconds()
        if age > 3600:
            JOBS.pop(job_id, None)
            _RUNNING_JOBS.discard(job_id)
            try:
                _JOB_QUEUE.remove(job_id)
            except ValueError:
                pass
            for p in [
                JOBS_DIR / f"{job_id}.json",
                AUDIO_DIR / f"{job_id}.mp3",
                AUDIO_DIR / f"{job_id}.ass",
                AUDIO_DIR / f"{job_id}_trimmed.mp4",
                OUTPUT_DIR / f"{job_id}.mp4",
            ]:
                try:
                    p.unlink(missing_ok=True)
                except Exception:
                    pass

# =============================================================================
# QUEUE HELPERS
# =============================================================================
def queue_position(job_id: str) -> Optional[int]:
    try:
        idx = list(_JOB_QUEUE).index(job_id)
        return idx + 1
    except ValueError:
        return None

def queue_eta(position: int, job_duration: Optional[float] = None) -> Dict:
    running_remaining = AVG_JOB_DURATION_S
    for jid in _RUNNING_JOBS:
        j = JOBS.get(jid, {})
        total = j.get("total_duration", 0) or AVG_JOB_DURATION_S
        out = j.get("ffmpeg_out_time", 0) or 0
        remaining = max(0, total - out)
        running_remaining = min(running_remaining, remaining)

    queued_ahead = max(0, position - 1)
    total_wait_s = running_remaining + queued_ahead * AVG_JOB_DURATION_S
    own_duration = job_duration or AVG_JOB_DURATION_S
    total_s = total_wait_s + own_duration

    low_s = int(total_wait_s * 0.75)
    high_s = int(total_wait_s * 1.2)

    def _fmt(s: int) -> str:
        if s < 60:
            return f"{s}s"
        m, sec = divmod(s, 60)
        return f"{m}m {sec:02d}s" if sec else f"{m}m"

    return {
        "queue_position": position,
        "jobs_ahead": position,
        "wait_low_s": low_s,
        "wait_high_s": high_s,
        "wait_low_human": _fmt(low_s),
        "wait_high_human": _fmt(high_s),
        "total_with_own_s": int(total_s),
        "message": (
            f"Position {position} in queue — "
            f"roughly {_fmt(low_s)}–{_fmt(high_s)} wait, "
            f"likely less"
        ),
    }

async def _queue_worker():
    while True:
        await asyncio.sleep(2)

        if len(_RUNNING_JOBS) >= MAX_CONCURRENT:
            continue

        if not _JOB_QUEUE:
            continue

        job_id = _JOB_QUEUE[0]
        job = JOBS.get(job_id)
        if not job or job["status"] in ("failed", "completed"):
            _JOB_QUEUE.popleft()
            continue

        _JOB_QUEUE.popleft()
        _RUNNING_JOBS.add(job_id)
        _refresh_queue_positions()

        audio_path = Path(job["audio_path"])
        subs_path = Path(job["subs_path"])
        background = job.get("background", "mc1")
        duration = job.get("requested_duration")

        asyncio.create_task(
            _run_and_release(job_id, audio_path, subs_path, background, duration)
        )

async def _run_and_release(job_id, audio_path, subs_path, background, duration):
    try:
        await process_video_task(job_id, audio_path, subs_path, background, duration)
    finally:
        _RUNNING_JOBS.discard(job_id)
        _refresh_queue_positions()

def _refresh_queue_positions():
    for i, jid in enumerate(_JOB_QUEUE):
        if jid in JOBS:
            pos = i + 1
            JOBS[jid]["queue_position"] = pos
            JOBS[jid]["message"] = queue_eta(pos)["message"]
            _persist_job(jid)

# =============================================================================
# FFMPEG PROCESS MANAGEMENT
# =============================================================================
_FFMPEG_PROCS: Dict[str, asyncio.subprocess.Process] = {}

def _kill_ffmpeg_for_job(job_id: str):
    proc = _FFMPEG_PROCS.pop(job_id, None)
    if proc and proc.returncode is None:
        try:
            proc.kill()
            logger.info(f"🔪 Killed FFmpeg for job {job_id}")
        except Exception:
            pass

def _parse_ffmpeg_kv(line: str) -> Dict:
    parts = line.strip().split("=", 1)
    return {parts[0].strip(): parts[1].strip()} if len(parts) == 2 else {}

async def _stream_ffmpeg_progress(
    proc: asyncio.subprocess.Process,
    job_id: str,
    total_duration: float,
    progress_start: int = 50,
    progress_end: int = 95,
):
    frame = fps = speed = 0
    out_time_s = 0.0
    last_update_time = time.time()

    async for raw_line in proc.stderr:
        line = raw_line.decode(errors="replace").strip()
        if not line:
            continue

        kv = _parse_ffmpeg_kv(line)
        if not kv:
            continue
        key, val = next(iter(kv.items()))

        if key == "frame":
            frame = int(val) if val.isdigit() else frame
        elif key == "fps":
            try: fps = float(val)
            except ValueError: pass
        elif key == "out_time_ms":
            try: out_time_s = int(val) / 1_000_000
            except ValueError: pass
        elif key == "out_time":
            try:
                parts = val.split(':')
                if len(parts) == 3:
                    h, m, s = parts
                    out_time_s = int(h) * 3600 + int(m) * 60 + float(s)
            except ValueError: pass
        elif key == "speed":
            try:
                speed = float(val.replace("x", ""))
            except ValueError: pass
        elif key == "progress":
            now = time.time()
            if val == "end" or now - last_update_time >= 0.5:
                last_update_time = now
                ratio = min(1.0, out_time_s / total_duration) if total_duration > 0 else 0
                mapped = int(progress_start + ratio * (progress_end - progress_start))
                update_job(
                    job_id,
                    progress=mapped,
                    ffmpeg_frame=frame,
                    ffmpeg_fps=fps,
                    ffmpeg_out_time=round(out_time_s, 2),
                    ffmpeg_speed=speed,
                    total_duration=total_duration,
                    message=(
                        f"Encoding {out_time_s:.1f}s / {total_duration:.1f}s "
                        f"({speed:.1f}× speed)"
                        if total_duration > 0 and speed > 0 else "Encoding…"
                    ),
                )
            if val == "end":
                break
        if JOBS.get(job_id, {}).get("status") == "failed":
            proc.kill()
            break

# =============================================================================
# CORE VIDEO PROCESSING
# =============================================================================
async def process_video_task(
    job_id: str,
    audio_path: Path,
    subs_path: Path,
    background: str,
    duration: Optional[float],
):
    try:
        logger.info(f"🚀 Job {job_id}: starting single-pass encode")
        update_job(job_id, status="processing", progress=10,
                   message="Downloading background video…", queue_position=None)

        try:
            bg_path = await download_video_from_dataset(background, "minecraft")
        except HTTPException as e:
            update_job(job_id, status="failed", error=str(e.detail))
            return

        update_job(job_id, progress=20, message="Analysing media…")

        audio_dur = duration or get_media_duration(audio_path)
        video_dur = get_media_duration(Path(bg_path))

        if not (1 <= audio_dur <= 180):
            update_job(job_id, status="failed", error="Audio duration out of range (1–180 s)")
            return

        logger.info(f"Job {job_id}: audio={audio_dur:.2f}s bg={video_dur:.2f}s")

        if video_dur > audio_dur:
            max_start = video_dur - audio_dur
            start = random.uniform(0, max_start)
            bg_input_args = [
                "-ss", f"{start:.3f}",
                "-t", f"{audio_dur:.3f}",
                "-i", str(bg_path),
            ]
            logger.info(f"Job {job_id}: trim start={start:.2f}s (input-side seek)")
        else:
            loop_n = int(audio_dur / video_dur) + 2
            bg_input_args = [
                "-stream_loop", str(loop_n),
                "-i", str(bg_path),
            ]
            logger.info(f"Job {job_id}: loop ×{loop_n}")

        subs_str = str(subs_path).replace("\\", "/").replace(":", "\\:")
        vf = (
            f"[0:v]"
            f"ass='{subs_str}',"
            f"scale=1080:1920:force_original_aspect_ratio=increase,"
            f"crop=1080:1920,"
            f"setsar=1"
            f"[v]"
        )

        output_path = OUTPUT_DIR / f"{job_id}.mp4"

        final_cmd = [
            "ffmpeg", "-y",
            "-hide_banner",
            "-loglevel", "error",
            *bg_input_args,
            "-i", str(audio_path),
            "-filter_complex", vf,
            "-map", "[v]",
            "-map", "1:a",
            "-c:v", "libx264",
            "-preset", "medium",
            "-crf", "16",
            "-minrate", "8M",
            "-maxrate", "16M",
            "-bufsize", "16M",
            "-profile:v", "high",
            "-level:v", "4.2",
            "-pix_fmt", "yuv420p",
            "-c:a", "aac",
            "-b:a", "192k",
            "-shortest",
            "-movflags", "+faststart",
            "-threads", "0",
            "-progress", "pipe:2",
            "-t", f"{audio_dur:.3f}",
            str(output_path),
        ]

        update_job(job_id, progress=40, message="Encoding… (single-pass)", total_duration=audio_dur)
        logger.info(
            f"Job {job_id}: CRF=16 preset=medium "
            f"minrate=8M maxrate=16M bufsize=16M threads=auto"
        )

        timeout = max(480, int(audio_dur * 5) + 120)

        try:
            encode_proc = await asyncio.create_subprocess_exec(
                *final_cmd,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            _FFMPEG_PROCS[job_id] = encode_proc

            await asyncio.wait_for(
                _stream_ffmpeg_progress(encode_proc, job_id, audio_dur,
                                        progress_start=40, progress_end=95),
                timeout=timeout,
            )
            await asyncio.wait_for(encode_proc.wait(), timeout=30)
            _FFMPEG_PROCS.pop(job_id, None)

            if encode_proc.returncode != 0:
                raise RuntimeError(f"FFmpeg exited with code {encode_proc.returncode}")

        except asyncio.TimeoutError:
            _kill_ffmpeg_for_job(job_id)
            update_job(job_id, status="failed", error="Encoding timed out — try a shorter script")
            return
        except Exception as e:
            _kill_ffmpeg_for_job(job_id)
            if JOBS.get(job_id, {}).get("status") == "failed":
                return
            update_job(job_id, status="failed", error=f"Encoding failed: {str(e)[:200]}")
            return

        if not output_path.exists() or output_path.stat().st_size == 0:
            update_job(job_id, status="failed", error="Output file missing after encode")
            return

        size_mb = output_path.stat().st_size / 1024 / 1024
        logger.info(f"✅ Job {job_id}: {size_mb:.2f} MB")

        for p in [audio_path, subs_path]:
            try:
                p.unlink(missing_ok=True)
            except Exception:
                pass

        update_job(
            job_id,
            status="completed",
            progress=100,
            message=f"Done! {size_mb:.1f} MB • CRF 16 • medium • 8–16 Mbps",
            output_path=str(output_path),
        )

    except Exception:
        logger.exception(f"process_video_task unhandled for {job_id}")
        update_job(job_id, status="failed", error="Unexpected server error")

# =============================================================================
# API ENDPOINTS
# =============================================================================

@app.get("/")
async def root():
    return {
        "name": "ArchNemix Shorts Generator API",
        "version": "5.0.0",
        "queue": {
            "max_concurrent": MAX_CONCURRENT,
            "avg_job_duration_s": AVG_JOB_DURATION_S,
        },
        "endpoints": {
            "register": "GET /register — get or create your client token",
            "generate": "POST /generate — submit a job",
            "cancel": "DELETE /job/{id} — cancel job (called by controller)",
            "job_status": "GET /job/{id} — poll status + queue position",
            "queue": "GET /queue — overall queue snapshot",
            "download": "GET /download/{id}",
            "delete": "DELETE /video/{id}",
            "videos": "GET /videos/{category}",
            "health": "GET /health",
        },
    }

@app.get("/register")
async def register_client(req: Request):
    client = resolve_client(req)
    return JSONResponse(
        {
            "client_token": client["token"],
            "is_new": client["job_count"] == 0,
            "job_count": client["job_count"],
            "instructions": (
                "Store client_token in localStorage as 'archx_client_id'. "
                "Send it as the X-Client-ID header on every request."
            ),
        },
        headers={"X-Client-ID": client["token"]},
    )

@app.get("/health")
async def health():
    try:
        videos = await list_videos_from_dataset("minecraft")
        return {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "videos": len(videos),
            "running": len(_RUNNING_JOBS),
            "queued": len(_JOB_QUEUE),
            "active_jobs": len([j for j in JOBS.values() if j["status"] == "processing"]),
        }
    except Exception as e:
        return JSONResponse({"status": "degraded", "error": str(e)}, status_code=503)

@app.get("/queue")
async def queue_status():
    running = len(_RUNNING_JOBS)
    waiting = len(_JOB_QUEUE)
    return {
        "running": running,
        "waiting": waiting,
        "total_active": running + waiting,
        "avg_job_s": AVG_JOB_DURATION_S,
        "estimated_wait_s": waiting * AVG_JOB_DURATION_S if running == 0 else
                            (waiting + 1) * AVG_JOB_DURATION_S,
        "message": (
            "No jobs running — submit yours now!" if (running + waiting) == 0 else
            f"{running} running, {waiting} in queue"
        ),
    }

@app.get("/videos/{category}")
async def list_videos(category: str = "minecraft"):
    try:
        videos = await list_videos_from_dataset(category) or [f"mc{i}" for i in range(1, 7)]
        return {"category": category, "videos": videos, "count": len(videos)}
    except Exception as e:
        return {"category": category, "videos": [f"mc{i}" for i in range(1, 7)], "count": 6, "error": str(e)}

@app.post("/generate")
async def generate_video(
    req: GenerateRequest,
    background_tasks: BackgroundTasks,
    http_req: Request,
):
    if not validate_app_key(http_req):
        raise HTTPException(403, "Invalid API key")
    if not validate_origin(http_req):
        raise HTTPException(403, "Invalid origin")

    client = resolve_client(http_req)
    token = client["token"]

    if not rate_limit(f"generate:{token}", GENERATE_LIMIT):
        raise HTTPException(429, f"Rate limit: max {GENERATE_LIMIT} requests/hour")

    cleanup_old_jobs()

    active_for_client = [
        j for j in JOBS.values()
        if j.get("client_token") == token
        and j["status"] in ("queued", "processing")
    ]
    if active_for_client:
        existing_id = active_for_client[0]["id"]
        raise HTTPException(
            409,
            f"You already have an active job ({existing_id}). "
            "Wait for it to finish or cancel it first.",
        )

    job_id = str(uuid.uuid4())
    audio_path = AUDIO_DIR / f"{job_id}.mp3"
    subs_path = AUDIO_DIR / f"{job_id}.ass"

    create_job(job_id, token)

    try:
        audio_size = await save_base64_audio(req.audio_base64, audio_path)
        await save_subtitles(req.subtitles_ass, subs_path)
    except ValueError as e:
        update_job(job_id, status="failed", error=str(e))
        raise HTTPException(400, str(e))

    update_job(
        job_id,
        audio_path=str(audio_path),
        subs_path=str(subs_path),
        background=req.background,
        requested_duration=req.duration,
    )

    client["job_count"] += 1
    client["jobs"].append(job_id)
    if len(client["jobs"]) > 50:
        client["jobs"] = client["jobs"][-50:]
    _save_client(client)

    position = len(_JOB_QUEUE) + 1
    if _RUNNING_JOBS:
        pass
    else:
        position = 0

    _JOB_QUEUE.append(job_id)
    _refresh_queue_positions()

    if position == 0:
        eta = {"message": "Starting very soon…", "wait_low_s": 0, "wait_high_s": 30}
    else:
        eta = queue_eta(position, req.duration)

    return JSONResponse(
        {
            "job_id": job_id,
            "client_token": token,
            "status": "queued",
            "queue_position": position if position > 0 else 1,
            "queue": eta,
            "check_status": f"/job/{job_id}",
        },
        headers={"X-Client-ID": token},
    )

@app.get("/job/{job_id}")
async def job_status(job_id: str, http_req: Request):
    if not rate_limit(f"status:{job_id}", STATUS_LIMIT):
        return JSONResponse({"job_id": job_id, "status": "rate_limited"})

    if job_id not in JOBS:
        job_file = JOBS_DIR / f"{job_id}.json"
        if job_file.exists():
            try:
                return JSONResponse(json.loads(job_file.read_text()))
            except Exception:
                pass
        raise HTTPException(404, "Job not found")

    job = JOBS[job_id]

    if (datetime.now() - job["updated_at"]).total_seconds() > JOB_TIMEOUT:
        if job["status"] not in ("completed", "failed"):
            update_job(job_id, status="failed", error="Job timed out")

    pos = queue_position(job_id)

    resp: Dict = {
        "job_id": job["id"],
        "status": job["status"],
        "progress": job["progress"],
        "message": job["message"],
        "queue_position": pos,
        "queue": queue_eta(pos) if pos else None,
        "created_at": job["created_at"].isoformat() if isinstance(job["created_at"], datetime) else job["created_at"],
        "updated_at": job["updated_at"].isoformat() if isinstance(job["updated_at"], datetime) else job["updated_at"],
        "encode_stats": {
            "frame": job.get("ffmpeg_frame", 0),
            "fps": job.get("ffmpeg_fps", 0.0),
            "out_time_s": job.get("ffmpeg_out_time", 0.0),
            "total_s": job.get("total_duration", 0.0),
            "speed": job.get("ffmpeg_speed", 0.0),
        },
    }

    if job["status"] == "completed" and job.get("output_path"):
        resp["download_url"] = f"/download/{job_id}"
    if job.get("error"):
        resp["error"] = job["error"]

    return JSONResponse(resp)

@app.get("/download/{job_id}")
async def download_video(job_id: str):
    output_path = OUTPUT_DIR / f"{job_id}.mp4"
    if not output_path.exists():
        raise HTTPException(404, "Video not found or not ready")
    if job_id in JOBS and JOBS[job_id]["status"] != "completed":
        raise HTTPException(400, "Video not ready yet")
    return FileResponse(
        output_path,
        media_type="video/mp4",
        filename=f"archnemix-short-{job_id[:8]}.mp4",
        headers={
            "Content-Disposition": f'attachment; filename="archnemix-short-{job_id[:8]}.mp4"',
            "Cache-Control": "no-cache",
        },
    )

@app.delete("/video/{job_id}")
async def delete_video(job_id: str, http_req: Request):
    if not validate_app_key(http_req):
        raise HTTPException(403, "Invalid API key")

    output_path = OUTPUT_DIR / f"{job_id}.mp4"
    removed = False
    if output_path.exists():
        try:
            output_path.unlink()
            removed = True
            logger.info(f"🗑 Deleted {job_id}")
        except Exception as e:
            raise HTTPException(500, f"Delete failed: {e}")

    if job_id in JOBS:
        JOBS[job_id]["output_path"] = None
        JOBS[job_id]["message"] = "Video deleted by user"
        _persist_job(job_id)

    return {"job_id": job_id, "deleted": removed}

@app.delete("/job/{job_id}")
async def cancel_job(job_id: str, http_req: Request):
    """Called by controller when frontend goes silent."""
    if not validate_app_key(http_req):
        raise HTTPException(403, "Invalid API key")

    if job_id not in JOBS:
        return {"job_id": job_id, "cancelled": False, "reason": "not found"}

    job = JOBS[job_id]
    if job["status"] in ("completed", "failed"):
        return {"job_id": job_id, "cancelled": False, "reason": job["status"]}

    _kill_ffmpeg_for_job(job_id)
    _RUNNING_JOBS.discard(job_id)
    try:
        _JOB_QUEUE.remove(job_id)
    except ValueError:
        pass

    update_job(job_id, status="failed", error="Cancelled by controller — client disconnected")
    _refresh_queue_positions()

    logger.info(f"🛑 Job {job_id} cancelled via controller request")
    return {"job_id": job_id, "cancelled": True}

@app.get("/debug/dataset")
async def debug_dataset():
    try:
        api = get_hf_api()
        files = list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset"))
        mc = [f for f in files if "minecraft" in f.lower() and f.endswith(".mp4")]
        return {"dataset": HF_DATASET, "total_files": len(files), "minecraft": mc}
    except Exception as e:
        return JSONResponse({"status": "error", "error": str(e)}, status_code=500)

# =============================================================================
# STARTUP / SHUTDOWN
# =============================================================================
@app.on_event("startup")
async def startup():
    global _QUEUE_LOCK
    _QUEUE_LOCK = asyncio.Lock()

    logger.info("=" * 60)
    logger.info("ArchNemix Shorts Generator API v5.0.0")
    logger.info(f"Dataset : {HF_DATASET}")
    logger.info(f"Concurrent : {MAX_CONCURRENT}")
    logger.info(f"Avg job : {AVG_JOB_DURATION_S}s ({AVG_JOB_DURATION_S//60}m)")
    logger.info("Quality : CRF 16 · medium · 8–16 Mbps · AAC 192k")
    logger.info("Speed opts : single-pass trim · ass-first filter · threads=auto")
    logger.info("=" * 60)

    asyncio.create_task(_queue_worker())

    try:
        vids = await list_videos_from_dataset("minecraft")
        logger.info(f"✅ {len(vids)} videos cached")
    except Exception as e:
        logger.warning(f"⚠️ Video cache warm-up: {e}")

    cleanup_old_jobs()
    logger.info("🚀 Ready")

@app.on_event("shutdown")
async def shutdown():
    logger.info("Shutdown — cleaning jobs")
    cleanup_old_jobs()
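
For reference (separate from app.py): a minimal client-side sketch of the flow these endpoints define, register, generate, poll, download. The base URL, APP key value, and local file names below are placeholders; the headers and JSON fields mirror GenerateRequest and the /register response above.

# client_sketch.py — assumed deployment URL and local input files
import base64
import time
import requests

BASE = "https://your-space.hf.space"   # placeholder: wherever this app is hosted
APP_KEY = "<your APP_KEY>"             # placeholder: must match the server's APP_KEY

# 1. Get (or restore) a client token and send it on every request
client_id = requests.get(f"{BASE}/register").json()["client_token"]
headers = {"X-APP-KEY": APP_KEY, "X-Client-ID": client_id}

# 2. Submit a job: base64 MP3 audio + ASS subtitles + background clip name
with open("voiceover.mp3", "rb") as f:
    audio_b64 = base64.b64encode(f.read()).decode()
with open("captions.ass", encoding="utf-8") as f:
    subs = f.read()

job = requests.post(
    f"{BASE}/generate",
    json={"audio_base64": audio_b64, "subtitles_ass": subs, "background": "mc1"},
    headers=headers,
).json()
job_id = job["job_id"]

# 3. Poll /job/{id} until the encode completes or fails
while True:
    status = requests.get(f"{BASE}/job/{job_id}", headers=headers).json()
    print(status["status"], status.get("progress"), status.get("message"))
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(5)

# 4. Download the finished short
if status["status"] == "completed":
    resp = requests.get(f"{BASE}/download/{job_id}", headers=headers)
    with open("short.mp4", "wb") as out:
        out.write(resp.content)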