| """ |
| EL HELAL Studio β Web Backend (FastAPI) |
| Integrated with Auto-Cleanup and Custom Cropping |
| """ |
|
|
| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks |
| from fastapi.responses import JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from contextlib import asynccontextmanager |
| import uvicorn |
| import shutil |
| import os |
| import json |
| import uuid |
| from pathlib import Path |
| from PIL import Image |
| import threading |
| import sys |
| import asyncio |
| import time |
|
|
| |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'core'))) |
|
|
| |
| import crop |
| import process_images |
| import color_steal |
| import retouch |
| from layout_engine import generate_layout, load_settings |
|
|
| |
| WEB_DIR = Path(os.path.dirname(__file__)) / "web_storage" |
| ROOT_DIR = Path(os.path.dirname(__file__)).parent |
| STORAGE_DIR = ROOT_DIR / "storage" |
|
|
| UPLOAD_DIR = STORAGE_DIR / "uploads" |
| PROCESSED_DIR = STORAGE_DIR / "processed" |
| RESULT_DIR = STORAGE_DIR / "results" |
|
|
| for d in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| d.mkdir(parents=True, exist_ok=True) |
|
|
| |
| models = { |
| "model": None, |
| "transform": None, |
| "luts": color_steal.load_trained_curves(), |
| "ready": False |
| } |
|
|
| def warm_up_ai(): |
| print("AI Model: Loading in background...") |
| try: |
| models["model"], _ = process_images.setup_model() |
| models["transform"] = process_images.get_transform() |
| models["ready"] = True |
| print("AI Model: READY") |
| except Exception as e: |
| print(f"AI Model: FAILED to load - {e}") |
|
|
| async def cleanup_task(): |
| """Background task to delete files older than 24 hours.""" |
| while True: |
| print("Cleanup: Checking for old files...") |
| now = time.time() |
| count = 0 |
| for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| for path in folder.glob("*"): |
| if path.is_file() and (now - path.stat().st_mtime) > 86400: |
| path.unlink() |
| count += 1 |
| if count > 0: print(f"Cleanup: Removed {count} old files.") |
| await asyncio.sleep(3600) |
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| |
| threading.Thread(target=warm_up_ai, daemon=True).start() |
| asyncio.create_task(cleanup_task()) |
| yield |
| |
| pass |
|
|
| app = FastAPI(title="EL HELAL Studio API", lifespan=lifespan) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
|
|
| @app.get("/") |
| async def read_index(): |
| return FileResponse(WEB_DIR / "index.html") |
|
|
| @app.get("/status") |
| async def get_status(): |
| return {"ai_ready": models["ready"]} |
|
|
| @app.post("/upload") |
| async def upload_image(file: UploadFile = File(...)): |
| file_id = str(uuid.uuid4()) |
| ext = Path(file.filename).suffix |
| file_path = UPLOAD_DIR / f"{file_id}{ext}" |
| |
| with file_path.open("wb") as buffer: |
| shutil.copyfileobj(file.file, buffer) |
| |
| with Image.open(file_path) as img: |
| from PIL import ImageOps |
| |
| img = ImageOps.exif_transpose(img) |
|
|
| |
| width, height = img.size |
|
|
| |
| img.thumbnail((200, 200), Image.BILINEAR) |
| thumb_path = UPLOAD_DIR / f"{file_id}_thumb.jpg" |
| if img.mode in ("RGBA", "LA"): |
| bg = Image.new("RGB", img.size, (255, 255, 255)) |
| bg.paste(img, mask=img.split()[-1]) |
| bg.save(thumb_path, "JPEG", quality=60) |
| else: |
| img.convert("RGB").save(thumb_path, "JPEG", quality=60) |
| return { |
| "id": file_id, |
| "filename": file.filename, |
| "thumb_url": f"/static/uploads/{file_id}_thumb.jpg", |
| "width": width, |
| "height": height |
| } |
|
|
| @app.post("/process/{file_id}") |
| async def process_image( |
| file_id: str, |
| name: str = Form(""), |
| id_number: str = Form(""), |
| |
| do_rmbg: bool = Form(True), |
| do_color: bool = Form(True), |
| do_retouch: bool = Form(True), |
| do_crop: bool = Form(True), |
| |
| add_studio_name: bool = Form(True), |
| add_logo: bool = Form(True), |
| add_date: bool = Form(True), |
| |
| x1: int = Form(None), |
| y1: int = Form(None), |
| x2: int = Form(None), |
| y2: int = Form(None) |
| ): |
| if not models["ready"]: |
| return JSONResponse(status_code=503, content={"error": "AI Model not ready"}) |
| |
| files = list(UPLOAD_DIR.glob(f"{file_id}.*")) |
| if not files: return JSONResponse(status_code=404, content={"error": "File not found"}) |
| orig_path = files[0] |
| |
| try: |
| temp_crop = PROCESSED_DIR / f"{file_id}_processed_crop.jpg" |
| |
| |
| if x1 is not None and y1 is not None: |
| print(f"Pipeline: Applying manual crop for {file_id} | Rect: ({x1}, {y1}, {x2}, {y2})") |
| rect = (x1, y1, x2, y2) |
| crop.apply_custom_crop(str(orig_path), str(temp_crop), rect) |
| cropped_img = Image.open(temp_crop) |
| elif do_crop: |
| print(f"Pipeline: Applying auto crop for {file_id}...") |
| crop.crop_to_4x6_opencv(str(orig_path), str(temp_crop)) |
| cropped_img = Image.open(temp_crop) |
| else: |
| print(f"Pipeline: Skipping crop for {file_id}") |
| cropped_img = Image.open(orig_path) |
| |
| |
| if do_rmbg: |
| print(f"Pipeline: Removing background for {file_id}...") |
| processed_img = process_images.remove_background(models["model"], cropped_img, models["transform"]) |
| print(f"Pipeline: BG Removal Done. Image Mode: {processed_img.mode}") |
| else: |
| print(f"Pipeline: Skipping background removal for {file_id}") |
| processed_img = cropped_img |
| |
| |
| if do_color and models["luts"]: |
| print(f"Pipeline: Applying color grading for {file_id}...") |
| graded_img = color_steal.apply_to_image(models["luts"], processed_img) |
| print(f"Pipeline: Color Grading Done. Image Mode: {graded_img.mode}") |
| else: |
| print(f"Pipeline: Skipping color grading for {file_id}") |
| graded_img = processed_img |
| |
| |
| current_settings = load_settings() |
| |
| if do_retouch and current_settings.get("retouch", {}).get("enabled", False): |
| retouch_cfg = current_settings["retouch"] |
| print(f"Pipeline: Retouching face for {file_id} (Sensitivity: {retouch_cfg.get('sensitivity', 3.0)})") |
| final_processed, count = retouch.retouch_image_pil( |
| graded_img, |
| sensitivity=retouch_cfg.get("sensitivity", 3.0), |
| tone_smoothing=retouch_cfg.get("tone_smoothing", 0.6) |
| ) |
| print(f"Pipeline: Retouch Done. Blemishes: {count}. Image Mode: {final_processed.mode}") |
| else: |
| print(f"Pipeline: Retouching skipped for {file_id}") |
| final_processed = graded_img |
| |
| print(f"Pipeline: Generating final layout for {file_id}...") |
| final_layout = generate_layout( |
| final_processed, name, id_number, |
| add_studio_name=add_studio_name, |
| add_logo=add_logo, |
| add_date=add_date |
| ) |
|
|
| result_path = RESULT_DIR / f"{file_id}_layout.jpg" |
| final_layout.save(result_path, "JPEG", quality=95, dpi=(300, 300)) |
|
|
| |
| preview_path = RESULT_DIR / f"{file_id}_preview.jpg" |
| pw, ph = final_layout.size |
| p_scale = 900 / pw if pw > 900 else 1.0 |
| if p_scale < 1.0: |
| preview_img = final_layout.resize((int(pw * p_scale), int(ph * p_scale)), Image.BILINEAR) |
| preview_img.save(preview_path, "JPEG", quality=70) |
| else: |
| final_layout.save(preview_path, "JPEG", quality=70) |
|
|
| if temp_crop.exists(): temp_crop.unlink() |
|
|
| return { |
| "id": file_id, |
| "result_url": f"/static/results/{file_id}_layout.jpg", |
| "preview_url": f"/static/results/{file_id}_preview.jpg" |
| } |
| except Exception as e: |
| import traceback |
| traceback.print_exc() |
| return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
| @app.post("/clear-all") |
| async def clear_all(): |
| """Manually clear all uploaded and processed files.""" |
| count = 0 |
| try: |
| for folder in [UPLOAD_DIR, PROCESSED_DIR, RESULT_DIR]: |
| for path in folder.glob("*"): |
| if path.is_file() and not path.name.endswith(".gitkeep"): |
| path.unlink() |
| count += 1 |
| return {"status": "success", "removed_count": count} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"error": f"Failed to clear storage: {str(e)}"}) |
|
|
| |
| SETTINGS_PATH = ROOT_DIR / "config" / "settings.json" |
|
|
| @app.get("/settings") |
| async def get_settings(): |
| """Return current settings.json contents.""" |
| try: |
| if SETTINGS_PATH.exists(): |
| with open(SETTINGS_PATH, "r") as f: |
| return json.load(f) |
| return {} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
| @app.post("/settings") |
| async def update_settings(data: dict): |
| """Merge incoming settings into settings.json (partial update).""" |
| try: |
| current = {} |
| if SETTINGS_PATH.exists(): |
| with open(SETTINGS_PATH, "r") as f: |
| current = json.load(f) |
| |
| for key, val in data.items(): |
| if key in current and isinstance(val, dict) and isinstance(current[key], dict): |
| current[key].update(val) |
| else: |
| current[key] = val |
| SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True) |
| with open(SETTINGS_PATH, "w") as f: |
| json.dump(current, f, indent=4, ensure_ascii=False) |
| return {"status": "success"} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"error": str(e)}) |
|
|
| app.mount("/static", StaticFiles(directory=str(STORAGE_DIR)), name="static") |
|
|
| if __name__ == "__main__": |
| |
| port = int(os.environ.get("PORT", 7860)) |
| uvicorn.run(app, host="0.0.0.0", port=port) |
|
|