from fastapi import FastAPI, File, UploadFile, Request, HTTPException, Form, Depends, status
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import shutil
import os
import uuid
import base64
from pathlib import Path
import uvicorn
from typing import List, Optional
import secrets
from starlette.middleware.sessions import SessionMiddleware
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import JSONResponse
import json
import io
from huggingface_hub import HfApi, HfFolder, create_repo
from huggingface_hub.utils import RepositoryNotFoundError
from huggingface_hub.hf_api import RepoFile
import tempfile
import time

# Create FastAPI app
app = FastAPI(title="Image Uploader")

# Add session middleware
app.add_middleware(
    SessionMiddleware,
    secret_key="YOUR_SECRET_KEY_CHANGE_THIS_IN_PRODUCTION"
)

# Create uploads directory if it doesn't exist
UPLOAD_DIR = Path("static/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Create metadata directory for storing hashtags
METADATA_DIR = Path("static/metadata")
METADATA_DIR.mkdir(parents=True, exist_ok=True)
METADATA_FILE = METADATA_DIR / "image_metadata.json"

# Alternative metadata location with guaranteed write permissions
HOME_DIR = Path(os.environ.get("HOME", "/tmp"))
ALT_METADATA_DIR = HOME_DIR / ".image_uploader"
ALT_METADATA_DIR.mkdir(parents=True, exist_ok=True)
ALT_METADATA_FILE = ALT_METADATA_DIR / "image_metadata.json"

# Initialize metadata file if it doesn't exist
if not METADATA_FILE.exists() and not ALT_METADATA_FILE.exists():
    try:
        with open(METADATA_FILE, "w") as f:
            json.dump({}, f)
        print(f"Initialized metadata file at {METADATA_FILE}")
    except PermissionError:
        with open(ALT_METADATA_FILE, "w") as f:
            json.dump({}, f)
        print(f"Initialized metadata file at alternative location: {ALT_METADATA_FILE}")

# Mount static directory
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# Set up security
security = HTTPBasic()

# Hardcoded credentials (in a real app, use proper hashed passwords in a database)
USERNAME = "detomo"
PASSWORD = "itweek2025"

# Hugging Face Dataset configuration
HF_USERNAME = os.environ.get("HF_USERNAME", "")  # Set this in Hugging Face Space settings
HF_TOKEN = os.environ.get("HF_TOKEN", "")  # Set this in Hugging Face Space settings
DATASET_REPO = os.environ.get("HF_DATASET_REPO", "image-uploader-data")
IMAGES_PATH = "images"
METADATA_PATH = "metadata"
SPACE_NAME = os.environ.get(
    "HF_SPACE_NAME", ""
)  # Add this environment variable for the Space name

# Set HF cache directory to a writable location
# This is necessary for Hugging Face Spaces, which has permission issues with the default cache location
os.environ["HF_HOME"] = os.path.join(tempfile.gettempdir(), "huggingface")
os.environ["HUGGINGFACE_HUB_CACHE"] = os.path.join(tempfile.gettempdir(), "huggingface", "hub")
os.makedirs(os.environ["HF_HOME"], exist_ok=True)
os.makedirs(os.environ["HUGGINGFACE_HUB_CACHE"], exist_ok=True)

# Initialize HfApi
hf_api = HfApi(token=HF_TOKEN)


# Create or ensure repository exists
def ensure_repo_exists():
    try:
        # Check if repo exists
        hf_api.repo_info(repo_id=f"{HF_USERNAME}/{DATASET_REPO}", repo_type="dataset")
        print(f"Repository {HF_USERNAME}/{DATASET_REPO} exists")
    except RepositoryNotFoundError:
        # Create repo if it doesn't exist
        try:
            print(f"Creating repository {HF_USERNAME}/{DATASET_REPO}")
            create_repo(f"{HF_USERNAME}/{DATASET_REPO}", repo_type="dataset", token=HF_TOKEN)
            # Initialize metadata
            metadata = json.dumps({})
            hf_api.upload_file(
                path_or_fileobj=io.BytesIO(metadata.encode()),
                path_in_repo=f"{METADATA_PATH}/image_metadata.json",
                repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                repo_type="dataset",
                token=HF_TOKEN
            )
            print("Repository created and initialized")
        except Exception as e:
            print(f"Error creating repository: {e}")
    except Exception as e:
        print(f"Error checking repository: {e}")


# Initialize repository if in production
if os.environ.get("ENV", "development") == "production":
    print("Running in production mode, checking Hugging Face repository...")
    if HF_USERNAME and HF_TOKEN:
        print(f"Using Hugging Face credentials for user: {HF_USERNAME}")
        try:
            ensure_repo_exists()
        except Exception as e:
            print(f"Error ensuring repository exists: {e}")
    else:
        print("Warning: HF_USERNAME or HF_TOKEN not set. Running without Hugging Face integration.")


def get_file_extension(filename: str) -> str:
    """Get the file extension from a filename."""
    return os.path.splitext(filename)[1].lower()


def is_valid_image(extension: str) -> bool:
    """Check if the file extension is a valid image type."""
    return extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']


def authenticate(request: Request):
    """Check if the user is authenticated."""
    is_authenticated = request.session.get("authenticated", False)
    return is_authenticated


def verify_auth(request: Request):
    """Verify authentication."""
    if not authenticate(request):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Basic"},
        )
    return True


def get_metadata_file():
    """Get the appropriate metadata file based on write permissions."""
    # Try to write to the primary location
    try:
        if not METADATA_FILE.exists():
            with open(METADATA_FILE, "w") as f:
                json.dump({}, f)
        # Test write permission; fall back to the alternative location if the file is not writable
        if not os.access(METADATA_FILE, os.W_OK):
            raise PermissionError(f"No write access to {METADATA_FILE}")
        return METADATA_FILE
    except (PermissionError, OSError):
        print(
            f"Warning: Cannot write to {METADATA_FILE}, using alternative location: {ALT_METADATA_FILE}"
        )
        if not ALT_METADATA_FILE.exists():
            with open(ALT_METADATA_FILE, "w") as f:
                json.dump({}, f)
        return ALT_METADATA_FILE


def get_image_metadata():
    """Get all image metadata including hashtags from local storage and sync with HF if needed."""
    metadata_file = get_metadata_file()
    if metadata_file.exists():
        try:
            with open(metadata_file, "r") as f:
                metadata = json.load(f)

            # In production, sync metadata to Hugging Face if it exists locally but not on HF
            if (
                os.environ.get("ENV", "development") == "production"
                and HF_USERNAME
                and HF_TOKEN
            ):
                try:
                    # Only upload if there are changes (we'd need to implement a proper change tracking mechanism)
                    # For now, we'll upload every time to ensure consistency
                    metadata_str = json.dumps(metadata)
                    hf_api.upload_file(
                        path_or_fileobj=io.BytesIO(metadata_str.encode()),
                        path_in_repo=f"{METADATA_PATH}/image_metadata.json",
                        repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                        repo_type="dataset",
                        token=HF_TOKEN,
                    )
                except Exception as e:
                    print(f"Error syncing metadata to Hugging Face: {e}")

            return metadata
        except Exception as e:
            print(f"Error reading metadata file: {e}")
            return {}

    # If the metadata file doesn't exist locally, create it
    with open(metadata_file, "w") as f:
        json.dump({}, f)
    return {}


def save_image_metadata(metadata):
    """Save image metadata to the local JSON file and sync with HF."""
    metadata_file = get_metadata_file()

    # Always save locally first
    try:
        with open(metadata_file, "w") as f:
            json.dump(metadata, f)
        print(f"Metadata saved to {metadata_file}")
    except Exception as e:
        print(f"Error saving metadata locally: {e}")
        # In case of a file error, we'll still try to save to HF

    # In production, also save to Hugging Face
    if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN:
        try:
            print(f"Saving metadata to Hugging Face repository {HF_USERNAME}/{DATASET_REPO}")
            metadata_str = json.dumps(metadata)
            hf_api.upload_file(
                path_or_fileobj=io.BytesIO(metadata_str.encode()),
                path_in_repo=f"{METADATA_PATH}/image_metadata.json",
                repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                repo_type="dataset",
                token=HF_TOKEN
            )
            print("Metadata saved successfully to Hugging Face")
        except Exception as e:
            print(f"Error saving metadata to Hugging Face: {e}")


def add_hashtags_to_image(filename, hashtags, original_filename=None):
    """Add hashtags to an image."""
    metadata = get_image_metadata()

    # If the file exists in metadata, update its hashtags; otherwise create a new entry
    if filename in metadata:
        metadata[filename]["hashtags"] = hashtags
        if original_filename:
            metadata[filename]["original_filename"] = original_filename
    else:
        metadata_entry = {"hashtags": hashtags, "is_new": True}
        if original_filename:
            metadata_entry["original_filename"] = original_filename
        metadata[filename] = metadata_entry

    save_image_metadata(metadata)


def mark_image_as_viewed(filename):
    """Mark an image as viewed (not new)."""
    metadata = get_image_metadata()
    if filename in metadata:
        metadata[filename]["is_new"] = False
        save_image_metadata(metadata)


def get_hf_image_url(filename):
    """Get the URL for an image in the Hugging Face repo."""
    # Return the direct Space URL if available, otherwise use the dataset URL
    if SPACE_NAME and HF_USERNAME:
        return f"https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space/static/uploads/{filename}"
    elif HF_USERNAME:
        return f"https://huggingface.co/datasets/{HF_USERNAME}/{DATASET_REPO}/resolve/main/{IMAGES_PATH}/{filename}"
    return None


def list_hf_images():
    """List all images in the Hugging Face repo."""
    if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN:
        try:
            print(f"Listing files from Hugging Face repository {HF_USERNAME}/{DATASET_REPO}")
            files = hf_api.list_repo_files(
                repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                repo_type="dataset",
                token=HF_TOKEN
            )
            # Keep only the files in the images directory
            image_files = [f for f in files if f.startswith(f"{IMAGES_PATH}/")]
            image_basenames = [os.path.basename(f) for f in image_files]
            print(f"Found {len(image_basenames)} images")
            return image_basenames
        except Exception as e:
            print(f"Error listing files from Hugging Face: {e}")
            return []
    return []
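

# Assumed route registration: GET /login is inferred from the RedirectResponse(url="/login") calls used by this app
@app.get("/login")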
async def login_page(request: Request):
    """Render the login page."""
    # If already authenticated, redirect to home
    if authenticate(request):
        return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    return templates.TemplateResponse(
        "login.html",
        {"request": request}
    )
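

# Assumed route registration: POST /login to match the login form; the exact path is an assumption
@app.post("/login")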
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """Handle login form submission."""
    if form_data.username == USERNAME and form_data.password == PASSWORD:
        request.session["authenticated"] = True
        return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    else:
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "error": "Invalid username or password"}
        )
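

# Assumed route registration: the /logout path is an assumption
@app.get("/logout")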
async def logout(request: Request):
    """Handle logout."""
    request.session.pop("authenticated", None)
    return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
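

# Assumed route registration: GET / is inferred from the redirects to "/" after login
@app.get("/")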
async def home(request: Request, search: Optional[str] = None, tag: Optional[str] = None):
    """Render the home page with authentication check and optional search/filter."""
    # Check if user is authenticated
    if not authenticate(request):
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)

    # Get all uploaded images and their metadata
    uploaded_images = []
    metadata = get_image_metadata()

    if UPLOAD_DIR.exists():
        for file in UPLOAD_DIR.iterdir():
            if is_valid_image(get_file_extension(file.name)):
                # Get hashtags from metadata if available
                hashtags = []
                is_new = False
                original_filename = file.name
                if file.name in metadata:
                    hashtags = metadata[file.name].get("hashtags", [])
                    is_new = metadata[file.name].get("is_new", False)
                    original_filename = metadata[file.name].get("original_filename", file.name)

                # If searching/filtering, check if this image should be included
                if (
                    search
                    and search.lower() not in original_filename.lower()
                    and not any(search.lower() in t.lower() for t in hashtags)
                ):
                    continue
                if tag and tag not in hashtags:
                    continue

                # Local URL for display
                image_url = f"/static/uploads/{file.name}"

                # Full URL for embedding (use Space URL in production if available)
                if (
                    SPACE_NAME
                    and HF_USERNAME
                    and os.environ.get("ENV", "development") == "production"
                ):
                    embed_url = f"https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space/static/uploads/{file.name}"
                else:
                    embed_url = f"{request.base_url}static/uploads/{file.name}"

                uploaded_images.append(
                    {
                        "name": file.name,
                        "url": image_url,
                        "embed_url": embed_url,
                        "hashtags": hashtags,
                        "is_new": is_new,
                        "original_filename": original_filename,
                    }
                )

    # Get all unique hashtags for the filter dropdown
    all_hashtags = set()
    for img_data in metadata.values():
        if "hashtags" in img_data:
            all_hashtags.update(img_data["hashtags"])

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "uploaded_images": uploaded_images,
            "all_hashtags": sorted(list(all_hashtags)),
            "current_search": search,
            "current_tag": tag
        }
    )


def upload_to_hf_and_local(file_content, filename):
    """Upload a file to both the Hugging Face dataset repository and local storage."""
    local_success = False
    hf_success = False

    # Save locally first
    try:
        file_path = UPLOAD_DIR / filename
        with file_path.open("wb") as buffer:
            buffer.write(file_content)
        local_success = True
        print(f"File {filename} saved locally")
    except Exception as e:
        print(f"Error saving file locally: {e}")

    # Then upload to Hugging Face if in production
    if (
        os.environ.get("ENV", "development") == "production"
        and HF_USERNAME
        and HF_TOKEN
    ):
        try:
            print(
                f"Uploading file {filename} to Hugging Face repository {HF_USERNAME}/{DATASET_REPO}"
            )
            hf_api.upload_file(
                path_or_fileobj=io.BytesIO(file_content),
                path_in_repo=f"{IMAGES_PATH}/{filename}",
                repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                repo_type="dataset",
                token=HF_TOKEN,
            )
            print(f"File {filename} uploaded successfully to Hugging Face")
            hf_success = True
        except Exception as e:
            print(f"Error uploading to Hugging Face: {e}")

    return local_success or hf_success


def delete_from_hf_and_local(filename):
    """Delete a file from both the Hugging Face dataset repository and local storage."""
    local_success = False
    hf_success = False

    # Delete locally first
    file_path = UPLOAD_DIR / filename
    if file_path.exists():
        try:
            os.remove(file_path)
            local_success = True
            print(f"File {filename} deleted locally")
        except Exception as e:
            print(f"Error deleting file locally: {e}")

    # Then delete from Hugging Face if in production
    if (
        os.environ.get("ENV", "development") == "production"
        and HF_USERNAME
        and HF_TOKEN
    ):
        try:
            print(
                f"Deleting file {filename} from Hugging Face repository {HF_USERNAME}/{DATASET_REPO}"
            )
            hf_api.delete_file(
                path_in_repo=f"{IMAGES_PATH}/{filename}",
                repo_id=f"{HF_USERNAME}/{DATASET_REPO}",
                repo_type="dataset",
                token=HF_TOKEN,
            )
            print(f"File {filename} deleted successfully from Hugging Face")
            hf_success = True
        except Exception as e:
            print(f"Error deleting from Hugging Face: {e}")

    return local_success or hf_success


def generate_safe_filename(original_filename):
    """Generate a safe filename that preserves the original name but avoids collisions."""
    # Extract base name and extension
    base_name, extension = os.path.splitext(original_filename)

    # Remove invalid characters from the base name
    safe_base = "".join(c for c in base_name if c.isalnum() or c in "-_. ")
    safe_base = safe_base.replace(" ", "_")

    # Add a timestamp to ensure uniqueness
    timestamp = int(time.time() * 1000)
    return f"{safe_base}_{timestamp}{extension}"
async def upload_image(
    request: Request,
    files: List[UploadFile] = File(...),
    hashtags: str = Form("")
):
    """Handle multiple image uploads with hashtags."""
    # Check if user is authenticated
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"}
        )

    # Process hashtags into a list
    hashtag_list = []
    if hashtags:
        # Split by spaces or commas and remove empty strings/whitespace
        hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()]

    results = []
    duplicates = []

    # First, check for duplicate filenames
    metadata = get_image_metadata()
    all_files = {}

    # Check for duplicates in local storage first (should include HF images that were downloaded during build)
    if UPLOAD_DIR.exists():
        for file in UPLOAD_DIR.iterdir():
            if is_valid_image(get_file_extension(file.name)):
                # Get original filename from metadata if available
                original_name = file.name
                if file.name in metadata and "original_filename" in metadata[file.name]:
                    original_name = metadata[file.name]["original_filename"]
                all_files[original_name.lower()] = file.name

    # Check for duplicates in the current upload batch
    for file in files:
        file_lower = file.filename.lower()
        if file_lower in all_files:
            # Found a duplicate
            duplicates.append({
                "new_file": file.filename,
                "existing_file": all_files[file_lower],
                "original_name": file.filename
            })

    # If we found duplicates, return them to the frontend for confirmation
    if duplicates:
        return {
            "success": False,
            "duplicates": duplicates,
            "message": "Duplicate filenames detected",
            "action_required": "confirm_replace"
        }

    # No duplicates, proceed with upload
    for file in files:
        # Check if the file is an image
        extension = get_file_extension(file.filename)
        if not is_valid_image(extension):
            continue  # Skip non-image files

        # Preserve original filename in metadata
        original_filename = file.filename

        # Generate a safe filename that preserves the original name
        safe_filename = generate_safe_filename(original_filename)

        # Read file content for upload
        file.file.seek(0)
        file_content = await file.read()

        # Save file to both local storage and Hugging Face
        upload_success = upload_to_hf_and_local(file_content, safe_filename)
        if not upload_success:
            continue  # Skip to next file if upload failed

        # Save hashtags and original filename
        add_hashtags_to_image(safe_filename, hashtag_list, original_filename)

        # For base64 encoding
        base64_encoded = base64.b64encode(file_content).decode("utf-8")

        # Determine MIME type
        mime_type = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.bmp': 'image/bmp',
            '.webp': 'image/webp'
        }.get(extension, 'application/octet-stream')

        # Get direct image URL using the Space URL if available
        image_url = f"/static/uploads/{safe_filename}"  # Local URL

        # Full URL for embedding
        if (
            SPACE_NAME
            and HF_USERNAME
            and os.environ.get("ENV", "development") == "production"
        ):
            full_url = f"https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space/static/uploads/{safe_filename}"
        else:
            full_url = f"{request.base_url}static/uploads/{safe_filename}"

        results.append(
            {
                "success": True,
                "file_name": safe_filename,
                "original_filename": original_filename,
                "file_url": image_url,
                "full_url": full_url,
                "embed_html": f'<img src="{full_url}" alt="{original_filename}" />',
                "base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}",
                "base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="{original_filename}" />',
                "hashtags": hashtag_list,
            }
        )

    if len(results) == 1:
        return results[0]
    else:
        return {"success": True, "uploaded_count": len(results), "files": results}
async def upload_with_replace(
    request: Request,
    files: List[UploadFile] = File(...),
    hashtags: str = Form(""),
    replace_files: str = Form("")
):
    """Handle upload with replacement of duplicate files."""
    # Check if user is authenticated
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"}
        )

    # Process hashtags into a list
    hashtag_list = []
    if hashtags:
        # Split by spaces or commas and remove empty strings/whitespace
        hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()]

    # Parse the replacement files JSON
    files_to_replace = []
    if replace_files:
        try:
            files_to_replace = json.loads(replace_files)
        except json.JSONDecodeError:
            files_to_replace = []

    # Create a map of original names to replacement decisions
    replace_map = {item["original_name"].lower(): item["existing_file"] for item in files_to_replace}

    results = []
    for file in files:
        # Check if the file is an image
        extension = get_file_extension(file.filename)
        if not is_valid_image(extension):
            continue  # Skip non-image files

        # Preserve original filename in metadata
        original_filename = file.filename
        file_lower = original_filename.lower()

        # Read file content
        file.file.seek(0)
        file_content = await file.read()

        # Check if this file should replace an existing one
        if file_lower in replace_map:
            # Delete the old file from both local storage and Hugging Face
            old_filename = replace_map[file_lower]
            delete_from_hf_and_local(old_filename)

            # Remove from metadata
            metadata = get_image_metadata()
            if old_filename in metadata:
                del metadata[old_filename]
                save_image_metadata(metadata)

        # Generate a safe filename that preserves the original name
        safe_filename = generate_safe_filename(original_filename)

        # Upload to both local storage and Hugging Face
        upload_success = upload_to_hf_and_local(file_content, safe_filename)
        if not upload_success:
            continue  # Skip to next file if upload failed

        # Save hashtags and original filename
        add_hashtags_to_image(safe_filename, hashtag_list, original_filename)

        # For base64 encoding
        base64_encoded = base64.b64encode(file_content).decode("utf-8")

        # Determine MIME type
        mime_type = {
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.bmp': 'image/bmp',
            '.webp': 'image/webp'
        }.get(extension, 'application/octet-stream')

        # Get direct image URL using the Space URL if available
        image_url = f"/static/uploads/{safe_filename}"  # Local URL

        # Full URL for embedding
        if (
            SPACE_NAME
            and HF_USERNAME
            and os.environ.get("ENV", "development") == "production"
        ):
            full_url = f"https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space/static/uploads/{safe_filename}"
        else:
            full_url = f"{request.base_url}static/uploads/{safe_filename}"

        results.append(
            {
                "success": True,
                "file_name": safe_filename,
                "original_filename": original_filename,
                "file_url": image_url,
                "full_url": full_url,
                "embed_html": f'<img src="{full_url}" alt="{original_filename}" />',
                "base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}",
                "base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="{original_filename}" />',
                "hashtags": hashtag_list,
            }
        )

    if len(results) == 1:
        return results[0]
    else:
        return {"success": True, "uploaded_count": len(results), "files": results}
async def view_image(request: Request, file_name: str):
    """View a specific image with authentication check."""
    # Check if user is authenticated
    if not authenticate(request):
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)

    # Mark image as viewed (not new)
    mark_image_as_viewed(file_name)

    # Check if the file exists locally (should always be the case now)
    file_path = UPLOAD_DIR / file_name
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Image not found")

    # Local URL for rendering in the template
    image_url = f"/static/uploads/{file_name}"

    # Full URL for embedding
    if (
        SPACE_NAME
        and HF_USERNAME
        and os.environ.get("ENV", "development") == "production"
    ):
        embed_url = f"https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space/static/uploads/{file_name}"
    else:
        embed_url = f"{request.base_url}static/uploads/{file_name}"

    # Get metadata
    metadata = get_image_metadata()
    hashtags = []
    original_filename = file_name
    if file_name in metadata:
        hashtags = metadata[file_name].get("hashtags", [])
        original_filename = metadata[file_name].get("original_filename", file_name)

    return templates.TemplateResponse(
        "view.html",
        {
            "request": request,
            "image_url": image_url,
            "file_name": file_name,
            "original_filename": original_filename,
            "embed_url": embed_url,
            "hashtags": hashtags
        }
    )
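

# Assumed route registration: the /update-hashtags/{file_name} path is an assumption
@app.post("/update-hashtags/{file_name}")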
async def update_hashtags(request: Request, file_name: str, hashtags: str = Form("")):
    """Update hashtags for an image."""
    # Check if user is authenticated
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"}
        )

    # Check if the file exists
    if os.environ.get("ENV", "development") == "production" and HF_USERNAME:
        if file_name not in list_hf_images():
            raise HTTPException(status_code=404, detail="Image not found")
    else:
        file_path = UPLOAD_DIR / file_name
        if not file_path.exists():
            raise HTTPException(status_code=404, detail="Image not found")

    # Process hashtags
    hashtag_list = []
    if hashtags:
        hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()]

    # Update hashtags in metadata
    add_hashtags_to_image(file_name, hashtag_list)

    # Redirect back to the image view
    return RedirectResponse(url=f"/view/{file_name}", status_code=status.HTTP_303_SEE_OTHER)
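

# Assumed route registration: the DELETE method and the /delete/{file_name} path are assumptions
@app.delete("/delete/{file_name}")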
async def delete_image(request: Request, file_name: str):
    """Delete an image with authentication check."""
    # Check if user is authenticated
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"}
        )

    # Delete from both local storage and Hugging Face
    delete_success = delete_from_hf_and_local(file_name)
    if not delete_success:
        raise HTTPException(
            status_code=404, detail="Image not found or could not be deleted"
        )

    # Remove from metadata
    metadata = get_image_metadata()
    if file_name in metadata:
        del metadata[file_name]
        save_image_metadata(metadata)

    return {"success": True, "message": f"Image {file_name} has been deleted"}


# Health check endpoint for Hugging Face Spaces
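# Assumed route registration: the /health path is an assumption
@app.get("/health")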
async def health_check():
    return {"status": "ok"}
async def startup_event():
    """Run on application startup to ensure configuration is correct."""
    global SPACE_NAME

    # Try to extract the Space name from HF_SPACE_ID if not explicitly set
    if not SPACE_NAME and "HF_SPACE_ID" in os.environ:
        space_id = os.environ.get("HF_SPACE_ID", "")
        if space_id and "/" in space_id:
            # HF_SPACE_ID is in the format username/space-name
            SPACE_NAME = space_id.split("/")[1]
            print(f"Extracted Space name from HF_SPACE_ID: {SPACE_NAME}")

    # Log configuration
    if os.environ.get("ENV", "development") == "production":
        if HF_USERNAME and HF_TOKEN and DATASET_REPO:
            print("Running in production mode with Hugging Face integration:")
            print(f"  - Username: {HF_USERNAME}")
            print(f"  - Dataset: {DATASET_REPO}")
            if SPACE_NAME:
                print(f"  - Space: {SPACE_NAME}")
                print(
                    f"  - Space URL: https://{HF_USERNAME.lower()}-{SPACE_NAME}.hf.space"
                )
            else:
                print("  - Space name not set. Direct URLs will use dataset links.")
        else:
            print(
                "Warning: Running in production mode but Hugging Face credentials are not fully configured."
            )
    else:
        print("Running in development mode. Local storage will be used.")


if __name__ == "__main__":
    # For local development
    uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True)

    # For production/Hugging Face (uncomment when deploying)
    # uvicorn.run("app:app", host="0.0.0.0", port=7860)