| """ |
| Video Intelligence Platform β REST API |
| FastAPI server exposing all platform capabilities as REST endpoints. |
| |
| Run: |
| uvicorn video_intelligence.api:app --host 0.0.0.0 --port 8000 |
| |
| All endpoints return JSON. Upload videos as multipart/form-data. |
| Frontend (React/Next.js) just makes fetch() calls to these endpoints. |
| """ |
| import os |
| import io |
| import shutil |
| import tempfile |
| from typing import Optional, List |
| from pathlib import Path |
|
|
| from fastapi import FastAPI, UploadFile, File, HTTPException, Header, Query |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from contextlib import asynccontextmanager |
|
|
| from .config import Config |
| from .pipeline import IndexingPipeline |
| from .query_engine import QueryEngine, QueryResult |
| from .akinator import AkinatorRefiner |
| from .gemini_client import GeminiClient |
| from .index_store import VideoIndex |
|
|
|
|
| |
| |
| state = { |
| "pipeline": None, |
| "query_engine": None, |
| "akinator": None, |
| "initialized": False, |
| } |
|
|
|
|
| |
| app = FastAPI( |
| title="Video Intelligence Platform", |
| description="Akinator-style video search with RAG, boolean queries, and tree refinement", |
| version="1.0.0", |
| docs_url="/docs", |
| redoc_url="/redoc", |
| ) |
|
|
| |
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| |
|
|
| class InitRequest(BaseModel): |
| gemini_api_key: str |
| device: str = "cpu" |
|
|
| class InitResponse(BaseModel): |
| status: str |
| message: str |
|
|
| class SearchRequest(BaseModel): |
| query: str |
| top_k: int = 20 |
|
|
| class SearchResult(BaseModel): |
| frame_id: int |
| timestamp_sec: float |
| time_str: str |
| score: float |
| caption: str |
| detections: List[str] |
| match_source: str |
|
|
| class SearchResponse(BaseModel): |
| query: str |
| results: List[SearchResult] |
| count: int |
| akinator_active: bool = False |
| akinator_question: Optional[str] = None |
| akinator_options: Optional[List[str]] = None |
|
|
| class RefineRequest(BaseModel): |
| choice: str |
| query: str |
|
|
| class RefineResponse(BaseModel): |
| status: str |
| count: int |
| results: Optional[List[dict]] = None |
| question: Optional[str] = None |
| options: Optional[List[str]] = None |
| history: Optional[List[dict]] = None |
|
|
| class RAGRequest(BaseModel): |
| query: str |
|
|
| class RAGResponse(BaseModel): |
| query: str |
| answer: str |
|
|
| class IndexResponse(BaseModel): |
| status: str |
| frames: int |
| detections: int |
| visual_vectors: int |
| caption_vectors: int |
| elapsed_sec: float |
|
|
| class HealthResponse(BaseModel): |
| status: str |
| initialized: bool |
| version: str |
|
|
|
|
| |
|
|
| @app.get("/health", response_model=HealthResponse) |
| def health(): |
| """Health check β use for container readiness/liveness probes.""" |
| return HealthResponse( |
| status="ok", |
| initialized=state["initialized"], |
| version="1.0.0", |
| ) |
|
|
|
|
| @app.post("/init", response_model=InitResponse) |
| def initialize(req: InitRequest): |
| """ |
| Initialize models with your Gemini API key. |
| Call once before indexing/searching. Takes ~30-60s to load models. |
| """ |
| try: |
| config = Config( |
| gemini_api_key=req.gemini_api_key, |
| device=req.device, |
| ) |
|
|
| pipeline = IndexingPipeline(config) |
| query_engine = QueryEngine( |
| index=pipeline.index, |
| gemini=pipeline.gemini, |
| siglip=pipeline.siglip, |
| top_k=20, |
| ) |
| akinator = AkinatorRefiner( |
| index=pipeline.index, |
| gemini=pipeline.gemini, |
| threshold=10, |
| ) |
|
|
| state["pipeline"] = pipeline |
| state["query_engine"] = query_engine |
| state["akinator"] = akinator |
| state["initialized"] = True |
|
|
| return InitResponse(status="ok", message="Models loaded successfully") |
|
|
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/index", response_model=IndexResponse) |
| async def index_video( |
| video: UploadFile = File(...), |
| caption_every_n: int = Query(default=3, ge=1, le=20), |
| ): |
| """ |
| Upload and index a video. Extracts frames, runs detection, |
| generates embeddings and captions. |
| |
| Send as multipart/form-data with field name "video". |
| """ |
| if not state["initialized"]: |
| raise HTTPException(status_code=400, detail="Not initialized. Call POST /init first.") |
|
|
| |
| suffix = Path(video.filename).suffix or ".mp4" |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: |
| shutil.copyfileobj(video.file, tmp) |
| tmp_path = tmp.name |
|
|
| try: |
| stats = state["pipeline"].index_video( |
| tmp_path, |
| caption_every_n=caption_every_n, |
| detect_every_n=1, |
| ) |
| return IndexResponse( |
| status="ok", |
| frames=stats["frames"], |
| detections=stats["detections"], |
| visual_vectors=stats["visual_vectors"], |
| caption_vectors=stats["caption_vectors"], |
| elapsed_sec=stats["elapsed_sec"], |
| ) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
| finally: |
| os.unlink(tmp_path) |
|
|
|
|
| @app.post("/search", response_model=SearchResponse) |
| def search(req: SearchRequest): |
| """ |
| Search the indexed video with natural language. |
| Supports boolean: "red car AND person", "dog OR cat" |
| """ |
| if not state["initialized"]: |
| raise HTTPException(status_code=400, detail="Not initialized. Call POST /init first.") |
|
|
| try: |
| results = state["query_engine"].search(req.query, top_k=req.top_k) |
|
|
| search_results = [ |
| SearchResult( |
| frame_id=r.frame_id, |
| timestamp_sec=r.timestamp_sec, |
| time_str=r.time_str, |
| score=round(r.score, 4), |
| caption=r.caption or "", |
| detections=r.detections, |
| match_source=r.match_source, |
| ) |
| for r in results |
| ] |
|
|
| |
| state["_last_results"] = results |
|
|
| |
| akinator_active = False |
| akinator_question = None |
| akinator_options = None |
|
|
| if len(results) > 10 and state["akinator"]: |
| ak_result = state["akinator"].start(results, req.query) |
| if ak_result["status"] == "refining": |
| akinator_active = True |
| akinator_question = ak_result["question"] |
| akinator_options = ak_result["options"] |
|
|
| return SearchResponse( |
| query=req.query, |
| results=search_results, |
| count=len(search_results), |
| akinator_active=akinator_active, |
| akinator_question=akinator_question, |
| akinator_options=akinator_options, |
| ) |
|
|
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/refine", response_model=RefineResponse) |
| def refine(req: RefineRequest): |
| """ |
| Answer an Akinator refinement question to narrow results. |
| Send the chosen option from the previous search/refine response. |
| """ |
| if not state["akinator"]: |
| raise HTTPException(status_code=400, detail="No active refinement session") |
|
|
| try: |
| result = state["akinator"].answer(req.choice, req.query) |
| return RefineResponse( |
| status=result["status"], |
| count=result["count"], |
| results=result.get("results"), |
| question=result.get("question"), |
| options=result.get("options"), |
| history=result.get("history"), |
| ) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/rag", response_model=RAGResponse) |
| def rag_answer(req: RAGRequest): |
| """ |
| Generate a RAG answer from the last search results. |
| Cites specific timestamps in the response. |
| """ |
| if not state["initialized"]: |
| raise HTTPException(status_code=400, detail="Not initialized. Call POST /init first.") |
|
|
| last_results = state.get("_last_results", []) |
| if not last_results: |
| raise HTTPException(status_code=400, detail="No search results. Call POST /search first.") |
|
|
| try: |
| contexts = [r.to_dict() for r in last_results[:15]] |
| answer = state["pipeline"].gemini.generate_rag_answer(req.query, contexts) |
| return RAGResponse(query=req.query, answer=answer) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.get("/stats") |
| def stats(): |
| """Get current index statistics.""" |
| if not state["initialized"]: |
| raise HTTPException(status_code=400, detail="Not initialized.") |
| return state["pipeline"].index.stats() |
|
|