| | |
| | import asyncio |
| | import os |
| | import uuid |
| | from typing import Optional |
| | from fastapi import FastAPI, HTTPException, BackgroundTasks |
| | from contextlib import asynccontextmanager |
| | from pydantic import BaseModel |
| | from fastapi.responses import FileResponse |
| | from playwright.async_api import async_playwright |
| |
|
| | from app.login_logic import GenApeLogin |
| | from app.gen_logic import GenApeGen |
| |
|
| | playwright = None |
| | browser = None |
| | context = None |
| | semaphore = asyncio.Semaphore(5) |
| |
|
| | BASE_URL = "https://app.genape.ai/ai-image-generator" |
| | STORAGE_PATH = "genape_state.json" |
| |
|
| | tasks_status = {} |
| |
|
| | @asynccontextmanager |
| | async def lifespan(app: FastAPI): |
| | global playwright, browser, context |
| |
|
| | playwright = await async_playwright().start() |
| | browser = await playwright.chromium.launch( |
| | headless=True, |
| | args=["--no-sandbox", "--disable-setuid-sandbox"] |
| | ) |
| |
|
| | storage = STORAGE_PATH if os.path.exists(STORAGE_PATH) else None |
| |
|
| | context = await browser.new_context(storage_state=storage) |
| |
|
| | yield |
| |
|
| | await browser.close() |
| | await playwright.stop() |
| |
|
| | app = FastAPI(lifespan=lifespan) |
| |
|
| | class LoginRequest(BaseModel): |
| | email: str |
| | password: str |
| |
|
| | @app.post("/api/login") |
| | async def login_api(req: LoginRequest): |
| | login_handler = GenApeLogin(context, BASE_URL, STORAGE_PATH) |
| | res = await login_handler.login(req.email, req.password) |
| |
|
| | if not res["ok"]: |
| | raise HTTPException(500, res["error"]) |
| |
|
| | return res |
| |
|
| | @app.post("/api/gen") |
| | async def gen_image_async(prompt: str, image_link: str = None, background_tasks: BackgroundTasks = None): |
| | task_id = str(uuid.uuid4()) |
| | tasks_status[task_id] = {"status": "processing"} |
| |
|
| | background_tasks.add_task(run_gen_task, task_id, prompt, image_link) |
| |
|
| | return {"task_id": task_id} |
| |
|
| | @app.get("/api/task/{task_id}") |
| | async def get_task_status(task_id: str): |
| | task = tasks_status.get(task_id) |
| | if not task: |
| | raise HTTPException(status_code=404, detail="Task ID không tồn tại") |
| | |
| | return task |
| |
|
| | async def run_gen_task(task_id: str, prompt: str, image_link: Optional[str]): |
| | gen_handler = GenApeGen(context, BASE_URL, semaphore) |
| | |
| | try: |
| | res = await gen_handler.generate_image(prompt, image_link) |
| |
|
| | if res.get("ok"): |
| | tasks_status[task_id] = { |
| | "status": "success", |
| | "result": res["img_url"] |
| | } |
| | else: |
| | tasks_status[task_id] = { |
| | "status": "failed", |
| | "error": res.get("error", "Unknown error") |
| | } |
| | except Exception as e: |
| | tasks_status[task_id] = { |
| | "status": "failed", |
| | "error": str(e) |
| | } |
| |
|
| | @app.get("/") |
| | async def root(): |
| | return {"status": "running"} |