| | """ |
| | Patent upload and management endpoints |
| | """ |
| |
|
| | from fastapi import APIRouter, UploadFile, File, HTTPException |
| | from fastapi.responses import FileResponse |
| | from pathlib import Path |
| | import uuid |
| | import shutil |
| | from datetime import datetime |
| | from typing import List, Dict |
| | from loguru import logger |
| |
|
# Router on which the patent endpoints in this module are registered.
router = APIRouter()

# Directory where uploaded PDFs are written. The path is relative, so it is
# resolved against the process working directory. It is created (together
# with any missing parents) as a side effect of importing this module.
UPLOAD_DIR = Path("uploads/patents")
UPLOAD_DIR.mkdir(exist_ok=True, parents=True)
| |
|
@router.post("/upload", response_model=Dict)
async def upload_patent(file: UploadFile = File(...)):
    """
    Upload a patent PDF for analysis.

    The upload is validated (``.pdf`` extension, at most 50MB), copied into
    ``UPLOAD_DIR`` under a freshly generated UUID, and registered in the
    ``app_state["patents"]`` mapping.

    Args:
        file: PDF file to upload

    Returns:
        Patent metadata including unique ID

    Raises:
        HTTPException: 400 if the file has no ``.pdf`` extension or is
            larger than 50MB; 500 if storing or registering the file fails.
    """
    logger.info(f"Received upload request for: {file.filename}")

    # The filename is client-supplied: it may be missing altogether, and the
    # extension may be upper-case (e.g. "SCAN.PDF"), so normalise before
    # checking instead of calling .endswith() on a possible None.
    if not (file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(
            status_code=400,
            detail="Only PDF files are supported. Please upload a .pdf file."
        )

    # Measure the upload by seeking to the end of the underlying file object,
    # then rewind so the copy below starts from the first byte.
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)

    max_size = 50 * 1024 * 1024
    if file_size > max_size:
        raise HTTPException(
            status_code=400,
            detail="File too large. Maximum size is 50MB."
        )

    # The stored name is derived from a new UUID, never from the client's
    # filename, so the client cannot influence the path on disk.
    patent_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{patent_id}.pdf"

    try:
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import with api.main; confirm.
        from api.main import app_state

        metadata = {
            "id": patent_id,
            "filename": file.filename,
            "path": str(file_path),
            "size": file_size,
            "uploaded_at": datetime.utcnow().isoformat(),
            "status": "uploaded",
            "workflow_id": None
        }

        app_state["patents"][patent_id] = metadata

    except Exception as e:
        logger.error(f"❌ Upload failed: {e}")

        # The patent was not registered, so nothing references the file on
        # disk; remove it rather than leaving an orphaned or partial PDF.
        try:
            file_path.unlink()
        except FileNotFoundError:
            pass  # the failure happened before anything was written
        except OSError as cleanup_error:
            logger.warning(
                f"Could not remove partial upload {file_path}: {cleanup_error}"
            )

        raise HTTPException(
            status_code=500,
            detail=f"Upload failed: {str(e)}"
        ) from e

    logger.success(f"✅ Patent uploaded: {patent_id} ({file.filename})")

    return {
        "patent_id": patent_id,
        "filename": file.filename,
        "size": file_size,
        "uploaded_at": metadata["uploaded_at"],
        "message": "Patent uploaded successfully"
    }
| |
|
@router.get("/{patent_id}", response_model=Dict)
async def get_patent(patent_id: str):
    """
    Look up the stored metadata for a single patent.

    Args:
        patent_id: Unique patent identifier

    Returns:
        The metadata dict registered under ``patent_id``

    Raises:
        HTTPException: 404 when no patent with this ID is registered
    """
    from api.main import app_state

    registry = app_state["patents"]

    # Happy path first: return the stored record as-is.
    if patent_id in registry:
        return registry[patent_id]

    raise HTTPException(
        status_code=404,
        detail=f"Patent not found: {patent_id}"
    )
| |
|
@router.get("/", response_model=List[Dict])
async def list_patents(
    status: str = None,
    limit: int = 100,
    offset: int = 0
):
    """
    List all uploaded patents, newest first.

    Args:
        status: Filter by status (uploaded, analyzing, analyzed, failed).
            An empty or omitted value disables the filter.
        limit: Maximum number of results; negative values are treated as 0
        offset: Pagination offset; negative values are treated as 0

    Returns:
        List of patent metadata, sorted by ``uploaded_at`` descending and
        then paginated with ``offset`` / ``limit``
    """
    from api.main import app_state

    # Negative numbers would be interpreted by the slice below as
    # from-the-end indices and return an arbitrary page, so clamp them.
    offset = max(offset, 0)
    limit = max(limit, 0)

    patents = app_state["patents"].values()

    if status:
        patents = [p for p in patents if p["status"] == status]

    # "uploaded_at" is an ISO-8601 string, so ordering the strings orders
    # the timestamps. sorted() builds a new list and leaves the registry
    # untouched.
    newest_first = sorted(
        patents,
        key=lambda p: p["uploaded_at"],
        reverse=True
    )

    return newest_first[offset:offset + limit]
| |
|
@router.delete("/{patent_id}")
async def delete_patent(patent_id: str):
    """
    Delete a patent and its associated files.

    The stored PDF is removed from disk first, then the patent's entry is
    removed from ``app_state["patents"]``. A PDF that is already missing
    from disk is not treated as an error.

    Args:
        patent_id: Unique patent identifier

    Returns:
        Success message

    Raises:
        HTTPException: 404 when no patent with this ID is registered;
            500 when removing the file or the registry entry fails.
    """
    from api.main import app_state

    registry = app_state["patents"]

    if patent_id not in registry:
        raise HTTPException(
            status_code=404,
            detail=f"Patent not found: {patent_id}"
        )

    try:
        file_path = Path(registry[patent_id]["path"])

        # Unlink directly instead of checking exists() first: the check
        # and the removal are not atomic, and a file that vanished in
        # between should not turn the delete into a 500.
        try:
            file_path.unlink()
        except FileNotFoundError:
            pass

        del registry[patent_id]

        logger.info(f"Deleted patent: {patent_id}")

        return {"message": "Patent deleted successfully"}

    except Exception as e:
        logger.error(f"Delete failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Delete failed: {str(e)}"
        ) from e
| |
|
@router.get("/{patent_id}/download")
async def download_patent(patent_id: str):
    """
    Serve the PDF that was originally uploaded for a patent.

    Args:
        patent_id: Unique patent identifier

    Returns:
        The stored PDF as a file response, offered under the filename
        recorded in the patent's metadata

    Raises:
        HTTPException: 404 when the patent ID is not registered, or when
            it is registered but its file no longer exists on disk
    """
    from api.main import app_state

    registry = app_state["patents"]

    if patent_id not in registry:
        raise HTTPException(
            status_code=404,
            detail=f"Patent not found: {patent_id}"
        )

    record = registry[patent_id]
    pdf_path = Path(record["path"])

    # Only build the response when the stored file is still present.
    if pdf_path.exists():
        return FileResponse(
            path=pdf_path,
            media_type="application/pdf",
            filename=record["filename"]
        )

    raise HTTPException(
        status_code=404,
        detail="Patent file not found on disk"
    )
| |
|