VesperAI's picture
addede a Production Branch
7ee60a0
import os
import uuid
from pathlib import Path
from typing import List, Optional
import io
from contextlib import asynccontextmanager
from fastapi import FastAPI, File, UploadFile, Request, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image
from image_indexer import ImageIndexer
from image_search import ImageSearch
from image_database import ImageDatabase
# Initialize image indexer, searcher, and database
indexer = ImageIndexer()
searcher = ImageSearch()
image_db = ImageDatabase()
image_extensions = [".jpg", ".jpeg", ".png", ".gif"]
@asynccontextmanager
async def lifespan(_: FastAPI):
"""Initialize the image indexer"""
yield
app = FastAPI(title="Visual Product Search", lifespan=lifespan)
# Setup templates and static files
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Render the home page"""
folders = indexer.folder_manager.get_all_folders()
return templates.TemplateResponse(
"index.html",
{
"request": request,
"initial_status": {
"status": indexer.status.value,
"current_file": indexer.current_file,
"total_files": indexer.total_files,
"processed_files": indexer.processed_files,
"progress_percentage": round((indexer.processed_files / indexer.total_files * 100) if indexer.total_files > 0 else 0, 2)
},
"folders": folders
}
)
@app.post("/upload")
async def upload_images(
files: List[UploadFile] = File(...),
background_tasks: BackgroundTasks = None
):
"""Upload multiple images and index them"""
try:
# Create uploads directory if it doesn't exist
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
# Save uploaded files
saved_files = []
for file in files:
if file.content_type and file.content_type.startswith('image/'):
# Generate unique filename
file_extension = Path(file.filename).suffix
unique_filename = f"{uuid.uuid4()}{file_extension}"
file_path = upload_dir / unique_filename
# Save the file
contents = await file.read()
with open(file_path, "wb") as f:
f.write(contents)
saved_files.append(str(file_path))
else:
raise HTTPException(status_code=400, detail=f"File {file.filename} is not a valid image")
if saved_files:
# Add the upload folder to be indexed
folder_info = indexer.folder_manager.add_folder(str(upload_dir))
# Start indexing in the background
if background_tasks:
background_tasks.add_task(indexer.index_folder, str(upload_dir))
return {
"status": "success",
"message": f"Uploaded and indexing {len(saved_files)} images",
"folder_info": folder_info,
"uploaded_files": saved_files
}
else:
raise HTTPException(status_code=400, detail="No valid images were uploaded")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/folders")
async def add_folder(folder_path: str, background_tasks: BackgroundTasks):
"""Add a new folder to index"""
try:
# Add folder to manager first (this creates the collection)
folder_info = indexer.folder_manager.add_folder(folder_path)
# Start indexing in the background
background_tasks.add_task(indexer.index_folder, folder_path)
return folder_info
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@app.delete("/folders/{folder_path:path}")
async def remove_folder(folder_path: str):
"""Remove a folder from indexing"""
try:
await indexer.remove_folder(folder_path)
return {"status": "success"}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e)) from e
@app.get("/folders")
async def list_folders():
"""List all indexed folders"""
return indexer.folder_manager.get_all_folders()
@app.get("/search/text")
async def search_by_text(query: str, folder: Optional[str] = None) -> List[dict]:
"""Search images by text query, optionally filtered by folder"""
results = await searcher.search_by_text(query, folder)
return results
@app.post("/search/image")
async def search_by_image(
file: UploadFile = File(...),
folder: Optional[str] = None
) -> List[dict]:
"""Search images by uploading a similar image, optionally filtered by folder"""
contents = await file.read()
image = Image.open(io.BytesIO(contents))
results = await searcher.search_by_image(image, folder)
return results
@app.get("/search/url")
async def search_by_url(
url: str,
folder: Optional[str] = None
) -> List[dict]:
"""Search images by providing a URL to a similar image, optionally filtered by folder"""
results = await searcher.search_by_url(url, folder)
return results
@app.get("/images")
async def list_images(folder: Optional[str] = None) -> List[dict]:
"""List all indexed images, optionally filtered by folder"""
return await indexer.get_all_images(folder)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time indexing status updates"""
await indexer.add_websocket_connection(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await indexer.remove_websocket_connection(websocket)
@app.get("/image/{image_id}")
async def serve_image(image_id: str):
"""Serve an image from the database by ID"""
try:
image_data = image_db.get_image(image_id)
if not image_data:
raise HTTPException(status_code=404, detail="Image not found")
return StreamingResponse(
io.BytesIO(image_data["image_data"]),
media_type=f"image/{image_data['file_extension'].lstrip('.')}",
headers={
"Cache-Control": "max-age=86400", # Cache for 24 hours
"Content-Disposition": f"inline; filename=\"{image_data['filename']}\""
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/thumbnail/{image_id}")
async def serve_thumbnail_by_id(image_id: str):
"""Serve a thumbnail from the database by ID"""
try:
thumbnail_data = image_db.get_thumbnail(image_id)
if not thumbnail_data:
raise HTTPException(status_code=404, detail="Thumbnail not found")
return StreamingResponse(
io.BytesIO(thumbnail_data),
media_type="image/jpeg",
headers={"Cache-Control": "max-age=86400"} # Cache for 24 hours
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/stats")
async def get_database_stats():
"""Get database statistics"""
try:
return image_db.get_database_stats()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/debug/collections")
async def debug_collections():
"""Debug endpoint to check collections and folders"""
try:
# Get Qdrant client and collections
qdrant_client = indexer.qdrant
collections = qdrant_client.get_collections().collections
# Get folder manager status
folders = indexer.folder_manager.get_all_folders()
return {
"qdrant_collections": [col.name for col in collections],
"folder_manager_folders": folders,
"collections_count": len(collections),
"folders_count": len(folders)
}
except Exception as e:
return {"error": str(e)}
# Keep the old endpoints for backward compatibility but mark as deprecated
@app.get("/thumbnail/{folder_path:path}/{file_path:path}")
async def serve_thumbnail(folder_path: str, file_path: str):
"""Serve resized image thumbnails (DEPRECATED - use /thumbnail/{image_id} instead)"""
try:
# Get folder info to verify it's an indexed folder
folder_info = indexer.folder_manager.get_folder_info(folder_path)
if not folder_info:
raise HTTPException(status_code=404, detail="Folder not found")
# Construct full file path
full_path = Path(folder_path) / file_path
if not full_path.exists():
raise HTTPException(status_code=404, detail="File not found")
# Only serve image files
if full_path.suffix.lower() not in image_extensions:
raise HTTPException(status_code=400, detail="Invalid file type")
# Open image, resize, and convert to JPEG
img = Image.open(full_path)
img.thumbnail((200, 200)) # Resize maintaining aspect ratio
# Save to a byte stream
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format="JPEG")
img_byte_arr.seek(0)
return StreamingResponse(img_byte_arr, media_type="image/jpeg", headers={"Cache-Control": "max-age=3600"}) # Cache for 1 hour
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/{folder_path:path}/{file_path:path}")
async def serve_file(folder_path: str, file_path: str):
"""Serve files from indexed folders (DEPRECATED - use /image/{image_id} instead)"""
try:
# Get folder info to verify it's an indexed folder
folder_info = indexer.folder_manager.get_folder_info(folder_path)
if not folder_info:
raise HTTPException(status_code=404, detail="Folder not found")
# Construct full file path
full_path = Path(folder_path) / file_path
if not full_path.exists():
raise HTTPException(status_code=404, detail="File not found")
# Only serve image files
if full_path.suffix.lower() not in image_extensions:
raise HTTPException(status_code=400, detail="Invalid file type")
return FileResponse(full_path)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
def get_windows_drives():
"""Get available drives on Windows"""
from ctypes import windll
drives = []
bitmask = windll.kernel32.GetLogicalDrives()
for letter in range(65, 91): # A-Z
if bitmask & (1 << (letter - 65)):
drives.append(chr(letter) + ":\\")
return drives
def get_directory_item(item):
"""Get directory item info"""
try:
is_dir = item.is_dir()
if is_dir or item.suffix.lower() in image_extensions:
return {
"name": item.name,
"path": str(item.absolute()),
"type": "directory" if is_dir else "file",
"size": item.stat().st_size if not is_dir else None
}
except Exception:
pass
return None
def get_directory_contents(path: str):
"""Get contents of a directory"""
try:
path_obj = Path(path)
if not path_obj.exists():
return {"error": "Path does not exist"}
parent = str(path_obj.parent) if path_obj.parent != path_obj else None
contents = [
item for item in (get_directory_item(i) for i in path_obj.iterdir())
if item is not None
]
return {
"current_path": str(path_obj.absolute()),
"parent_path": parent,
"contents": sorted(contents, key=lambda x: (x["type"] != "directory", x["name"].lower()))
}
except Exception as e:
return {"error": str(e)}
@app.get("/browse")
async def browse_folders():
"""Browse system folders"""
if os.name == "nt": # Windows
return {"drives": get_windows_drives()}
return get_directory_contents("/") # Unix-like
@app.get("/browse/{path:path}")
async def browse_path(path: str):
"""Browse a specific path"""
try:
path_obj = Path(path)
if not path_obj.exists():
raise HTTPException(status_code=404, detail="Path not found")
# Get parent directory for navigation
parent = str(path_obj.parent) if path_obj.parent != path_obj else None
# List directories and files
contents = []
for item in path_obj.iterdir():
try:
is_dir = item.is_dir()
if is_dir or item.suffix.lower() in image_extensions:
contents.append({
"name": item.name,
"path": str(item.absolute()),
"type": "directory" if is_dir else "file",
"size": item.stat().st_size if not is_dir else None
})
except Exception:
continue # Skip items we can't access
return {
"current_path": str(path_obj.absolute()),
"parent_path": parent,
"contents": sorted(contents, key=lambda x: (x["type"] != "directory", x["name"].lower()))
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=False)