| """ |
| LocalTools Server - PDF Editor & Image Scraper |
| FastAPI backend with async operations, logging, and cleanup |
| """ |
|
|
| import asyncio |
| import io |
| import json |
| import logging |
| import re |
| import uuid |
| import zipfile |
| from contextlib import asynccontextmanager, suppress |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import Optional, List |
| from urllib.parse import urljoin, urlparse |
|
|
| import aiohttp |
| import fitz |
| from PIL import Image |
| from pydantic import BaseModel, Field, HttpUrl |
| from pydantic_settings import BaseSettings |
|
|
| from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
|
|
| |
|
|
| class Settings(BaseSettings): |
| """Application settings with environment variable support""" |
| app_name: str = "LocalTools" |
| debug: bool = False |
| |
| |
| work_dir: str = "work" |
| output_dir: str = "outputs" |
| static_dir: str = "static" |
| |
| |
| max_pdf_size_mb: int = 100 |
| max_image_size_mb: int = 50 |
| min_image_dimension: int = 50 |
| request_timeout: int = 60 |
| |
| |
| cleanup_interval_hours: int = 24 |
| file_retention_hours: int = 48 |
| |
| class Config: |
| env_prefix = "LOCALTOOLS_" |
|
|
|
|
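| # Single settings instance; any field can be overridden via environment |
| # variables with the LOCALTOOLS_ prefix, e.g. LOCALTOOLS_DEBUG=1 |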
| settings = Settings() |
|
|
|
|
| |
|
|
| logging.basicConfig( |
| level=logging.DEBUG if settings.debug else logging.INFO, |
| format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", |
| datefmt="%Y-%m-%d %H:%M:%S" |
| ) |
| logger = logging.getLogger("localtools") |
|
|
|
|
| |
|
|
| APP_DIR = Path(__file__).parent |
| STATIC_DIR = APP_DIR / settings.static_dir |
| WORK_DIR = APP_DIR / settings.work_dir |
| OUT_DIR = APP_DIR / settings.output_dir |
| SCRAPE_DIR = WORK_DIR / "scrape_jobs" |
|
|
| # parents=True covers nested dirs (SCRAPE_DIR lives under WORK_DIR); |
| # creating STATIC_DIR keeps the StaticFiles mount from failing on first run |
| for directory in [STATIC_DIR, WORK_DIR, OUT_DIR, SCRAPE_DIR]: |
| directory.mkdir(parents=True, exist_ok=True) |
|
|
|
|
| |
|
|
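| # Browser-like User-Agent: some sites block the default aiohttp client UA |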
| UA_HEADERS = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" |
| } |
|
|
| ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/bmp"} |
|
|
|
|
| |
|
|
| class ScrapeResult(BaseModel): |
| job_id: str |
| images: List[dict] |
| total_found: int |
| filtered_count: int |
|
|
|
|
| class HealthResponse(BaseModel): |
| status: str |
| version: str = "1.0.0" |
| uptime_seconds: float |
|
|
|
|
| |
|
|
| startup_time = datetime.now() |
|
|
| async def cleanup_old_files(): |
| """Remove files older than retention period""" |
| retention = timedelta(hours=settings.file_retention_hours) |
| cutoff = datetime.now() - retention |
| removed = 0 |
| |
| for directory in [WORK_DIR, OUT_DIR]: |
| for file_path in directory.glob("*"): |
| if file_path.is_file(): |
| mtime = datetime.fromtimestamp(file_path.stat().st_mtime) |
| if mtime < cutoff: |
| try: |
| file_path.unlink() |
| removed += 1 |
| except Exception as e: |
| logger.warning(f"Failed to remove {file_path}: {e}") |
| |
| |
| for job_dir in SCRAPE_DIR.glob("*"): |
| if job_dir.is_dir(): |
| mtime = datetime.fromtimestamp(job_dir.stat().st_mtime) |
| if mtime < cutoff: |
| try: |
| for f in job_dir.glob("*"): |
| f.unlink() |
| job_dir.rmdir() |
| removed += 1 |
| except Exception as e: |
| logger.warning(f"Failed to remove job {job_dir}: {e}") |
| |
| if removed > 0: |
| logger.info(f"Cleanup: removed {removed} old files/directories") |
|
|
|
|
| async def periodic_cleanup(): |
| """Run cleanup periodically""" |
| while True: |
| await asyncio.sleep(settings.cleanup_interval_hours * 3600) |
| try: |
| await cleanup_old_files() |
| except Exception: |
| # Keep the loop alive even if a single cleanup pass fails |
| logger.exception("Periodic cleanup failed") |
|
|
|
|
| |
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| """Startup and shutdown events""" |
| logger.info(f"Starting {settings.app_name}") |
| |
| |
| cleanup_task = asyncio.create_task(periodic_cleanup()) |
| |
| |
| await cleanup_old_files() |
| |
| yield |
| |
| |
| cleanup_task.cancel() |
| with suppress(asyncio.CancelledError): |
| await cleanup_task |
| logger.info("Shutting down") |
|
|
|
|
| |
|
|
| app = FastAPI( |
| title=settings.app_name, |
| description="PDF Editor & Image Scraper API", |
| version="1.0.0", |
| lifespan=lifespan |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") |
|
|
|
|
| |
|
|
| def to_points(value: float, unit: str) -> float: |
| """Convert measurement to PDF points""" |
| conversions = { |
| "pt": 1.0, |
| "points": 1.0, |
| "in": 72.0, |
| "inch": 72.0, |
| "inches": 72.0, |
| "cm": 72.0 / 2.54, |
| "centimeter": 72.0 / 2.54, |
| "centimeters": 72.0 / 2.54, |
| "mm": 72.0 / 25.4, |
| "millimeter": 72.0 / 25.4, |
| "millimeters": 72.0 / 25.4, |
| } |
| |
| unit_lower = unit.lower().strip() |
| if unit_lower not in conversions: |
| raise ValueError(f"Invalid unit: {unit}. Use: pt, in, cm, or mm") |
| |
| return float(value) * conversions[unit_lower] |
|
|
|
|
| def parse_page_spec(spec: str, total_pages: int) -> set[int]: |
| """ |
| Parse page specification string to set of 0-based page indices. |
| Input: "1,3,5" or "2-6" or "1,3-5,9" (1-based) |
| Output: Set of 0-based indices |
| """ |
| spec = (spec or "").strip().replace(" ", "") |
| if not spec: |
| return set() |
|
|
| result = set() |
| for part in spec.split(","): |
| if not part: |
| continue |
| |
| try: |
| if "-" in part: |
| start_str, end_str = part.split("-", 1) |
| start, end = int(start_str), int(end_str) |
| if start > end: |
| start, end = end, start |
| for p in range(start, end + 1): |
| if 1 <= p <= total_pages: |
| result.add(p - 1) |
| else: |
| p = int(part) |
| if 1 <= p <= total_pages: |
| result.add(p - 1) |
| except ValueError: |
| logger.warning(f"Invalid page spec part: {part}") |
| continue |
| |
| return result |
|
|
|
|
| def extract_drive_file_id(url: str) -> Optional[str]: |
| """Extract Google Drive file ID from URL""" |
| patterns = [ |
| r"/file/d/([^/]+)", |
| r"[?&]id=([^&]+)", |
| ] |
| |
| for pattern in patterns: |
| match = re.search(pattern, url) |
| if match: |
| return match.group(1) |
| |
| return None |
|
|
|
|
| async def download_from_url(url: str, out_path: Path, is_drive: bool = False) -> Path: |
| """Download file from URL using aiohttp""" |
| timeout = aiohttp.ClientTimeout(total=settings.request_timeout) |
| |
| async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session: |
| if is_drive: |
| file_id = extract_drive_file_id(url) |
| if not file_id: |
| raise ValueError("Invalid Google Drive URL. Use format: /file/d/<ID>/view") |
| |
| download_url = f"https://drive.google.com/uc?export=download&id={file_id}" |
| |
| async with session.get(download_url) as resp: |
| resp.raise_for_status() |
| |
| |
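| # Large Drive files return a virus-scan interstitial and set a |
| # "download_warning" cookie whose value acts as a confirm token |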
| cookies = resp.cookies |
| confirm_token = None |
| for key, cookie in cookies.items(): |
| if key.startswith("download_warning"): |
| confirm_token = cookie.value |
| break |
| |
| if confirm_token: |
| download_url = f"{download_url}&confirm={confirm_token}" |
| async with session.get(download_url) as resp2: |
| resp2.raise_for_status() |
| content = await resp2.read() |
| else: |
| content = await resp.read() |
| else: |
| async with session.get(url) as resp: |
| resp.raise_for_status() |
| content = await resp.read() |
| |
| |
| if content[:5] != b"%PDF-": |
| |
| if b"<html" in content[:1000].lower() or b"sign in" in content[:1000].lower(): |
| raise ValueError("Access denied. For Drive files, set sharing to 'Anyone with link'") |
| raise ValueError("Downloaded file is not a valid PDF") |
| |
| out_path.parent.mkdir(parents=True, exist_ok=True) |
| out_path.write_bytes(content) |
| |
| logger.info(f"Downloaded PDF: {out_path.name} ({len(content)} bytes)") |
| return out_path |
|
|
|
|
| def validate_pdf_upload(data: bytes) -> None: |
| """Validate uploaded PDF data""" |
| if not data: |
| raise ValueError("Empty file uploaded") |
| |
| if len(data) > settings.max_pdf_size_mb * 1024 * 1024: |
| raise ValueError(f"File too large. Maximum size: {settings.max_pdf_size_mb}MB") |
| |
| if data[:5] != b"%PDF-": |
| raise ValueError("Invalid PDF file") |
|
|
|
|
| def add_text_watermark(page: fitz.Page, text: str, font_size: float, rotate: int) -> None: |
| """Add text watermark to PDF page""" |
| if not text.strip(): |
| return |
|
|
| rect = page.rect |
| box = fitz.Rect( |
| rect.x0 + rect.width * 0.05, |
| rect.y0 + rect.height * 0.05, |
| rect.x1 - rect.width * 0.05, |
| rect.y1 - rect.height * 0.05, |
| ) |
|
|
| try: |
| page.insert_textbox( |
| box, text, |
| fontsize=float(font_size), |
| rotate=int(rotate), |
| align=fitz.TEXT_ALIGN_CENTER, |
| color=(0.55, 0.55, 0.55), |
| overlay=True, |
| fill_opacity=0.18, |
| stroke_opacity=0.18, |
| ) |
| except TypeError: |
| |
| page.insert_textbox( |
| box, text, |
| fontsize=float(font_size), |
| rotate=int(rotate), |
| align=fitz.TEXT_ALIGN_CENTER, |
| color=(0.75, 0.75, 0.75), |
| overlay=True, |
| ) |
|
|
|
|
| def process_pdf( |
| input_path: Path, |
| output_path: Path, |
| remove_pages: str, |
| crop_top: float, |
| crop_bottom: float, |
| crop_left: float, |
| crop_right: float, |
| unit: str, |
| watermark_text: str, |
| watermark_size: float, |
| watermark_rotate: int, |
| ) -> dict: |
| """Process PDF with cropping, page removal, and watermark""" |
| |
| src = fitz.open(str(input_path)) |
| total = src.page_count |
| remove_set = parse_page_spec(remove_pages, total) |
|
|
| |
| T = to_points(crop_top, unit) |
| B = to_points(crop_bottom, unit) |
| L = to_points(crop_left, unit) |
| R = to_points(crop_right, unit) |
|
|
| out = fitz.open() |
| kept = 0 |
|
|
| for i in range(total): |
| if i in remove_set: |
| continue |
|
|
| out.insert_pdf(src, from_page=i, to_page=i) |
| page = out.load_page(out.page_count - 1) |
|
|
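| # Crop offsets are measured inward from each page edge, in points |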
| rect = page.rect |
| crop_rect = fitz.Rect(rect.x0 + L, rect.y0 + T, rect.x1 - R, rect.y1 - B) |
| |
| if crop_rect.is_empty or crop_rect.width <= 2 or crop_rect.height <= 2: |
| raise ValueError(f"Invalid crop on page {i+1}. Reduce crop values.") |
|
|
| page.set_cropbox(crop_rect) |
|
|
| if watermark_text.strip(): |
| add_text_watermark(page, watermark_text, watermark_size, watermark_rotate) |
|
|
| kept += 1 |
|
|
| if kept == 0: |
| raise ValueError("All pages removed. Output would be empty.") |
|
|
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| out.save(str(output_path)) |
| out.close() |
| src.close() |
|
|
| logger.info(f"Processed PDF: {kept}/{total} pages kept") |
| |
| return { |
| "original_pages": total, |
| "output_pages": kept, |
| "removed_pages": len(remove_set) |
| } |
|
|
|
|
| |
|
|
| def normalize_url(url: str) -> str: |
| """Normalize URL by removing fragment""" |
| parsed = urlparse(url) |
| return parsed._replace(fragment="").geturl() |
|
|
|
|
| def best_from_srcset(srcset: str, base_url: str) -> Optional[str]: |
| """Extract best quality image URL from srcset attribute""" |
| if not srcset: |
| return None |
| |
| candidates = [] |
| for part in srcset.split(","): |
| part = part.strip() |
| if not part: |
| continue |
| |
| bits = part.split() |
| img_url = urljoin(base_url, bits[0]) |
| score = 0.0 |
| |
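| # Score "1200w" descriptors by pixel width; weight "2x" density |
| # descriptors heavily so they outrank width entries in the same srcset |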
| if len(bits) > 1: |
| descriptor = bits[1].lower().strip() |
| try: |
| if descriptor.endswith("w"): |
| score = float(descriptor[:-1]) |
| elif descriptor.endswith("x"): |
| score = float(descriptor[:-1]) * 10000.0 |
| except ValueError: |
| pass |
| |
| candidates.append((score, img_url)) |
| |
| if not candidates: |
| return None |
| |
| candidates.sort(key=lambda x: x[0], reverse=True) |
| return candidates[0][1] |
|
|
|
|
| def safe_filename(url: str, fallback: str) -> str: |
| """Generate safe filename from URL""" |
| name = Path(urlparse(url).path).name or fallback |
| name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name) |
| |
| if "." not in name: |
| name += ".jpg" |
| |
| return name[:120] |
|
|
|
|
| async def fetch_image(session: aiohttp.ClientSession, img_url: str) -> tuple[bytes, Optional[int], Optional[int], Optional[str]]: |
| """Fetch image and return bytes with dimensions""" |
| async with session.get(img_url) as resp: |
| resp.raise_for_status() |
| content_type = resp.headers.get("Content-Type", "") |
| data = await resp.read() |
| |
| width = height = None |
| try: |
| with Image.open(io.BytesIO(data)) as img: |
| width, height = img.size |
| except Exception: |
| pass |
| |
| return data, width, height, content_type |
|
|
|
|
| async def scrape_images(page_url: str) -> dict: |
| """Scrape images from webpage""" |
| page_url = page_url.strip() |
| if not page_url: |
| raise ValueError("Web page URL is required") |
|
|
| timeout = aiohttp.ClientTimeout(total=settings.request_timeout) |
| |
| async with aiohttp.ClientSession(timeout=timeout, headers=UA_HEADERS) as session: |
| |
| async with session.get(page_url) as resp: |
| resp.raise_for_status() |
| html = await resp.text() |
|
|
| |
| found_urls: List[str] = [] |
| |
| |
| img_pattern = r'<img[^>]+>' |
| lazy_attrs = ["data-src", "data-original", "data-lazy-src", "data-url", "data-image", "data-srcset", "srcset", "src"] |
| |
| for img_match in re.finditer(img_pattern, html, re.IGNORECASE): |
| img_tag = img_match.group() |
| |
| for attr in lazy_attrs: |
| attr_pattern = rf'{attr}=["\']([^"\']+)["\']' |
| attr_match = re.search(attr_pattern, img_tag, re.IGNORECASE) |
| |
| if attr_match: |
| value = attr_match.group(1) |
|
| if "srcset" in attr.lower(): |
| best = best_from_srcset(value, page_url) |
| if best: |
| found_urls.append(best) |
| else: |
| found_urls.append(urljoin(page_url, value)) |
|
| # lazy_attrs is ordered by priority; stop at the first populated |
| # attribute so a lazy-load placeholder in src doesn't shadow the real image |
| break |
|
|
| |
| seen = set() |
| deduped = [] |
| for url in found_urls: |
| normalized = normalize_url(url) |
| if normalized not in seen and normalized.startswith("http"): |
| seen.add(normalized) |
| deduped.append(normalized) |
|
|
| logger.info(f"Found {len(deduped)} unique image URLs on {page_url}") |
|
|
| |
| job_id = uuid.uuid4().hex[:10] |
| job_dir = SCRAPE_DIR / job_id |
| job_dir.mkdir(parents=True, exist_ok=True) |
|
|
| |
| images = [] |
| filtered = 0 |
| |
| async def process_image(idx: int, url: str): |
| nonlocal filtered |
| try: |
| data, width, height, content_type = await fetch_image(session, url) |
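|
| # Skip responses whose declared type is clearly not an allowed image |
| # format (wires up ALLOWED_IMAGE_TYPES); an absent Content-Type gets |
| # the benefit of the doubt since the dimension filter below still applies |
| if content_type and content_type.split(";")[0].strip().lower() not in ALLOWED_IMAGE_TYPES: |
| return None |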
| |
| |
| if width and height: |
| if width < settings.min_image_dimension or height < settings.min_image_dimension: |
| filtered += 1 |
| return None |
| |
| img_id = uuid.uuid4().hex[:10] |
| filename = safe_filename(url, f"image_{idx}.jpg") |
| |
| |
| (job_dir / f"{img_id}.bin").write_bytes(data) |
| |
| return { |
| "id": img_id, |
| "url": url, |
| "filename": filename, |
| "width": width, |
| "height": height, |
| "bytes": len(data), |
| } |
| except Exception as e: |
| logger.debug(f"Failed to fetch {url}: {e}") |
| return None |
|
|
| |
| semaphore = asyncio.Semaphore(10) |
| |
| async def limited_process(idx: int, url: str): |
| async with semaphore: |
| return await process_image(idx, url) |
| |
| tasks = [limited_process(idx, url) for idx, url in enumerate(deduped, 1)] |
| results = await asyncio.gather(*tasks) |
| |
| images = [r for r in results if r is not None] |
|
|
| |
| meta = { |
| "page_url": page_url, |
| "scraped_at": datetime.now().isoformat(), |
| "images": images |
| } |
| (job_dir / "meta.json").write_text( |
| json.dumps(meta, ensure_ascii=False, indent=2), |
| encoding="utf-8" |
| ) |
|
|
| logger.info(f"Scraped {len(images)} images, filtered {filtered} small icons") |
|
|
| return { |
| "job_id": job_id, |
| "images": images, |
| "total_found": len(deduped), |
| "filtered_count": filtered |
| } |
|
|
|
|
| def load_scrape_job(job_id: str) -> dict: |
| """Load scrape job metadata""" |
| job_dir = SCRAPE_DIR / job_id |
| meta_path = job_dir / "meta.json" |
| |
| if not job_dir.exists() or not meta_path.exists(): |
| raise HTTPException(status_code=404, detail="Scrape job not found. Please scrape again.") |
| |
| try: |
| return json.loads(meta_path.read_text(encoding="utf-8")) |
| except Exception as e: |
| logger.error(f"Failed to load job {job_id}: {e}") |
| raise HTTPException(status_code=500, detail="Corrupted scrape job metadata") |
|
|
|
|
| |
|
|
| def sanitize_filename(name: str, default: str = "file") -> str: |
| """Sanitize filename for safe file system use""" |
| name = (name or default).strip() |
| name = re.sub(r"[^a-zA-Z0-9._-]+", "_", name) |
| return name[:200] or default |
|
|
|
|
| def generate_job_id() -> str: |
| """Generate unique job ID""" |
| return uuid.uuid4().hex[:10] |
|
|
|
|
| |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def home(): |
| """Serve main HTML page""" |
| index_path = STATIC_DIR / "index.html" |
| if not index_path.exists(): |
| raise HTTPException(status_code=500, detail="Frontend not found") |
| return HTMLResponse(index_path.read_text(encoding="utf-8")) |
|
|
|
|
| @app.get("/api/health", response_model=HealthResponse) |
| async def health_check(): |
| """Health check endpoint""" |
| uptime = (datetime.now() - startup_time).total_seconds() |
| return HealthResponse(status="healthy", uptime_seconds=uptime) |
|
|
|
|
| @app.post("/api/fetch") |
| async def api_fetch_pdf( |
| url: str = Form(""), |
| file: UploadFile = File(None), |
| output_name: str = Form("original.pdf"), |
| ): |
| """Fetch/upload PDF for preview (no processing)""" |
| output_name = sanitize_filename(output_name, "original.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
|
|
| job_id = generate_job_id() |
| input_path = WORK_DIR / f"original_{job_id}.pdf" |
|
|
| try: |
| if file is not None and file.filename: |
| data = await file.read() |
| validate_pdf_upload(data) |
| input_path.write_bytes(data) |
| logger.info(f"Uploaded PDF: {file.filename}") |
| elif url.strip(): |
| is_drive = "drive.google.com" in url |
| await download_from_url(url, input_path, is_drive=is_drive) |
| else: |
| raise ValueError("Provide a PDF URL or upload a file") |
| |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except aiohttp.ClientError as e: |
| logger.error(f"Download failed: {e}") |
| raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") |
| except Exception as e: |
| logger.exception("Unexpected error in fetch") |
| raise HTTPException(status_code=500, detail="Internal server error") |
|
|
| return FileResponse( |
| path=str(input_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
|
|
|
|
| @app.post("/api/process") |
| async def api_process_pdf( |
| url: str = Form(""), |
| file: UploadFile = File(None), |
| output_name: str = Form("cropped.pdf"), |
| remove_pages: str = Form(""), |
| unit: str = Form("mm"), |
| top: float = Form(0), |
| bottom: float = Form(0), |
| left: float = Form(0), |
| right: float = Form(0), |
| watermark_text: str = Form(""), |
| watermark_size: float = Form(36), |
| watermark_rotate: int = Form(45), |
| ): |
| """Process PDF with cropping, page removal, and watermark""" |
| output_name = sanitize_filename(output_name, "cropped.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
|
|
| job_id = generate_job_id() |
| input_path = WORK_DIR / f"input_{job_id}.pdf" |
| output_path = OUT_DIR / f"{job_id}_{output_name}" |
|
|
| |
| try: |
| if file is not None and file.filename: |
| data = await file.read() |
| validate_pdf_upload(data) |
| input_path.write_bytes(data) |
| elif url.strip(): |
| is_drive = "drive.google.com" in url |
| await download_from_url(url, input_path, is_drive=is_drive) |
| else: |
| raise ValueError("Provide a PDF URL or upload a file") |
| |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except aiohttp.ClientError as e: |
| raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") |
|
|
| |
| try: |
| process_pdf( |
| input_path=input_path, |
| output_path=output_path, |
| remove_pages=remove_pages, |
| crop_top=top, |
| crop_bottom=bottom, |
| crop_left=left, |
| crop_right=right, |
| unit=unit, |
| watermark_text=watermark_text, |
| watermark_size=watermark_size, |
| watermark_rotate=watermark_rotate, |
| ) |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except Exception as e: |
| logger.exception("PDF processing failed") |
| raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") |
|
|
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
|
|
|
|
| @app.post("/api/scrape-images") |
| async def api_scrape_images(page_url: str = Form(...)): |
| """Scrape images from a webpage""" |
| try: |
| result = await scrape_images(page_url) |
| return JSONResponse(result) |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except aiohttp.ClientError as e: |
| raise HTTPException(status_code=400, detail=f"Failed to fetch page: {str(e)}") |
| except Exception as e: |
| logger.exception("Scrape failed") |
| raise HTTPException(status_code=500, detail="Scraping failed") |
|
|
|
|
| @app.post("/api/download-zip") |
| async def api_download_zip( |
| job_id: str = Form(...), |
| image_ids: str = Form(...), |
| zip_name: str = Form("images.zip") |
| ): |
| """Download selected images as ZIP""" |
| meta = load_scrape_job(job_id.strip()) |
| job_dir = SCRAPE_DIR / job_id.strip() |
|
|
| selected = [x.strip() for x in image_ids.split(",") if x.strip()] |
| if not selected: |
| raise HTTPException(status_code=400, detail="No images selected") |
|
|
| zip_name = sanitize_filename(zip_name, "images.zip") |
| if not zip_name.lower().endswith(".zip"): |
| zip_name += ".zip" |
|
|
| output_path = OUT_DIR / f"{generate_job_id()}_{zip_name}" |
| id_to_meta = {img["id"]: img for img in meta.get("images", [])} |
|
|
| with zipfile.ZipFile(output_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: |
| for img_id in selected: |
| bin_path = job_dir / f"{img_id}.bin" |
| if bin_path.exists(): |
| filename = id_to_meta.get(img_id, {}).get("filename", f"{img_id}.jpg") |
| zf.writestr(filename, bin_path.read_bytes()) |
|
|
| logger.info(f"Created ZIP with {len(selected)} images") |
|
|
| return FileResponse( |
| path=str(output_path), |
| media_type="application/zip", |
| filename=zip_name |
| ) |
|
|
|
|
| @app.post("/api/download-pdf") |
| async def api_download_pdf( |
| job_id: str = Form(...), |
| image_ids: str = Form(...), |
| pdf_name: str = Form("images.pdf") |
| ): |
| """Download selected images as PDF""" |
| meta = load_scrape_job(job_id.strip()) |
| job_dir = SCRAPE_DIR / job_id.strip() |
|
|
| selected = [x.strip() for x in image_ids.split(",") if x.strip()] |
| if not selected: |
| raise HTTPException(status_code=400, detail="No images selected") |
|
|
| pdf_name = sanitize_filename(pdf_name, "images.pdf") |
| if not pdf_name.lower().endswith(".pdf"): |
| pdf_name += ".pdf" |
|
|
| |
| pil_pages: List[Image.Image] = [] |
| for img_id in selected: |
| bin_path = job_dir / f"{img_id}.bin" |
| if not bin_path.exists(): |
| continue |
| try: |
| with Image.open(bin_path) as img: |
| rgb = img.convert("RGB") |
| pil_pages.append(rgb.copy()) |
| except Exception as e: |
| logger.warning(f"Failed to process image {img_id}: {e}") |
| continue |
|
|
| if not pil_pages: |
| raise HTTPException(status_code=400, detail="No valid images to convert") |
|
|
| output_path = OUT_DIR / f"{generate_job_id()}_{pdf_name}" |
| |
| first = pil_pages[0] |
| rest = pil_pages[1:] if len(pil_pages) > 1 else [] |
| first.save(output_path, "PDF", save_all=True, append_images=rest) |
|
|
| logger.info(f"Created PDF with {len(pil_pages)} images") |
|
|
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=pdf_name |
| ) |
|
|
|
|
| @app.post("/api/cleanup") |
| async def api_trigger_cleanup(background_tasks: BackgroundTasks): |
| """Manually trigger cleanup (for admin use)""" |
| background_tasks.add_task(cleanup_old_files) |
| return {"message": "Cleanup scheduled"} |
|
|
|
|
| @app.post("/api/remove-watermark") |
| async def api_remove_watermark( |
| url: str = Form(""), |
| file: UploadFile = File(None), |
| output_name: str = Form("cleaned.pdf"), |
| watermark_text: str = Form("Educated Nepal"), |
| method: str = Form("inpaint"), |
| intensity: int = Form(50), |
| dpi: int = Form(120), |
| quality: int = Form(70), |
| ): |
| """Remove watermark from PDF using image processing""" |
| try: |
| from watermark_remover import remove_watermark_from_pdf, CV2_AVAILABLE |
| |
| if not CV2_AVAILABLE: |
| raise HTTPException( |
| status_code=500, |
| detail="OpenCV not installed. Run: pip install opencv-python-headless" |
| ) |
| except ImportError as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
| |
| output_name = sanitize_filename(output_name, "cleaned.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
|
|
| job_id = generate_job_id() |
| input_path = WORK_DIR / f"input_{job_id}.pdf" |
| output_path = OUT_DIR / f"{job_id}_{output_name}" |
|
|
| |
| try: |
| if file is not None and file.filename: |
| data = await file.read() |
| validate_pdf_upload(data) |
| input_path.write_bytes(data) |
| elif url.strip(): |
| is_drive = "drive.google.com" in url |
| await download_from_url(url, input_path, is_drive=is_drive) |
| else: |
| raise ValueError("Provide a PDF URL or upload a file") |
| |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except aiohttp.ClientError as e: |
| raise HTTPException(status_code=400, detail=f"Download failed: {str(e)}") |
|
|
| |
| dpi = max(72, min(200, dpi)) |
| quality = max(30, min(95, quality)) |
|
|
| |
| try: |
| pdf_bytes = input_path.read_bytes() |
| original_size = len(pdf_bytes) |
| |
| result_bytes = remove_watermark_from_pdf( |
| pdf_bytes=pdf_bytes, |
| watermark_text=watermark_text, |
| method=method, |
| intensity=intensity, |
| dpi=dpi, |
| jpeg_quality=quality |
| ) |
| output_path.write_bytes(result_bytes) |
| |
| output_size = len(result_bytes) |
| logger.info(f"Watermark removed: {output_name}, {original_size/1024:.0f}KB -> {output_size/1024:.0f}KB") |
| |
| except Exception as e: |
| logger.exception("Watermark removal failed") |
| raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") |
|
|
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name, |
| headers={ |
| "X-Original-Size": str(original_size), |
| "X-Output-Size": str(output_size) |
| } |
| ) |
|
|
|
|
| @app.post("/api/watermark-preview") |
| async def api_watermark_preview( |
| url: str = Form(""), |
| file: UploadFile = File(None), |
| page: int = Form(0), |
| method: str = Form("inpaint"), |
| intensity: int = Form(50), |
| ): |
| """Preview watermark removal on a single page - returns original and processed images""" |
| try: |
| from watermark_remover import preview_single_page, CV2_AVAILABLE |
| |
| if not CV2_AVAILABLE: |
| raise HTTPException(status_code=500, detail="OpenCV not installed") |
| except ImportError as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
| try: |
| if file is not None and file.filename: |
| pdf_bytes = await file.read() |
| validate_pdf_upload(pdf_bytes) |
| elif url.strip(): |
| job_id = generate_job_id() |
| input_path = WORK_DIR / f"preview_{job_id}.pdf" |
| is_drive = "drive.google.com" in url |
| await download_from_url(url, input_path, is_drive=is_drive) |
| pdf_bytes = input_path.read_bytes() |
| else: |
| raise ValueError("Provide a PDF URL or upload a file") |
| |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
|
|
| |
| try: |
| import base64 |
| original_png, processed_png = preview_single_page( |
| pdf_bytes=pdf_bytes, |
| page_num=page, |
| method=method, |
| intensity=intensity, |
| dpi=100 |
| ) |
| |
| return JSONResponse({ |
| "original": base64.b64encode(original_png).decode(), |
| "processed": base64.b64encode(processed_png).decode(), |
| "page": page |
| }) |
| |
| except Exception as e: |
| logger.exception("Preview generation failed") |
| raise HTTPException(status_code=500, detail=f"Preview failed: {str(e)}") |
|
|
|
|
| |
|
|
| @app.post("/api/images-to-pdf") |
| async def api_images_to_pdf( |
| files: List[UploadFile] = File(...), |
| order: str = Form(""), |
| output_name: str = Form("images.pdf"), |
| page_size: str = Form("a4"), |
| margin: int = Form(20), |
| ): |
| """Convert multiple images to PDF with custom order""" |
| if not files: |
| raise HTTPException(status_code=400, detail="No images provided") |
| |
| output_name = sanitize_filename(output_name, "images.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| |
| if order.strip(): |
| try: |
| indices = [int(i.strip()) for i in order.split(",")] |
| ordered_files = [files[i] for i in indices if 0 <= i < len(files)] |
| except (ValueError, IndexError): |
| ordered_files = files |
| else: |
| ordered_files = files |
| |
| |
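| # Page dimensions in PDF points (1 pt = 1/72 in); "fit" matches each image's size |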
| page_sizes = { |
| "a4": (595, 842), |
| "letter": (612, 792), |
| "a3": (842, 1191), |
| "fit": None |
| } |
| |
| try: |
| pil_images = [] |
| for f in ordered_files: |
| data = await f.read() |
| try: |
| img = Image.open(io.BytesIO(data)) |
| if img.mode in ('RGBA', 'P'): |
| img = img.convert('RGB') |
| pil_images.append(img) |
| except Exception as e: |
| logger.warning(f"Skipping invalid image {f.filename}: {e}") |
| continue |
| |
| if not pil_images: |
| raise HTTPException(status_code=400, detail="No valid images found") |
| |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| |
| if page_size == "fit": |
| |
| first = pil_images[0] |
| rest = pil_images[1:] if len(pil_images) > 1 else [] |
| first.save(output_path, "PDF", save_all=True, append_images=rest) |
| else: |
| |
| page_w, page_h = page_sizes.get(page_size, page_sizes["a4"]) |
| |
| doc = fitz.open() |
| for img in pil_images: |
| |
| img_buffer = io.BytesIO() |
| img.save(img_buffer, format='JPEG', quality=90) |
| img_bytes = img_buffer.getvalue() |
| |
| |
| page = doc.new_page(width=page_w, height=page_h) |
| |
| |
| img_w, img_h = img.size |
| available_w = page_w - 2 * margin |
| available_h = page_h - 2 * margin |
| |
| |
| scale = min(available_w / img_w, available_h / img_h) |
| new_w = img_w * scale |
| new_h = img_h * scale |
| |
| |
| x = (page_w - new_w) / 2 |
| y = (page_h - new_h) / 2 |
| |
| rect = fitz.Rect(x, y, x + new_w, y + new_h) |
| page.insert_image(rect, stream=img_bytes) |
| |
| doc.save(str(output_path)) |
| doc.close() |
| |
| logger.info(f"Created PDF from {len(pil_images)} images") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
| |
| except Exception as e: |
| logger.exception("Images to PDF failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/merge-pdf") |
| async def api_merge_pdf( |
| files: List[UploadFile] = File(...), |
| order: str = Form(""), |
| output_name: str = Form("merged.pdf"), |
| ): |
| """Merge multiple PDFs into one""" |
| if not files or len(files) < 2: |
| raise HTTPException(status_code=400, detail="At least 2 PDF files required") |
| |
| output_name = sanitize_filename(output_name, "merged.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| |
| if order.strip(): |
| try: |
| indices = [int(i.strip()) for i in order.split(",")] |
| ordered_files = [files[i] for i in indices if 0 <= i < len(files)] |
| except (ValueError, IndexError): |
| ordered_files = files |
| else: |
| ordered_files = files |
| |
| try: |
| output_doc = fitz.open() |
| total_pages = 0 |
| |
| for f in ordered_files: |
| data = await f.read() |
| if data[:5] != b"%PDF-": |
| logger.warning(f"Skipping non-PDF file: {f.filename}") |
| continue |
| |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| output_doc.insert_pdf(src_doc) |
| total_pages += len(src_doc) |
| src_doc.close() |
| |
| if total_pages == 0: |
| raise HTTPException(status_code=400, detail="No valid PDF files found") |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| output_doc.save(str(output_path), deflate=True) |
| output_doc.close() |
| |
| logger.info(f"Merged {len(ordered_files)} PDFs, {total_pages} total pages") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
| |
| except Exception as e: |
| logger.exception("PDF merge failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/split-pdf") |
| async def api_split_pdf( |
| file: UploadFile = File(...), |
| mode: str = Form("all"), |
| pages: str = Form(""), |
| output_name: str = Form("split"), |
| ): |
| """ |
| Split PDF into multiple files. |
| Modes: 'all' (each page), 'range' (specific pages), 'chunks' (every N pages) |
| """ |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "split") |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| if mode == "all": |
| |
| job_id = generate_job_id() |
| zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" |
| |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: |
| for i in range(total_pages): |
| page_doc = fitz.open() |
| page_doc.insert_pdf(src_doc, from_page=i, to_page=i) |
| |
| pdf_bytes = page_doc.tobytes() |
| zf.writestr(f"{output_name}_page_{i+1}.pdf", pdf_bytes) |
| page_doc.close() |
| |
| src_doc.close() |
| logger.info(f"Split PDF into {total_pages} individual pages") |
| |
| return FileResponse( |
| path=str(zip_path), |
| media_type="application/zip", |
| filename=f"{output_name}_pages.zip" |
| ) |
| |
| elif mode == "range": |
| |
| page_set = parse_page_spec(pages, total_pages) |
| if not page_set: |
| raise HTTPException(status_code=400, detail="No valid pages specified") |
| |
| output_doc = fitz.open() |
| for i in sorted(page_set): |
| output_doc.insert_pdf(src_doc, from_page=i, to_page=i) |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.pdf" |
| output_doc.save(str(output_path)) |
| output_doc.close() |
| src_doc.close() |
| |
| logger.info(f"Extracted {len(page_set)} pages from PDF") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=f"{output_name}.pdf" |
| ) |
| |
| elif mode == "chunks": |
| |
| try: |
| chunk_size = int(pages) if pages else 1 |
| chunk_size = max(1, chunk_size) |
| except ValueError: |
| chunk_size = 1 |
| |
| job_id = generate_job_id() |
| zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" |
| |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: |
| chunk_num = 1 |
| for start in range(0, total_pages, chunk_size): |
| end = min(start + chunk_size - 1, total_pages - 1) |
| |
| chunk_doc = fitz.open() |
| chunk_doc.insert_pdf(src_doc, from_page=start, to_page=end) |
| |
| pdf_bytes = chunk_doc.tobytes() |
| zf.writestr(f"{output_name}_part_{chunk_num}.pdf", pdf_bytes) |
| chunk_doc.close() |
| chunk_num += 1 |
| |
| src_doc.close() |
| logger.info(f"Split PDF into {chunk_num-1} chunks of {chunk_size} pages") |
| |
| return FileResponse( |
| path=str(zip_path), |
| media_type="application/zip", |
| filename=f"{output_name}_parts.zip" |
| ) |
| |
| else: |
| raise HTTPException(status_code=400, detail="Invalid split mode") |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.exception("PDF split failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/pdf-to-images") |
| async def api_pdf_to_images( |
| file: UploadFile = File(...), |
| format: str = Form("png"), |
| dpi: int = Form(200), |
| pages: str = Form(""), |
| output_name: str = Form("pages"), |
| ): |
| """Convert PDF pages to high-quality images (PNG or JPG) |
| |
| DPI Guide: |
| - 150: Fast, small files (web preview) |
| - 200: Good quality (default) |
| - 300: Print quality |
| - 400: Ultra sharp (presentations) |
| - 600: Maximum quality (OCR/archive) |
| """ |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "pages") |
| format = format.lower() if format.lower() in ["png", "jpg", "jpeg"] else "png" |
| if format == "jpeg": |
| format = "jpg" |
| |
| |
| dpi = max(72, min(600, dpi)) |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| |
| if pages.strip(): |
| page_set = parse_page_spec(pages, total_pages) |
| else: |
| page_set = set(range(total_pages)) |
| |
| if not page_set: |
| raise HTTPException(status_code=400, detail="No valid pages specified") |
| |
| job_id = generate_job_id() |
| zip_path = OUT_DIR / f"{job_id}_{output_name}.zip" |
| |
| |
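| # PDF user space is 72 units per inch, so render zoom = target dpi / 72 |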
| zoom = dpi / 72.0 |
| matrix = fitz.Matrix(zoom, zoom) |
| |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: |
| for i in sorted(page_set): |
| page = src_doc.load_page(i) |
| |
| pix = page.get_pixmap(matrix=matrix, alpha=False) |
| |
| if format == "png": |
| img_bytes = pix.tobytes("png") |
| else: |
| |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=95, optimize=True) |
| img_bytes = buffer.getvalue() |
| |
| zf.writestr(f"{output_name}_page_{i+1}.{format}", img_bytes) |
| |
| src_doc.close() |
| logger.info(f"Converted {len(page_set)} pages to {format.upper()} at {dpi} DPI") |
| |
| return FileResponse( |
| path=str(zip_path), |
| media_type="application/zip", |
| filename=f"{output_name}_images.zip" |
| ) |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.exception("PDF to images failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/compress-pdf") |
| async def api_compress_pdf( |
| file: UploadFile = File(...), |
| quality: int = Form(60), |
| output_name: str = Form("compressed.pdf"), |
| ): |
| """Compress PDF by reducing image quality and cleaning metadata""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "compressed.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| |
| jpeg_quality = max(10, min(100, quality)) |
| |
| |
| |
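| # Downsample ceiling scales with quality: q=10 -> 1200 px, q=100 -> 3000 px |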
| max_dim = int(1000 + (jpeg_quality / 100) * 2000) |
| |
| logger.info(f"Compressing PDF with quality={jpeg_quality}, max_dim={max_dim}") |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| images_processed = 0 |
| |
| |
| for page_num in range(len(src_doc)): |
| page = src_doc.load_page(page_num) |
| image_list = page.get_images(full=True) |
| |
| for img_index, img_info in enumerate(image_list): |
| xref = img_info[0] |
| |
| try: |
| |
| base_image = src_doc.extract_image(xref) |
| if not base_image: |
| continue |
| |
| image_bytes = base_image["image"] |
| original_img_size = len(image_bytes) |
| |
| |
| if original_img_size < 5000: |
| continue |
| |
| |
| img = Image.open(io.BytesIO(image_bytes)) |
| |
| |
| if img.mode in ('RGBA', 'P'): |
| background = Image.new('RGB', img.size, (255, 255, 255)) |
| if img.mode == 'P': |
| img = img.convert('RGBA') |
| if img.mode == 'RGBA': |
| background.paste(img, mask=img.split()[3]) |
| img = background |
| else: |
| img = img.convert('RGB') |
| elif img.mode != 'RGB': |
| img = img.convert('RGB') |
| |
| |
| if max(img.size) > max_dim: |
| ratio = max_dim / max(img.size) |
| new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio)) |
| img = img.resize(new_size, Image.Resampling.LANCZOS) |
| |
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) |
| compressed_bytes = buffer.getvalue() |
| |
| |
| |
| page.replace_image(xref, stream=compressed_bytes) |
| images_processed += 1 |
| |
| except Exception as e: |
| logger.debug(f"Could not compress image {xref}: {e}") |
| continue |
| |
| logger.info(f"Processed {images_processed} images") |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| |
| |
| src_doc.save( |
| str(output_path), |
| garbage=4, |
| deflate=True, |
| clean=True, |
| ) |
| |
| original_size = len(data) |
| compressed_size = output_path.stat().st_size |
| |
| |
| # If recompressing images barely helped, retry with structural |
| # optimization only (garbage collection + deflate, no image loss) |
| if compressed_size >= original_size * 0.95: |
| src_doc.close() |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| src_doc.save( |
| str(output_path), |
| garbage=4, |
| deflate=True, |
| clean=True, |
| ) |
| compressed_size = output_path.stat().st_size |
|
| reduction = ((original_size - compressed_size) / original_size) * 100 |
| if reduction < 0: |
| # Output ended up larger than the input: return the original unchanged |
| reduction = 0 |
| output_path.write_bytes(data) |
| compressed_size = original_size |
| |
| src_doc.close() |
| |
| logger.info(f"Compressed PDF: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name, |
| headers={ |
| "X-Original-Size": str(original_size), |
| "X-Compressed-Size": str(compressed_size), |
| "X-Reduction-Percent": f"{reduction:.1f}" |
| } |
| ) |
| |
| except Exception as e: |
| logger.exception("PDF compression failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/rotate-pdf") |
| async def api_rotate_pdf( |
| file: UploadFile = File(...), |
| rotation: int = Form(90), |
| pages: str = Form(""), |
| output_name: str = Form("rotated.pdf"), |
| ): |
| """Rotate PDF pages (90, 180, or 270 degrees)""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "rotated.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| |
| rotation = int(rotation) % 360 |
| if rotation not in [90, 180, 270]: |
| rotation = 90 |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| |
| if pages.strip(): |
| page_set = parse_page_spec(pages, total_pages) |
| else: |
| page_set = set(range(total_pages)) |
| |
| |
| for i in page_set: |
| page = src_doc.load_page(i) |
| page.set_rotation((page.rotation + rotation) % 360) |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| src_doc.save(str(output_path)) |
| src_doc.close() |
| |
| logger.info(f"Rotated {len(page_set)} pages by {rotation}°") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
| |
| except Exception as e: |
| logger.exception("PDF rotation failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| @app.post("/api/add-page-numbers") |
| async def api_add_page_numbers( |
| file: UploadFile = File(...), |
| position: str = Form("bottom-center"), |
| format: str = Form("Page {n} of {total}"), |
| start_number: int = Form(1), |
| font_size: int = Form(11), |
| margin: int = Form(30), |
| output_name: str = Form("numbered.pdf"), |
| ): |
| """Add page numbers to PDF""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "numbered.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| |
| positions = { |
| "top-left": ("left", "top"), |
| "top-center": ("center", "top"), |
| "top-right": ("right", "top"), |
| "bottom-left": ("left", "bottom"), |
| "bottom-center": ("center", "bottom"), |
| "bottom-right": ("right", "bottom"), |
| } |
| h_align, v_align = positions.get(position, ("center", "bottom")) |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| for i in range(total_pages): |
| page = src_doc.load_page(i) |
| rect = page.rect |
| |
| |
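| # Substitute placeholders, e.g. "Page {n} of {total}" -> "Page 3 of 12" |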
| page_num = start_number + i |
| text = format.replace("{n}", str(page_num)).replace("{total}", str(total_pages)) |
| |
| |
| text_width = fitz.get_text_length(text, fontsize=font_size) |
| |
| if h_align == "left": |
| x = margin |
| elif h_align == "right": |
| x = rect.width - margin - text_width |
| else: |
| x = (rect.width - text_width) / 2 |
| |
| if v_align == "top": |
| y = margin + font_size |
| else: |
| y = rect.height - margin |
| |
| |
| page.insert_text( |
| (x, y), |
| text, |
| fontsize=font_size, |
| color=(0.3, 0.3, 0.3), |
| ) |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| src_doc.save(str(output_path)) |
| src_doc.close() |
| |
| logger.info(f"Added page numbers to {total_pages} pages") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name |
| ) |
| |
| except Exception as e: |
| logger.exception("Add page numbers failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| |
| TESSERACT_AVAILABLE = False |
| try: |
| import pytesseract |
| |
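| # Raises if the tesseract binary is missing from PATH |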
| pytesseract.get_tesseract_version() |
| TESSERACT_AVAILABLE = True |
| except Exception: |
| logger.warning("Tesseract not available. OCR will use basic text extraction only.") |
|
|
|
|
| @app.post("/api/pdf-ocr") |
| async def api_pdf_ocr( |
| file: UploadFile = File(...), |
| language: str = Form("eng"), |
| pages: str = Form(""), |
| output_format: str = Form("txt"), |
| dpi: int = Form(200), |
| ): |
| """Extract text from PDF using OCR (Tesseract) or native text extraction""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| dpi = max(100, min(400, dpi)) |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| |
| if pages.strip(): |
| page_set = parse_page_spec(pages, total_pages) |
| else: |
| page_set = set(range(total_pages)) |
| |
| if not page_set: |
| raise HTTPException(status_code=400, detail="No valid pages specified") |
| |
| all_text = [] |
| |
| for page_num in sorted(page_set): |
| page = src_doc.load_page(page_num) |
| |
| |
| native_text = page.get_text("text").strip() |
| |
| |
| if len(native_text) > 50: |
| all_text.append(f"--- Page {page_num + 1} ---\n{native_text}") |
| elif TESSERACT_AVAILABLE: |
| |
| import pytesseract |
| |
| zoom = dpi / 72.0 |
| mat = fitz.Matrix(zoom, zoom) |
| pix = page.get_pixmap(matrix=mat) |
| |
| |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) |
| |
| |
| ocr_text = pytesseract.image_to_string(img, lang=language) |
| all_text.append(f"--- Page {page_num + 1} (OCR) ---\n{ocr_text.strip()}") |
| else: |
| |
| all_text.append(f"--- Page {page_num + 1} ---\n{native_text if native_text else '[No text detected - Tesseract not installed]'}") |
| |
| src_doc.close() |
| |
| combined_text = "\n\n".join(all_text) |
| |
| job_id = generate_job_id() |
| |
| if output_format == "json": |
| |
| return JSONResponse({ |
| "pages": len(page_set), |
| "text": combined_text, |
| "ocr_used": TESSERACT_AVAILABLE, |
| }) |
| else: |
| |
| output_path = OUT_DIR / f"{job_id}_extracted.txt" |
| output_path.write_text(combined_text, encoding="utf-8") |
| |
| logger.info(f"Extracted text from {len(page_set)} pages") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="text/plain", |
| filename="extracted_text.txt" |
| ) |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.exception("PDF OCR failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.get("/api/ocr-status") |
| async def api_ocr_status(): |
| """Check if Tesseract OCR is available""" |
| return {"available": TESSERACT_AVAILABLE} |
|
|
|
|
| |
|
|
| def enhance_image(img: Image.Image, level: str = "medium") -> Image.Image: |
| """Enhance image quality with sharpening and contrast adjustment""" |
| from PIL import ImageEnhance, ImageFilter |
| |
| |
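| # (sharpness, contrast, color, brightness) factors applied per level |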
| presets = { |
| "light": (1.2, 1.05, 1.05, 1.02), |
| "medium": (1.4, 1.1, 1.1, 1.03), |
| "strong": (1.6, 1.15, 1.15, 1.05), |
| } |
| sharpness, contrast, color, brightness = presets.get(level, presets["medium"]) |
| |
| |
| if img.mode in ('RGBA', 'P'): |
| |
| if img.mode == 'RGBA': |
| alpha = img.split()[3] |
| img = img.convert('RGB') |
| else: |
| img = img.convert('RGB') |
| alpha = None |
| else: |
| img = img.convert('RGB') |
| alpha = None |
| |
| |
| img = img.filter(ImageFilter.UnsharpMask(radius=1.5, percent=50, threshold=3)) |
| |
| |
| enhancer = ImageEnhance.Sharpness(img) |
| img = enhancer.enhance(sharpness) |
| |
| |
| enhancer = ImageEnhance.Contrast(img) |
| img = enhancer.enhance(contrast) |
| |
| |
| enhancer = ImageEnhance.Color(img) |
| img = enhancer.enhance(color) |
| |
| |
| enhancer = ImageEnhance.Brightness(img) |
| img = enhancer.enhance(brightness) |
| |
| |
| if alpha is not None: |
| img = img.convert('RGBA') |
| img.putalpha(alpha) |
| |
| return img |
|
|
|
|
| @app.post("/api/enhance-image") |
| async def api_enhance_image( |
| file: UploadFile = File(...), |
| level: str = Form("medium"), |
| upscale: float = Form(1.0), |
| output_name: str = Form("enhanced"), |
| ): |
| """Enhance image quality with sharpening and optional upscaling""" |
| data = await file.read() |
| original_size = len(data) |
| |
| if original_size > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| output_name = sanitize_filename(output_name, "enhanced") |
| level = level if level in ["light", "medium", "strong"] else "medium" |
| upscale = max(1.0, min(2.0, upscale)) |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| original_format = file.filename.split('.')[-1].lower() if file.filename else 'jpg' |
| |
| |
| if upscale > 1.0: |
| new_size = (int(img.size[0] * upscale), int(img.size[1] * upscale)) |
| img = img.resize(new_size, Image.Resampling.LANCZOS) |
| |
| |
| img = enhance_image(img, level) |
| |
| |
| if original_format in ['jpg', 'jpeg']: |
| output_format = 'JPEG' |
| ext = 'jpg' |
| mime = 'image/jpeg' |
| if img.mode == 'RGBA': |
| img = img.convert('RGB') |
| |
| quality = 88 |
| elif original_format == 'webp': |
| output_format = 'WEBP' |
| ext = 'webp' |
| mime = 'image/webp' |
| quality = 88 |
| else: |
| output_format = 'PNG' |
| ext = 'png' |
| mime = 'image/png' |
| quality = None |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}" |
| |
| save_kwargs = {'optimize': True} |
| if quality: |
| save_kwargs['quality'] = quality |
| |
| img.save(str(output_path), output_format, **save_kwargs) |
| |
| enhanced_size = output_path.stat().st_size |
| |
| |
| if enhanced_size > original_size * 3 and output_format in ['JPEG', 'WEBP']: |
| quality = 75 |
| img.save(str(output_path), output_format, quality=quality, optimize=True) |
| enhanced_size = output_path.stat().st_size |
| |
| logger.info(f"Enhanced image: {original_size} -> {enhanced_size} bytes") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type=mime, |
| filename=f"{output_name}.{ext}", |
| headers={ |
| "X-Original-Size": str(original_size), |
| "X-Enhanced-Size": str(enhanced_size), |
| } |
| ) |
| |
| except Exception as e: |
| logger.exception("Image enhancement failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/enhance-pdf") |
| async def api_enhance_pdf( |
| file: UploadFile = File(...), |
| level: str = Form("medium"), |
| dpi: int = Form(150), |
| output_name: str = Form("enhanced.pdf"), |
| ): |
| """Enhance PDF quality by improving embedded images""" |
| data = await file.read() |
| original_size = len(data) |
| validate_pdf_upload(data) |
| |
| output_name = sanitize_filename(output_name, "enhanced.pdf") |
| if not output_name.lower().endswith(".pdf"): |
| output_name += ".pdf" |
| |
| level = level if level in ["light", "medium", "strong"] else "medium" |
| dpi = max(100, min(200, dpi)) |
| |
| |
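| # Re-encode quality rises with the enhancement level to keep added detail |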
| jpeg_quality = {"light": 82, "medium": 85, "strong": 88}.get(level, 85) |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| images_enhanced = 0 |
| |
| for page_num in range(len(src_doc)): |
| page = src_doc.load_page(page_num) |
| image_list = page.get_images(full=True) |
| |
| for img_info in image_list: |
| xref = img_info[0] |
| |
| try: |
| base_image = src_doc.extract_image(xref) |
| if not base_image: |
| continue |
| |
| image_bytes = base_image["image"] |
| |
| |
| if len(image_bytes) < 5000: |
| continue |
| |
| |
| img = Image.open(io.BytesIO(image_bytes)) |
| |
| |
| if max(img.size) < 100: |
| continue |
| |
| |
| if img.mode in ('RGBA', 'P'): |
| img = img.convert('RGB') |
| elif img.mode != 'RGB': |
| img = img.convert('RGB') |
| |
| |
| img = enhance_image(img, level) |
| |
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) |
| enhanced_bytes = buffer.getvalue() |
| |
| |
| if len(enhanced_bytes) <= len(image_bytes) * 1.5: |
| page.replace_image(xref, stream=enhanced_bytes) |
| images_enhanced += 1 |
| |
| except Exception as e: |
| logger.debug(f"Could not enhance image {xref}: {e}") |
| continue |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| |
| |
| src_doc.save( |
| str(output_path), |
| garbage=3, |
| deflate=True, |
| clean=True, |
| ) |
| src_doc.close() |
| |
| enhanced_size = output_path.stat().st_size |
| |
| |
| if enhanced_size > original_size * 4: |
| output_path.write_bytes(data) |
| enhanced_size = original_size |
| logger.warning("Enhanced PDF was too large, returning original") |
| |
| size_change = ((enhanced_size - original_size) / original_size) * 100 |
| |
| logger.info(f"Enhanced PDF: {images_enhanced} images, {original_size} -> {enhanced_size} bytes ({size_change:+.1f}%)") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="application/pdf", |
| filename=output_name, |
| headers={ |
| "X-Original-Size": str(original_size), |
| "X-Enhanced-Size": str(enhanced_size), |
| "X-Images-Enhanced": str(images_enhanced), |
| } |
| ) |
| |
| except Exception as e: |
| logger.exception("PDF enhancement failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| |
| REMBG_AVAILABLE = False |
| rembg_remove = None |
|
|
| def _check_rembg(): |
| """Safely check if rembg is available""" |
| global REMBG_AVAILABLE, rembg_remove |
| try: |
| import sys |
| import io |
| |
| old_stderr = sys.stderr |
| sys.stderr = io.StringIO() |
| try: |
| from rembg import remove |
| rembg_remove = remove |
| REMBG_AVAILABLE = True |
| finally: |
| sys.stderr = old_stderr |
| except Exception as e: |
| REMBG_AVAILABLE = False |
| logger.info(f"rembg not available: {e}") |
|
|
| |
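| # Probe once at import (with stderr suppressed, since some rembg builds |
| # print warnings on load); the endpoint below also retries lazily |
| _check_rembg() |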
| |
|
|
|
|
| @app.post("/api/remove-background") |
| async def api_remove_background( |
| file: UploadFile = File(...), |
| output_name: str = Form("no-bg.png"), |
| ): |
| """Remove background from image using AI (rembg)""" |
| global REMBG_AVAILABLE, rembg_remove |
| |
| |
| if rembg_remove is None and not REMBG_AVAILABLE: |
| try: |
| from rembg import remove |
| rembg_remove = remove |
| REMBG_AVAILABLE = True |
| except Exception as e: |
| logger.warning(f"rembg not available: {e}") |
| REMBG_AVAILABLE = False |
| |
| if not REMBG_AVAILABLE or rembg_remove is None: |
| raise HTTPException( |
| status_code=500, |
| detail="rembg not installed. Run: pip install rembg[gpu] or pip install rembg" |
| ) |
| |
| data = await file.read() |
| if len(data) > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| output_name = sanitize_filename(output_name, "no-bg.png") |
| if not output_name.lower().endswith(".png"): |
| output_name += ".png" |
| |
| try: |
| |
| result = rembg_remove(data) |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}" |
| output_path.write_bytes(result) |
| |
| logger.info(f"Background removed: {output_name}") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type="image/png", |
| filename=output_name |
| ) |
| |
| except Exception as e: |
| logger.exception("Background removal failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/add-image-watermark") |
| async def api_add_image_watermark( |
| file: UploadFile = File(...), |
| text: str = Form(""), |
| position: str = Form("center"), |
| opacity: int = Form(50), |
| font_size: int = Form(36), |
| color: str = Form("#000000"), |
| rotation: int = Form(0), |
| output_name: str = Form("watermarked"), |
| ): |
| """Add text watermark to image""" |
| data = await file.read() |
| if len(data) > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
    if not text.strip():
        raise HTTPException(status_code=400, detail="Watermark text is required")

    color_hex = color.lstrip('#')
    if not re.fullmatch(r"[0-9a-fA-F]{6}", color_hex):
        raise HTTPException(status_code=400, detail="Color must be a 6-digit hex value like #336699")
    opacity = max(0, min(100, opacity))

    output_name = sanitize_filename(output_name, "watermarked")
| |
| try: |
| from PIL import ImageDraw, ImageFont |
| |
| img = Image.open(io.BytesIO(data)) |
| if img.mode != 'RGBA': |
| img = img.convert('RGBA') |
| |
| |
| watermark = Image.new('RGBA', img.size, (0, 0, 0, 0)) |
| draw = ImageDraw.Draw(watermark) |
| |
| |
        # Prefer a scalable system font; fall back to PIL's built-in bitmap
        # font (which ignores font_size) when Arial is unavailable
        try:
            font = ImageFont.truetype("arial.ttf", font_size)
        except OSError:
            font = ImageFont.load_default()
| |
| |
        r, g, b = tuple(int(color_hex[i:i + 2], 16) for i in (0, 2, 4))
        alpha = int(255 * opacity / 100)
| |
| |
| bbox = draw.textbbox((0, 0), text, font=font) |
| text_width = bbox[2] - bbox[0] |
| text_height = bbox[3] - bbox[1] |
| |
| |
| positions = { |
| "top-left": (20, 20), |
| "top-center": ((img.width - text_width) // 2, 20), |
| "top-right": (img.width - text_width - 20, 20), |
| "center": ((img.width - text_width) // 2, (img.height - text_height) // 2), |
| "bottom-left": (20, img.height - text_height - 20), |
| "bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20), |
| "bottom-right": (img.width - text_width - 20, img.height - text_height - 20), |
| "tile": None, |
| } |
| |
| if position == "tile": |
| |
| for y in range(0, img.height, text_height + 100): |
| for x in range(0, img.width, text_width + 100): |
| draw.text((x, y), text, font=font, fill=(r, g, b, alpha)) |
| else: |
| pos = positions.get(position, positions["center"]) |
| draw.text(pos, text, font=font, fill=(r, g, b, alpha)) |
| |
| |
| if rotation != 0: |
| watermark = watermark.rotate(rotation, expand=False, center=(img.width//2, img.height//2)) |
| |
| |
| result = Image.alpha_composite(img, watermark) |
| |
| |
| original_format = file.filename.split('.')[-1].lower() if file.filename else 'png' |
| if original_format in ['jpg', 'jpeg']: |
| result = result.convert('RGB') |
| output_format = 'JPEG' |
| output_name += '.jpg' |
| mime = 'image/jpeg' |
| else: |
| output_format = 'PNG' |
| output_name += '.png' |
| mime = 'image/png' |
| |
        output_path = OUT_DIR / f"{generate_job_id()}_{output_name}"
        save_kwargs = {"quality": 95} if output_format == 'JPEG' else {}
        result.save(str(output_path), output_format, **save_kwargs)
| |
| logger.info(f"Watermark added: {output_name}") |
| |
| return FileResponse(path=str(output_path), media_type=mime, filename=output_name) |
| |
| except Exception as e: |
| logger.exception("Add watermark failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
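
# Worked example of the placement math above (illustrative numbers): on a
# 1200x800 image with a rendered text box of 300x40, "bottom-right" resolves
# to (1200 - 300 - 20, 800 - 40 - 20) = (880, 740), and opacity=50 gives
# alpha = int(255 * 50 / 100) = 127.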
|
|
|
|
| @app.post("/api/resize-image") |
| async def api_resize_image( |
| file: UploadFile = File(...), |
| preset: str = Form("custom"), |
| width: int = Form(0), |
| height: int = Form(0), |
| maintain_aspect: bool = Form(True), |
| output_name: str = Form("resized"), |
| ): |
| """Resize image with presets or custom dimensions""" |
| data = await file.read() |
| if len(data) > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| output_name = sanitize_filename(output_name, "resized") |
| |
| |
| presets = { |
| "thumbnail": (150, 150), |
| "small": (320, 240), |
| "medium": (640, 480), |
| "hd": (1280, 720), |
| "fullhd": (1920, 1080), |
| "4k": (3840, 2160), |
| "square-sm": (500, 500), |
| "square-lg": (1000, 1000), |
| } |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| original_width, original_height = img.size |
| |
| |
| if preset != "custom" and preset in presets: |
| target_width, target_height = presets[preset] |
| else: |
| target_width = width if width > 0 else original_width |
| target_height = height if height > 0 else original_height |
| |
| |
| if maintain_aspect: |
| ratio = min(target_width / original_width, target_height / original_height) |
| target_width = int(original_width * ratio) |
| target_height = int(original_height * ratio) |
| |
| |
| resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS) |
| |
| |
| original_format = file.filename.split('.')[-1].lower() if file.filename else 'png' |
| format_map = {'jpg': 'JPEG', 'jpeg': 'JPEG', 'png': 'PNG', 'webp': 'WEBP', 'gif': 'GIF'} |
| output_format = format_map.get(original_format, 'PNG') |
| ext = original_format if original_format in format_map else 'png' |
| |
| if output_format == 'JPEG' and resized.mode == 'RGBA': |
| resized = resized.convert('RGB') |
| |
        output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}"
        save_kwargs = {"quality": 95} if output_format in ('JPEG', 'WEBP') else {}
        resized.save(str(output_path), output_format, **save_kwargs)
| |
| mime_map = {'JPEG': 'image/jpeg', 'PNG': 'image/png', 'WEBP': 'image/webp', 'GIF': 'image/gif'} |
| |
| logger.info(f"Resized image: {original_width}x{original_height} -> {target_width}x{target_height}") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type=mime_map.get(output_format, 'image/png'), |
| filename=f"{output_name}.{ext}", |
| headers={ |
| "X-Original-Size": f"{original_width}x{original_height}", |
| "X-New-Size": f"{target_width}x{target_height}" |
| } |
| ) |
| |
| except Exception as e: |
| logger.exception("Resize image failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
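
# Worked example of the aspect-fit math above: a 4000x3000 upload with the
# "hd" preset (1280x720) gives ratio = min(1280/4000, 720/3000) = 0.24, so
# the output is 960x720 -- the image fits inside the preset box without
# distortion.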
|
|
|
|
| @app.post("/api/convert-image") |
| async def api_convert_image( |
| file: UploadFile = File(...), |
| target_format: str = Form("png"), |
| quality: int = Form(90), |
| output_name: str = Form("converted"), |
| ): |
| """Convert image between formats (JPG, PNG, WebP, GIF)""" |
| data = await file.read() |
| if len(data) > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| output_name = sanitize_filename(output_name, "converted") |
    target_format = target_format.lower()
    quality = max(1, min(100, quality))
| |
| format_map = { |
| 'jpg': ('JPEG', 'image/jpeg', 'jpg'), |
| 'jpeg': ('JPEG', 'image/jpeg', 'jpg'), |
| 'png': ('PNG', 'image/png', 'png'), |
| 'webp': ('WEBP', 'image/webp', 'webp'), |
| 'gif': ('GIF', 'image/gif', 'gif'), |
| } |
| |
| if target_format not in format_map: |
| raise HTTPException(status_code=400, detail="Unsupported format. Use: jpg, png, webp, gif") |
| |
| pil_format, mime, ext = format_map[target_format] |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| |
| |
| if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'): |
| background = Image.new('RGB', img.size, (255, 255, 255)) |
| if img.mode == 'P': |
| img = img.convert('RGBA') |
            background.paste(img, mask=img.split()[3])
| img = background |
| elif pil_format == 'JPEG' and img.mode != 'RGB': |
| img = img.convert('RGB') |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}" |
| |
| save_kwargs = {} |
| if pil_format in ['JPEG', 'WEBP']: |
| save_kwargs['quality'] = quality |
| if pil_format == 'PNG': |
| save_kwargs['optimize'] = True |
| |
| img.save(str(output_path), pil_format, **save_kwargs) |
| |
| logger.info(f"Converted image to {target_format.upper()}") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type=mime, |
| filename=f"{output_name}.{ext}" |
| ) |
| |
| except Exception as e: |
| logger.exception("Convert image failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
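
# Example call (sketch; host/port assumed from the __main__ block below):
#
#   curl -X POST http://127.0.0.1:8000/api/convert-image \
#        -F "file=@logo.png" -F "target_format=webp" -F "quality=85" -o logo.webp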
|
|
|
|
| @app.post("/api/compress-image") |
| async def api_compress_image( |
| file: UploadFile = File(...), |
| quality: int = Form(70), |
| output_name: str = Form("compressed"), |
| ): |
| """Compress image to reduce file size""" |
| data = await file.read() |
| original_size = len(data) |
| |
| if original_size > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| output_name = sanitize_filename(output_name, "compressed") |
| quality = max(10, min(100, quality)) |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| |
| |
| |
        # JPEG has no alpha channel: flatten transparent images onto white
        if img.mode in ('RGBA', 'P'):
            if img.mode == 'P':
                img = img.convert('RGBA')
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[3])
            img = background
        elif img.mode != 'RGB':
            img = img.convert('RGB')
| |
| output_format = 'JPEG' |
| ext = 'jpg' |
| mime = 'image/jpeg' |
| save_kwargs = {'quality': quality, 'optimize': True} |
| |
| output_path = OUT_DIR / f"{generate_job_id()}_{output_name}.{ext}" |
| img.save(str(output_path), output_format, **save_kwargs) |
| |
| compressed_size = output_path.stat().st_size |
| reduction = ((original_size - compressed_size) / original_size) * 100 |
| |
| logger.info(f"Compressed image: {original_size} -> {compressed_size} bytes ({reduction:.1f}% reduction)") |
| |
| return FileResponse( |
| path=str(output_path), |
| media_type=mime, |
| filename=f"{output_name}.{ext}", |
| headers={ |
| "X-Original-Size": str(original_size), |
| "X-Compressed-Size": str(compressed_size), |
| "X-Reduction-Percent": f"{reduction:.1f}" |
| } |
| ) |
| |
| except Exception as e: |
| logger.exception("Compress image failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
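
# The RGBA/P-to-RGB flattening above also appears in the estimate and preview
# endpoints below. A sketch of a shared helper with the same behavior
# (illustrative refactor only; the endpoints keep their inline copies):

def _flatten_to_rgb(img: Image.Image) -> Image.Image:
    """Return an RGB version of img, compositing any alpha channel onto white."""
    if img.mode in ('RGBA', 'P'):
        if img.mode == 'P':
            img = img.convert('RGBA')
        background = Image.new('RGB', img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[3])
        return background
    if img.mode != 'RGB':
        return img.convert('RGB')
    return img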
|
|
|
|
| |
|
|
| @app.post("/api/estimate/compress-image") |
| async def api_estimate_compress_image( |
| file: UploadFile = File(...), |
| quality: int = Form(70), |
| ): |
| """Estimate compressed image file size - matches actual compression logic""" |
| data = await file.read() |
| original_size = len(data) |
| |
| if original_size > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
| quality = max(10, min(100, quality)) |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| |
| |
        # Match the real compress endpoint: flatten transparency onto white
        if img.mode in ('RGBA', 'P'):
            if img.mode == 'P':
                img = img.convert('RGBA')
            background = Image.new('RGB', img.size, (255, 255, 255))
            background.paste(img, mask=img.split()[3])
            img = background
        elif img.mode != 'RGB':
            img = img.convert('RGB')
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=quality, optimize=True) |
| estimated_size = len(buffer.getvalue()) |
| |
| reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0 |
| |
| return JSONResponse({ |
| "original_size": original_size, |
| "estimated_size": estimated_size, |
| "reduction_percent": round(max(0, reduction), 1) |
| }) |
| |
| except Exception as e: |
| logger.exception("Estimate compress image failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
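
# Example response shape (values illustrative):
#
#   {"original_size": 2458113, "estimated_size": 812400, "reduction_percent": 66.9}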
|
|
|
|
| @app.post("/api/estimate/convert-image") |
| async def api_estimate_convert_image( |
| file: UploadFile = File(...), |
| target_format: str = Form("png"), |
| quality: int = Form(90), |
| ): |
| """Estimate converted image file size""" |
| data = await file.read() |
| original_size = len(data) |
| |
| if original_size > settings.max_image_size_mb * 1024 * 1024: |
| raise HTTPException(status_code=400, detail=f"Image too large. Max: {settings.max_image_size_mb}MB") |
| |
    target_format = target_format.lower()
    quality = max(1, min(100, quality))
| format_map = { |
| 'jpg': ('JPEG', 'jpg'), |
| 'jpeg': ('JPEG', 'jpg'), |
| 'png': ('PNG', 'png'), |
| 'webp': ('WEBP', 'webp'), |
| 'gif': ('GIF', 'gif'), |
| } |
| |
| if target_format not in format_map: |
| raise HTTPException(status_code=400, detail="Unsupported format") |
| |
| pil_format, ext = format_map[target_format] |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| |
| |
| if pil_format == 'JPEG' and img.mode in ('RGBA', 'P'): |
| background = Image.new('RGB', img.size, (255, 255, 255)) |
| if img.mode == 'P': |
| img = img.convert('RGBA') |
            background.paste(img, mask=img.split()[3])
| img = background |
| elif pil_format == 'JPEG' and img.mode != 'RGB': |
| img = img.convert('RGB') |
| |
| buffer = io.BytesIO() |
| save_kwargs = {} |
| if pil_format in ['JPEG', 'WEBP']: |
| save_kwargs['quality'] = quality |
| if pil_format == 'PNG': |
| save_kwargs['optimize'] = True |
| |
| img.save(buffer, pil_format, **save_kwargs) |
| estimated_size = len(buffer.getvalue()) |
| |
| return JSONResponse({ |
| "original_size": original_size, |
| "estimated_size": estimated_size, |
| "target_format": target_format.upper() |
| }) |
| |
| except Exception as e: |
| logger.exception("Estimate convert image failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/estimate/compress-pdf") |
| async def api_estimate_compress_pdf( |
| file: UploadFile = File(...), |
| quality: int = Form(60), |
| ): |
| """Estimate compressed PDF file size by actually compressing it in memory""" |
| data = await file.read() |
| original_size = len(data) |
| validate_pdf_upload(data) |
| |
| |
| jpeg_quality = max(10, min(100, quality)) |
| max_dim = int(1000 + (quality / 100) * 2000) |
| |
| try: |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| |
| |
| for page_num in range(total_pages): |
| page = src_doc.load_page(page_num) |
| image_list = page.get_images(full=True) |
| |
| for img_info in image_list: |
| xref = img_info[0] |
| try: |
| base_image = src_doc.extract_image(xref) |
| if not base_image or len(base_image["image"]) < 5000: |
| continue |
| |
| image_bytes = base_image["image"] |
| |
| img = Image.open(io.BytesIO(image_bytes)) |
                    # Flatten transparency onto white before JPEG re-encoding
                    if img.mode in ('RGBA', 'P'):
                        if img.mode == 'P':
                            img = img.convert('RGBA')
                        background = Image.new('RGB', img.size, (255, 255, 255))
                        background.paste(img, mask=img.split()[3])
                        img = background
                    elif img.mode != 'RGB':
                        img = img.convert('RGB')
| |
| |
| if max(img.size) > max_dim: |
| ratio = max_dim / max(img.size) |
| new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio)) |
| img = img.resize(new_size, Image.Resampling.LANCZOS) |
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) |
| compressed_bytes = buffer.getvalue() |
| |
| |
| page.replace_image(xref, stream=compressed_bytes) |
| |
                except Exception as e:
                    logger.debug(f"Skipping image xref {xref}: {e}")
                    continue
| |
| |
| compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True) |
| estimated_size = len(compressed_bytes) |
| src_doc.close() |
| |
| reduction = ((original_size - estimated_size) / original_size) * 100 if original_size > 0 else 0 |
| |
| return JSONResponse({ |
| "original_size": original_size, |
| "estimated_size": estimated_size, |
| "reduction_percent": round(max(0, reduction), 1), |
| "quality": quality |
| }) |
| |
| except Exception as e: |
| logger.exception("Estimate compress PDF failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
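
# The quality slider drives both the JPEG quality and the downscale cap above:
# quality=60 gives max_dim = int(1000 + 0.6 * 2000) = 2200 px on the longest
# side, so a 4000x3000 embedded photo would be resized to 2200x1650 before
# being re-encoded at JPEG quality 60.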
|
|
|
|
| @app.get("/api/rembg-status") |
| async def api_rembg_status(): |
| """Check if rembg is available""" |
    # Lazy probe on first use so the server starts even without rembg installed
    if rembg_remove is None and not REMBG_AVAILABLE:
        _check_rembg()
| |
| return {"available": REMBG_AVAILABLE} |
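
# Example check (sketch): curl http://127.0.0.1:8000/api/rembg-status
# returns {"available": true} once the library has imported successfully.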
|
|
|
|
| |
|
|
def image_to_base64(img: Image.Image, format: str = "PNG", quality: int = 95) -> str:
    """Convert a PIL Image to a base64-encoded string"""
    import base64
    buffer = io.BytesIO()
    if format == "JPEG" and img.mode in ('RGBA', 'P'):
        img = img.convert('RGB')
    save_kwargs = {"quality": quality} if format == "JPEG" else {}
    img.save(buffer, format=format, **save_kwargs)
    return base64.b64encode(buffer.getvalue()).decode()
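
# Usage sketch: embed a downscaled preview directly in a JSON payload
# (hypothetical snippet, not called anywhere in this file):
#
#   img = prepare_preview_image(Image.open("photo.jpg"))
#   payload = {"preview": image_to_base64(img, "JPEG", 80)}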
|
|
|
|
def prepare_preview_image(img: Image.Image, max_size: int = 1920) -> Image.Image:
    """Downscale for preview only when the longest side exceeds max_size (HD)"""
    # Image.thumbnail resizes in place and preserves aspect ratio
    if max(img.size) > max_size:
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
    return img
|
|
|
|
| def pdf_page_to_base64(pdf_bytes: bytes, page_num: int = 0, dpi: int = 200) -> str: |
| """Convert PDF page to base64 PNG at high quality""" |
| import base64 |
| doc = fitz.open(stream=pdf_bytes, filetype="pdf") |
| page = doc.load_page(page_num) |
| zoom = dpi / 72.0 |
| mat = fitz.Matrix(zoom, zoom) |
| pix = page.get_pixmap(matrix=mat) |
| png_bytes = pix.tobytes("png") |
| doc.close() |
| return base64.b64encode(png_bytes).decode() |
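
# The zoom factor converts the requested DPI into PDF units (72 points per
# inch): dpi=200 gives zoom = 200 / 72 ≈ 2.78, so a US-Letter page
# (612x792 pt) renders at roughly 1700x2200 px.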
|
|
|
|
| @app.post("/api/preview/compress-pdf") |
| async def api_preview_compress_pdf( |
| file: UploadFile = File(...), |
| quality: int = Form(60), |
| ): |
| """Preview PDF compression - returns original and compressed first page""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| jpeg_quality = max(10, min(100, quality)) |
| max_dim = int(1000 + (quality / 100) * 2000) |
| |
| try: |
| |
| original_b64 = pdf_page_to_base64(data, 0, 150) |
| |
| |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| |
        # Only the first page is needed for the preview (min() guards empty PDFs)
        for page_num in range(min(1, len(src_doc))):
| page = src_doc.load_page(page_num) |
| image_list = page.get_images(full=True) |
| |
| for img_info in image_list: |
| xref = img_info[0] |
| try: |
| base_image = src_doc.extract_image(xref) |
| if not base_image or len(base_image["image"]) < 5000: |
| continue |
| |
| img = Image.open(io.BytesIO(base_image["image"])) |
                    # Flatten transparency onto white before JPEG re-encoding
                    if img.mode in ('RGBA', 'P'):
                        if img.mode == 'P':
                            img = img.convert('RGBA')
                        background = Image.new('RGB', img.size, (255, 255, 255))
                        background.paste(img, mask=img.split()[3])
                        img = background
                    elif img.mode != 'RGB':
                        img = img.convert('RGB')
| |
| |
| if max(img.size) > max_dim: |
| ratio = max_dim / max(img.size) |
| new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio)) |
| img = img.resize(new_size, Image.Resampling.LANCZOS) |
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=jpeg_quality, optimize=True) |
| |
| page.replace_image(xref, stream=buffer.getvalue()) |
                except Exception as e:
                    logger.debug(f"Skipping image xref {xref}: {e}")
                    continue
| |
| compressed_bytes = src_doc.tobytes(garbage=4, deflate=True, clean=True) |
| src_doc.close() |
| |
| processed_b64 = pdf_page_to_base64(compressed_bytes, 0, 150) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64, |
| "original_size": len(data), |
| "processed_size": len(compressed_bytes) |
| }) |
| |
| except Exception as e: |
| logger.exception("Compress preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
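
# All /api/preview/* endpoints share the same response contract: JSON with
# base64-encoded "original" and "processed" renderings (plus optional size
# metadata) that the frontend can drop straight into data: image URLs.
# Example shape (values illustrative):
#
#   {"original": "iVBORw0...", "processed": "/9j/4AAQ...",
#    "original_size": 1048576, "processed_size": 221184}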
|
|
|
|
| @app.post("/api/preview/rotate-pdf") |
| async def api_preview_rotate_pdf( |
| file: UploadFile = File(...), |
| rotation: int = Form(90), |
| ): |
| """Preview PDF rotation - returns original and rotated first page""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| rotation = int(rotation) % 360 |
| if rotation not in [90, 180, 270]: |
| rotation = 90 |
| |
| try: |
| original_b64 = pdf_page_to_base64(data, 0, 150) |
| |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| page = src_doc.load_page(0) |
        page.set_rotation((page.rotation + rotation) % 360)
| rotated_bytes = src_doc.tobytes() |
| src_doc.close() |
| |
| processed_b64 = pdf_page_to_base64(rotated_bytes, 0, 150) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64 |
| }) |
| |
| except Exception as e: |
| logger.exception("Rotate preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/preview/page-numbers") |
| async def api_preview_page_numbers( |
| file: UploadFile = File(...), |
| position: str = Form("bottom-center"), |
| format: str = Form("Page {n} of {total}"), |
| start_number: int = Form(1), |
| font_size: int = Form(11), |
| ): |
| """Preview page numbers - returns original and numbered first page""" |
| data = await file.read() |
| validate_pdf_upload(data) |
| |
| positions = { |
| "top-left": ("left", "top"), |
| "top-center": ("center", "top"), |
| "top-right": ("right", "top"), |
| "bottom-left": ("left", "bottom"), |
| "bottom-center": ("center", "bottom"), |
| "bottom-right": ("right", "bottom"), |
| } |
| h_align, v_align = positions.get(position, ("center", "bottom")) |
| margin = 30 |
| |
| try: |
| original_b64 = pdf_page_to_base64(data, 0, 150) |
| |
| src_doc = fitz.open(stream=data, filetype="pdf") |
| total_pages = len(src_doc) |
| page = src_doc.load_page(0) |
| rect = page.rect |
| |
| text = format.replace("{n}", str(start_number)).replace("{total}", str(total_pages)) |
| text_width = fitz.get_text_length(text, fontsize=font_size) |
| |
| if h_align == "left": |
| x = margin |
| elif h_align == "right": |
| x = rect.width - margin - text_width |
| else: |
| x = (rect.width - text_width) / 2 |
| |
| if v_align == "top": |
| y = margin + font_size |
| else: |
| y = rect.height - margin |
| |
| page.insert_text((x, y), text, fontsize=font_size, color=(0.3, 0.3, 0.3)) |
| |
| numbered_bytes = src_doc.tobytes() |
| src_doc.close() |
| |
| processed_b64 = pdf_page_to_base64(numbered_bytes, 0, 150) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64 |
| }) |
| |
| except Exception as e: |
| logger.exception("Page numbers preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
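
# Worked example of the format template above: format="Page {n} of {total}"
# with start_number=1 on a 12-page document renders "Page 1 of 12" on the
# preview page.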
|
|
|
|
| @app.post("/api/preview/remove-background") |
| async def api_preview_remove_background( |
| file: UploadFile = File(...), |
| ): |
| """Preview background removal - returns original and processed image in HD quality""" |
    # Lazy probe on first use so the server starts even without rembg installed
    if rembg_remove is None and not REMBG_AVAILABLE:
        _check_rembg()
| |
| if not REMBG_AVAILABLE or rembg_remove is None: |
| raise HTTPException(status_code=500, detail="rembg not installed") |
| |
| data = await file.read() |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| img = prepare_preview_image(img) |
| original_b64 = image_to_base64(img, "PNG") |
| |
| |
| img_buffer = io.BytesIO() |
| img_rgb = img.convert('RGB') if img.mode != 'RGB' else img |
| img_rgb.save(img_buffer, format='PNG') |
        # Run the CPU-bound inference in a worker thread (see main endpoint)
        result = await asyncio.to_thread(rembg_remove, img_buffer.getvalue())
| |
| result_img = Image.open(io.BytesIO(result)) |
| processed_b64 = image_to_base64(result_img, "PNG") |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64 |
| }) |
| |
| except Exception as e: |
| logger.exception("Remove background preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/preview/enhance-image") |
| async def api_preview_enhance_image( |
| file: UploadFile = File(...), |
| level: str = Form("medium"), |
| upscale: float = Form(1.0), |
| ): |
| """Preview image enhancement - returns original and enhanced image in HD quality""" |
| data = await file.read() |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| img = prepare_preview_image(img) |
| original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95) |
| |
| work_img = img.copy() |
| if upscale > 1.0: |
| new_size = (int(work_img.size[0] * min(upscale, 2.0)), int(work_img.size[1] * min(upscale, 2.0))) |
| work_img = work_img.resize(new_size, Image.Resampling.LANCZOS) |
| |
| enhanced = enhance_image(work_img, level) |
| enhanced = prepare_preview_image(enhanced) |
| processed_b64 = image_to_base64(enhanced.convert('RGB'), "JPEG", 95) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64 |
| }) |
| |
| except Exception as e: |
| logger.exception("Enhance preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/preview/image-watermark") |
| async def api_preview_image_watermark( |
| file: UploadFile = File(...), |
| text: str = Form(""), |
| position: str = Form("center"), |
| opacity: int = Form(50), |
| font_size: int = Form(36), |
| color: str = Form("#000000"), |
| ): |
| """Preview image watermark - returns original and watermarked image in HD quality""" |
| from PIL import ImageDraw, ImageFont |
| |
| data = await file.read() |
| |
    if not text.strip():
        raise HTTPException(status_code=400, detail="Watermark text required")

    color_hex = color.lstrip('#')
    if not re.fullmatch(r"[0-9a-fA-F]{6}", color_hex):
        raise HTTPException(status_code=400, detail="Color must be a 6-digit hex value like #336699")
    opacity = max(0, min(100, opacity))
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| img = prepare_preview_image(img) |
| original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 95) |
| |
| if img.mode != 'RGBA': |
| img = img.convert('RGBA') |
| |
| watermark = Image.new('RGBA', img.size, (0, 0, 0, 0)) |
| draw = ImageDraw.Draw(watermark) |
| |
        try:
            font = ImageFont.truetype("arial.ttf", font_size)
        except OSError:
            font = ImageFont.load_default()
| |
        r, g, b = tuple(int(color_hex[i:i + 2], 16) for i in (0, 2, 4))
        alpha = int(255 * opacity / 100)
| |
| bbox = draw.textbbox((0, 0), text, font=font) |
| text_width = bbox[2] - bbox[0] |
| text_height = bbox[3] - bbox[1] |
| |
| positions = { |
| "top-left": (20, 20), |
| "top-center": ((img.width - text_width) // 2, 20), |
| "top-right": (img.width - text_width - 20, 20), |
| "center": ((img.width - text_width) // 2, (img.height - text_height) // 2), |
| "bottom-left": (20, img.height - text_height - 20), |
| "bottom-center": ((img.width - text_width) // 2, img.height - text_height - 20), |
| "bottom-right": (img.width - text_width - 20, img.height - text_height - 20), |
| } |
| |
| pos = positions.get(position, positions["center"]) |
| draw.text(pos, text, font=font, fill=(r, g, b, alpha)) |
| |
| result = Image.alpha_composite(img, watermark) |
| processed_b64 = image_to_base64(result.convert('RGB'), "JPEG", 95) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64 |
| }) |
| |
| except Exception as e: |
| logger.exception("Watermark preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/preview/resize-image") |
| async def api_preview_resize_image( |
| file: UploadFile = File(...), |
| preset: str = Form("custom"), |
| width: int = Form(0), |
| height: int = Form(0), |
| maintain_aspect: bool = Form(True), |
| ): |
| """Preview image resize - returns original and resized image in HD quality""" |
| data = await file.read() |
| |
| presets = { |
| "thumbnail": (150, 150), |
| "small": (320, 240), |
| "medium": (640, 480), |
| "hd": (1280, 720), |
| "fullhd": (1920, 1080), |
| "4k": (3840, 2160), |
| "square-sm": (500, 500), |
| "square-lg": (1000, 1000), |
| } |
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| original_width, original_height = img.size |
| |
| |
| original_preview = prepare_preview_image(img.copy()) |
| original_b64 = image_to_base64(original_preview.convert('RGB'), "JPEG", 95) |
| |
| |
| if preset != "custom" and preset in presets: |
| target_width, target_height = presets[preset] |
| else: |
| target_width = width if width > 0 else original_width |
| target_height = height if height > 0 else original_height |
| |
| if maintain_aspect: |
| ratio = min(target_width / original_width, target_height / original_height) |
| target_width = int(original_width * ratio) |
| target_height = int(original_height * ratio) |
| |
| |
| resized = img.resize((target_width, target_height), Image.Resampling.LANCZOS) |
| resized_preview = prepare_preview_image(resized) |
| processed_b64 = image_to_base64(resized_preview.convert('RGB'), "JPEG", 95) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64, |
| "original_size": f"{original_width}x{original_height}", |
| "new_size": f"{target_width}x{target_height}" |
| }) |
| |
| except Exception as e: |
| logger.exception("Resize preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/preview/compress-image") |
| async def api_preview_compress_image( |
| file: UploadFile = File(...), |
| quality: int = Form(70), |
| ): |
| """Preview image compression - returns original and compressed image in HD quality""" |
| data = await file.read() |
    original_size = len(data)
    quality = max(10, min(100, quality))
| |
| try: |
| img = Image.open(io.BytesIO(data)) |
| img = prepare_preview_image(img) |
| original_b64 = image_to_base64(img.convert('RGB'), "JPEG", 98) |
| |
| |
| if img.mode != 'RGB': |
| img = img.convert('RGB') |
| |
| buffer = io.BytesIO() |
| img.save(buffer, format="JPEG", quality=quality, optimize=True) |
| compressed_size = len(buffer.getvalue()) |
| |
| compressed_img = Image.open(io.BytesIO(buffer.getvalue())) |
| processed_b64 = image_to_base64(compressed_img, "JPEG", 98) |
| |
| return JSONResponse({ |
| "original": original_b64, |
| "processed": processed_b64, |
| "original_size": original_size, |
| "compressed_size": compressed_size |
| }) |
| |
| except Exception as e: |
| logger.exception("Compress preview failed") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run( |
| "server:app", |
| host="127.0.0.1", |
| port=8000, |
| reload=settings.debug |
| ) |
|
|