| """ |
| Image Compressor Pro β Python Backend (FastAPI + Pillow) |
| ======================================================== |
| A high-performance, concurrent image compression API designed to replace |
| the Node.js + Sharp backend. Uses Pillow (with pillow-heif for HEIF/AVIF) |
| and FastAPI with StreamingResponse for efficient, non-blocking I/O. |
| |
| Deployment target: HuggingFace Spaces (Docker) or local testing. |
| """ |
|
|
| import io |
| import logging |
| from contextlib import asynccontextmanager |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| from fastapi import FastAPI, Request, Query, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import StreamingResponse, PlainTextResponse |
| from starlette.concurrency import run_in_threadpool |
|
|
| from PIL import Image |
|
|
| |
| |
| |
| try: |
| import pillow_heif |
| pillow_heif.register_heif_opener() |
| HEIF_AVAILABLE = True |
| except ImportError: |
| HEIF_AVAILABLE = False |
|
|
| |
| |
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s | %(levelname)-7s | %(message)s", |
| datefmt="%H:%M:%S", |
| ) |
| log = logging.getLogger("compressor") |
|
|
| |
| |
| |
| MAX_WORKERS = 4 |
| _pool = ThreadPoolExecutor(max_workers=MAX_WORKERS) |
|
|
| |
| |
| |
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| log.info("Compressor backend starting (workers=%d)", MAX_WORKERS) |
| if HEIF_AVAILABLE: |
| log.info("pillow-heif is available β HEIF/AVIF support enabled") |
| else: |
| log.warning("pillow-heif not installed β HEIF/AVIF output disabled") |
| yield |
| _pool.shutdown(wait=False) |
| log.info("Compressor backend shut down") |
|
|
| |
| |
| |
| app = FastAPI( |
| title="Image Compressor Pro API", |
| version="1.0.0", |
| lifespan=lifespan, |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| |
| |
| MAX_FILE_SIZE = 20 * 1024 * 1024 |
|
|
| |
| MIME_MAP = { |
| "jpeg": "image/jpeg", |
| "png": "image/png", |
| "webp": "image/webp", |
| "avif": "image/avif", |
| "heif": "image/heic", |
| "tiff": "image/tiff", |
| "gif": "image/gif", |
| "bmp": "image/bmp", |
| "ico": "image/x-icon", |
| "jp2": "image/jp2", |
| } |
|
|
| |
| |
| |
| def _compress_image(raw_bytes: bytes, quality: int, fmt: str) -> bytes: |
| """ |
| Open *raw_bytes* as an image, compress it into *fmt* at the given |
| *quality*, and return the resulting bytes. |
| |
| This is intentionally a **synchronous** function because Pillow is |
| CPU-bound. It is called via ``run_in_threadpool`` so the event loop |
| is never blocked. |
| """ |
| img = Image.open(io.BytesIO(raw_bytes)) |
|
|
| |
| if fmt in ("jpeg", "tiff", "bmp", "jp2") and img.mode in ("RGBA", "LA", "PA"): |
| background = Image.new("RGB", img.size, (255, 255, 255)) |
| background.paste(img, mask=img.split()[-1]) |
| img = background |
| elif fmt in ("jpeg", "bmp", "jp2") and img.mode not in ("RGB", "L"): |
| img = img.convert("RGB") |
| elif fmt == "gif" and img.mode == "RGBA": |
| |
| img = img.convert("RGBA") |
| alpha = img.split()[-1] |
| img = img.convert("RGB").convert("P", palette=Image.ADAPTIVE, colors=255) |
| mask = Image.eval(alpha, lambda a: 255 if a <= 128 else 0) |
| img.paste(255, mask) |
| |
|
|
| out_buffer = io.BytesIO() |
|
|
| if fmt == "jpeg": |
| img.save( |
| out_buffer, |
| format="JPEG", |
| quality=quality, |
| optimize=True, |
| progressive=True, |
| subsampling="4:2:0" if quality < 90 else "4:4:4", |
| ) |
| elif fmt == "png": |
| |
| compress_level = max(1, min(9, 9 - (quality // 12))) |
| img.save( |
| out_buffer, |
| format="PNG", |
| compress_level=compress_level, |
| optimize=True, |
| ) |
| elif fmt == "webp": |
| img.save( |
| out_buffer, |
| format="WEBP", |
| quality=quality, |
| method=4, |
| ) |
| elif fmt == "avif": |
| |
| |
| |
| |
| |
| if img.mode in ("RGBA", "LA", "PA") or "transparency" in img.info: |
| img = img.convert("RGBA") |
| import numpy as np |
| data = np.array(img) |
| alpha = data[:, :, 3] |
|
|
| |
| data[:, :, :3][alpha == 0] = [0, 0, 0] |
|
|
| img = Image.fromarray(data, "RGBA") |
| elif img.mode not in ("RGB", "L"): |
| img = img.convert("RGB") |
|
|
| img.save( |
| out_buffer, |
| format="AVIF", |
| quality=quality, |
| subsampling="4:4:4", |
| ) |
| elif fmt == "heif": |
| if not HEIF_AVAILABLE: |
| raise ValueError("HEIF output requires pillow-heif (not installed)") |
| img.save( |
| out_buffer, |
| format="HEIF", |
| quality=quality, |
| ) |
| elif fmt == "tiff": |
| img.save( |
| out_buffer, |
| format="TIFF", |
| compression="tiff_lzw", |
| ) |
| elif fmt == "gif": |
| |
| n_frames = getattr(img, 'n_frames', 1) |
| if n_frames > 1: |
| frames = [] |
| durations = [] |
| for i in range(n_frames): |
| img.seek(i) |
| frame = img.copy() |
| if frame.mode != "P": |
| frame = frame.convert("RGBA").convert("P", palette=Image.ADAPTIVE, colors=256) |
| frames.append(frame) |
| durations.append(img.info.get('duration', 100)) |
| frames[0].save( |
| out_buffer, |
| format="GIF", |
| save_all=True, |
| append_images=frames[1:], |
| duration=durations, |
| loop=img.info.get('loop', 0), |
| optimize=True, |
| ) |
| else: |
| if img.mode not in ("P", "L"): |
| img = img.convert("P", palette=Image.ADAPTIVE, colors=256) |
| img.save( |
| out_buffer, |
| format="GIF", |
| optimize=True, |
| ) |
| elif fmt == "bmp": |
| if img.mode not in ("RGB", "L", "1"): |
| img = img.convert("RGB") |
| img.save( |
| out_buffer, |
| format="BMP", |
| ) |
| elif fmt == "ico": |
| |
| max_ico = 256 |
| if img.width > max_ico or img.height > max_ico: |
| img.thumbnail((max_ico, max_ico), Image.LANCZOS) |
| if img.mode != "RGBA": |
| img = img.convert("RGBA") |
| img.save( |
| out_buffer, |
| format="ICO", |
| ) |
| elif fmt == "jp2": |
| if img.mode not in ("RGB", "L", "RGBA"): |
| img = img.convert("RGB") |
| img.save( |
| out_buffer, |
| format="JPEG2000", |
| quality_mode="rates", |
| quality_layers=[quality], |
| ) |
| else: |
| raise ValueError(f"Unsupported format: {fmt}") |
|
|
| out_buffer.seek(0) |
| return out_buffer.getvalue() |
|
|
| |
| |
| |
|
|
| @app.get("/ping", response_class=PlainTextResponse) |
| async def health_check(): |
| """Simple health-check endpoint compatible with the Node backend.""" |
| return "pong" |
|
|
|
|
| @app.get("/", response_class=PlainTextResponse) |
| async def root(): |
| """Root endpoint β useful for HuggingFace Spaces health probes.""" |
| return "Image Compressor Pro API is running" |
|
|
|
|
| @app.post("/api/process-stream") |
| async def process_stream( |
| request: Request, |
| quality: int = Query(default=80, ge=1, le=100), |
| format: str = Query(default="jpeg", pattern="^(jpeg|png|webp|avif|heif|tiff|gif|bmp|ico|jp2)$"), |
| ): |
| """ |
| Receive a raw image in the request body (Content-Type: image/*), |
| compress it using Pillow, and stream the result back. |
| |
| Query parameters |
| ---------------- |
| quality : int (1β100) |
| Compression quality. Higher = better quality, larger file. |
| format : str |
| Target output format (jpeg, png, webp, avif, heif, tiff). |
| |
| This mirrors the Node.js backend's ``POST /api/process-stream`` contract |
| so the frontend works without changes. |
| """ |
| |
| body = await request.body() |
| if len(body) == 0: |
| raise HTTPException(status_code=400, detail="Empty request body") |
| if len(body) > MAX_FILE_SIZE: |
| raise HTTPException( |
| status_code=413, |
| detail=f"File too large ({len(body)} bytes). Max is {MAX_FILE_SIZE} bytes.", |
| ) |
|
|
| |
| if format in ("avif", "heif") and not HEIF_AVAILABLE: |
| raise HTTPException( |
| status_code=400, |
| detail=f"{format.upper()} format requires pillow-heif which is not installed.", |
| ) |
|
|
| |
| try: |
| compressed = await run_in_threadpool(_compress_image, body, quality, format) |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except Exception as e: |
| log.exception("Compression failed") |
| raise HTTPException(status_code=500, detail=f"Compression error: {e}") |
|
|
| |
| mime = MIME_MAP.get(format, "application/octet-stream") |
|
|
| return StreamingResponse( |
| io.BytesIO(compressed), |
| media_type=mime, |
| headers={ |
| "Content-Disposition": f"attachment; filename=compressed_image.{format}", |
| "Content-Length": str(len(compressed)), |
| "X-Original-Size": str(len(body)), |
| "X-Compressed-Size": str(len(compressed)), |
| }, |
| ) |
|
|
|
|
| |
| |
| |
| if __name__ == "__main__": |
| import uvicorn |
|
|
| uvicorn.run( |
| "app:app", |
| host="0.0.0.0", |
| port=7860, |
| workers=1, |
| log_level="info", |
| reload=True, |
| ) |
|
|