|
|
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, Request |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import StreamingResponse, FileResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from playwright.async_api import async_playwright, Browser, Page |
|
|
import json |
|
|
import asyncio |
|
|
import base64 |
|
|
from typing import Dict, Any, AsyncGenerator, Optional |
|
|
from pathlib import Path |
|
|
import os |
|
|
|
|
|
# ASGI application object; the metadata below is passed straight to FastAPI.
app = FastAPI(
    version="2.0.0",
    description="AI-callable thumbnail generation using real React app",
    title="Thumbnail Crafter MCP Server",
)

# Wildcard CORS policy: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|
|
|
|
|
|
|
# Shared Chromium instance, created on first use and reused by later calls.
browser: Optional[Browser] = None
# Playwright driver handle that launched `browser`. It is kept at module level
# so close_browser() can stop it; a dropped handle can never be stopped.
_playwright: Optional[Any] = None
# Address of the React app the browser is pointed at (override via APP_URL).
APP_URL = os.getenv("APP_URL", "http://localhost:7860")


async def get_browser() -> Browser:
    """Return the shared headless Chromium browser, launching it if needed.

    The Playwright driver is started at most once and reused for relaunches.
    A browser that reports itself as disconnected is discarded and replaced,
    so one crash does not poison every later call.

    Returns:
        The module-wide ``Browser`` instance.
    """
    global browser, _playwright

    if browser is not None and not browser.is_connected():
        # The old instance is unusable; forget it and launch a fresh one below.
        browser = None

    if browser is None:
        if _playwright is None:
            _playwright = await async_playwright().start()
        browser = await _playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-dev-shm-usage']
        )
    return browser


async def close_browser():
    """Close the shared browser and stop the Playwright driver behind it.

    The module-level references are cleared before any awaiting happens, so a
    failure while closing cannot leave a stale browser behind for
    get_browser() to hand out. Safe to call when nothing was ever launched.
    """
    global browser, _playwright

    active_browser, active_driver = browser, _playwright
    browser = None
    _playwright = None

    try:
        if active_browser is not None:
            await active_browser.close()
    finally:
        # Stop the driver even when closing the browser raised.
        if active_driver is not None:
            await active_driver.stop()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Layout catalogue reported by the `get_available_layouts` tool.
# Each row is (id, display name, description); order is preserved.
AVAILABLE_LAYOUTS = [
    {"id": layout_id, "name": display_name, "description": summary}
    for layout_id, display_name, summary in (
        (
            "seriousCollab",
            "Serious Collab",
            "Professional collaboration layout with HF logo and partner logo placeholder",
        ),
        (
            "funCollab",
            "Fun Collab",
            "Playful collaboration layout with Huggy mascots and title text",
        ),
        (
            "sandwich",
            "Sandwich",
            "Title and subtitle with central Huggy character",
        ),
        (
            "academiaHub",
            "Academia Hub",
            "Academic-themed layout with 'Welcome Academia Hub' text",
        ),
        (
            "impactTitle",
            "Impact Title",
            "Bold impact-style title with subtitle and HF logo",
        ),
    )
]
|
|
|
|
|
|
|
|
# Huggy mascots reported by the `get_available_huggys` tool.
# Each row is (id, display name); ids are reproduced exactly as given.
AVAILABLE_HUGGYS = [
    {"id": huggy_id, "name": display_name}
    for huggy_id, display_name in (
        ("acedemic-huggy", "Academic Huggy"),
        ("dragon-huggy", "Dragon Huggy"),
        ("game-jam-huggy", "Game Jam Huggy"),
        ("huggy-chef", "Huggy Chef"),
    )
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Canvas sizes this function knows how to select, keyed by (width, height).
# Each value is the text clicked in the app after its size menu is opened.
_CANVAS_SIZE_LABELS = {
    (1200, 675): "1200×675",
    (1200, 627): "1200×627",
    (1160, 580): "1160×580",
}
# Size the function assumes is already active when no selection is made.
_DEFAULT_CANVAS_SIZE = (1200, 675)


async def generate_thumbnail_with_browser(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate a thumbnail by driving the React app in a headless browser page.

    The page is always closed, including when a step fails, so failed
    requests do not accumulate open pages in the shared browser. Caller
    supplied values are handed to the page as evaluate() arguments rather
    than spliced into JavaScript source, so they cannot inject script.

    Args:
        inputs: Dictionary containing:
            - layout_id (str): Layout to use (e.g., "funCollab");
              defaults to "impactTitle"
            - title (str): Main title text; defaults to "Untitled"
            - subtitle (str): Subtitle text (optional, currently ignored)
            - bg_color (str): Background color hex; defaults to "#1a1a1a"
            - huggy_ids (list): List of Huggy IDs to add (optional,
              currently ignored)
            - width (int): Canvas width (1200 or 1160)
            - height (int): Canvas height (675, 627, or 580)

        A width/height pair that is not a supported combination falls back
        to 1200x675, and the result reports the size actually used.

    Returns:
        On success: a dict with "success": True, the PNG as a base64 data
        URL under "image", plus "width", "height", "format", "size_bytes",
        "layout_used" and "title".
        On failure: a dict with "success": False, "error" and "message".
        Exceptions are reported through the dict, not raised.

    NOTE: text editing on the canvas is not implemented; "title" is only
    echoed back in the result.
    """
    try:
        layout_id = inputs.get("layout_id", "impactTitle")
        title = inputs.get("title", "Untitled")
        bg_color = inputs.get("bg_color", "#1a1a1a")

        canvas_size = (inputs.get("width", 1200), inputs.get("height", 675))
        if canvas_size not in _CANVAS_SIZE_LABELS:
            # Unsupported combination: use the default and report it honestly.
            canvas_size = _DEFAULT_CANVAS_SIZE
        width, height = canvas_size

        shared_browser = await get_browser()
        page = await shared_browser.new_page(viewport={"width": 1920, "height": 1080})
        try:
            await page.goto(APP_URL, wait_until="networkidle")
            await asyncio.sleep(2)

            # Only touch the size menu when a non-default size is wanted.
            if canvas_size != _DEFAULT_CANVAS_SIZE:
                await page.evaluate("""
                    () => {
                        const sizeButton = document.querySelector('[data-testid="canvas-size-button"]');
                        if (sizeButton) sizeButton.click();
                    }
                """)
                await asyncio.sleep(0.5)

                size_text = _CANVAS_SIZE_LABELS[canvas_size]
                await page.click(f"text={size_text}")
                await asyncio.sleep(0.5)

            # Open the layout menu, then pick the requested layout. The id is
            # passed as an argument (stringified, as the old f-string did).
            await page.evaluate(
                """
                async (layoutId) => {
                    // Find and click the layout button
                    const layoutButton = Array.from(document.querySelectorAll('button'))
                        .find(btn => btn.textContent.includes('Layout'));
                    if (layoutButton) layoutButton.click();
                    await new Promise(resolve => setTimeout(resolve, 500));

                    // Find and click the specific layout
                    const layoutOption = Array.from(document.querySelectorAll('[data-layout-id]'))
                        .find(el => el.getAttribute('data-layout-id') === layoutId);
                    if (layoutOption) layoutOption.click();
                }
                """,
                str(layout_id),
            )
            await asyncio.sleep(1)

            # Set the first color input and fire the events the app listens to.
            await page.evaluate(
                """
                (color) => {
                    const bgColorInput = document.querySelector('input[type="color"]');
                    if (bgColorInput) {
                        bgColorInput.value = color;
                        bgColorInput.dispatchEvent(new Event('input', { bubbles: true }));
                        bgColorInput.dispatchEvent(new Event('change', { bubbles: true }));
                    }
                }
                """,
                str(bg_color),
            )
            await asyncio.sleep(0.5)

            # TODO: apply title/subtitle to the canvas text elements. How to
            # do that depends on how the app handles text editing.

            await page.evaluate("""
                () => {
                    const exportButton = Array.from(document.querySelectorAll('button'))
                        .find(btn => btn.textContent.toLowerCase().includes('export'));
                    if (exportButton) exportButton.click();
                }
            """)
            await asyncio.sleep(1)

            canvas_element = await page.query_selector('canvas')
            if not canvas_element:
                raise Exception("Canvas element not found")

            screenshot_bytes = await canvas_element.screenshot(type="png")
            img_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')

            return {
                "success": True,
                "image": f"data:image/png;base64,{img_base64}",
                "width": width,
                "height": height,
                "format": "PNG",
                "size_bytes": len(screenshot_bytes),
                "layout_used": layout_id,
                "title": title
            }
        finally:
            # Runs on success and failure alike; a failure to close is caught
            # by the outer handler and reported as an error result.
            await page.close()

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": "Failed to generate thumbnail with browser automation"
        }
|
|
|
|
|
|
|
|
def get_available_layouts_tool(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """List the layouts this server advertises.

    Args:
        inputs: Tool arguments; not used.

    Returns:
        A dict with "success": True, the AVAILABLE_LAYOUTS list under
        "layouts", and its length under "count".
    """
    layouts = AVAILABLE_LAYOUTS
    return dict(success=True, layouts=layouts, count=len(layouts))
|
|
|
|
|
|
|
|
def get_available_huggys_tool(inputs: Dict[str, Any]) -> Dict[str, Any]:
    """List the Huggy mascots this server advertises.

    Args:
        inputs: Tool arguments; not used.

    Returns:
        A dict with "success": True, the AVAILABLE_HUGGYS list under
        "huggys", its length under "count", a fixed "total_available"
        figure, and a "note" pointing at the full list.
    """
    huggys = AVAILABLE_HUGGYS
    response: Dict[str, Any] = {"success": True}
    response["huggys"] = huggys
    response["count"] = len(huggys)
    response["total_available"] = 44
    response["note"] = "Full list available at https://huggingface.co/datasets/Chunte/Huggy"
    return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def mcp_response(request: Request) -> AsyncGenerator[str, None]:
    """MCP server entry point following HuggingChat protocol.

    Reads a JSON body with a tool "name" and optional "arguments", runs the
    matching tool, and yields two newline-terminated JSON lines: one carrying
    the result under "data", then a closing ``{"output": false}`` line.

    Args:
        request: Incoming request whose JSON body names the tool to run.

    Yields:
        The result line followed by the terminator line. Any failure while
        parsing, dispatching or serializing is reported in the result line
        with "success": False rather than raised.
    """
    try:
        payload = await request.json()
        tool_name = payload.get("name")
        # A JSON null for "arguments" is treated like a missing key.
        arguments = payload.get("arguments") or {}

        if tool_name == "generate_thumbnail":
            result = await generate_thumbnail_with_browser(arguments)
        elif tool_name == "get_available_layouts":
            result = get_available_layouts_tool(arguments)
        elif tool_name == "get_available_huggys":
            result = get_available_huggys_tool(arguments)
        else:
            # Carries "success": False like every other error payload.
            result = {"success": False, "error": f"Unknown tool: {tool_name}"}

        # Serialize inside the try so an unserializable result is reported too.
        data_line = json.dumps({"output": True, "data": result})

    except Exception as e:
        data_line = json.dumps({
            "output": True,
            "data": {
                "success": False,
                "error": str(e),
                "message": "MCP request processing failed"
            }
        })

    yield data_line + "\n"
    yield json.dumps({"output": False}) + "\n"
|
|
|
|
|
|
|
|
@app.post("/tools")
async def tools_endpoint(request: Request):
    """Handle POST /tools by streaming the lines produced by mcp_response."""
    line_stream = mcp_response(request)
    return StreamingResponse(line_stream, media_type="application/json")
|
|
|
|
|
|
|
|
@app.get("/api/info")
async def api_info():
    """Describe the server: name, version, mode, endpoint paths and tool names."""
    endpoint_paths = {
        "tools": "/tools",
        "schema": "/tools.json",
        "info": "/api/info"
    }
    tool_names = [
        "generate_thumbnail",
        "get_available_layouts",
        "get_available_huggys"
    ]
    return {
        "name": "Thumbnail Crafter MCP Server (Browser Mode)",
        "version": "2.0.0",
        "status": "active",
        "mode": "browser_automation",
        "endpoints": endpoint_paths,
        "available_tools": tool_names
    }
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Return a fixed status payload; no checks are performed."""
    return dict(
        status="healthy",
        service="thumbnail-crafter-mcp",
        mode="browser",
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Warm up the shared browser when the application starts."""
    print("Initializing browser...")
    # The returned handle is not needed here; the call only forces the launch.
    _ = await get_browser()
    print("Browser ready!")
|
|
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Release the shared browser when the application shuts down."""
    print("Closing browser...")
    await close_browser()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build output of the React frontend, relative to the working directory.
static_dir = Path("dist")

if static_dir.exists():
    app.mount("/assets", StaticFiles(directory=static_dir / "assets"), name="assets")

    @app.get("/")
    async def serve_frontend():
        """Serve the React frontend"""
        index_file = static_dir / "index.html"
        if index_file.exists():
            return FileResponse(index_file)
        return {"message": "Frontend not built"}

    @app.get("/{full_path:path}")
    async def serve_spa(full_path: str):
        """Serve a built file if one matches, otherwise the SPA entry page.

        The requested path is resolved and served only when it lies inside
        ``static_dir``. Without that check, ``..`` segments in the URL could
        read any file the process can access.

        Args:
            full_path: Request path after the leading slash.

        Returns:
            The matching file, else ``index.html``, else a JSON error body.
        """
        if full_path.startswith(("api/", "tools", "health")):
            return {"error": "Not found"}

        try:
            static_root = static_dir.resolve()
            candidate = (static_dir / full_path).resolve()
            inside_root = candidate == static_root or static_root in candidate.parents
            servable = inside_root and candidate.is_file()
        except (OSError, ValueError):
            # Unresolvable or malformed path (e.g. embedded NUL): treat as no match.
            servable = False

        if servable:
            return FileResponse(candidate)

        # Unknown routes fall back to the SPA entry page.
        index_file = static_dir / "index.html"
        if index_file.exists():
            return FileResponse(index_file)
        return {"error": "File not found"}
else:
    @app.get("/")
    async def root():
        """Explain that the frontend build is missing and point at the API info."""
        return {
            "message": "Thumbnail Crafter MCP Server (Browser Mode)",
            "note": "React frontend not found. Build with 'npm run build'",
            "api_info": "/api/info"
        }
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Listen on all interfaces on port 7860.
    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )
|
|
|