Spaces:
Running
Running
File size: 7,786 Bytes
f7596a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
from fastapi import FastAPI, File, UploadFile, Request, HTTPException, Form, Depends, status
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import shutil
import os
import uuid
import base64
from pathlib import Path
import uvicorn
from typing import List, Optional
import secrets
from starlette.middleware.sessions import SessionMiddleware
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import JSONResponse
# Create FastAPI app
app = FastAPI(title="Image Uploader")
# Add session middleware
app.add_middleware(
SessionMiddleware,
secret_key="YOUR_SECRET_KEY_CHANGE_THIS_IN_PRODUCTION"
)
# Create uploads directory if it doesn't exist
UPLOAD_DIR = Path("static/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Mount static directory
app.mount("/static", StaticFiles(directory="static"), name="static")
# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")
# Set up security
security = HTTPBasic()
# Hardcoded credentials (in a real app, use proper hashed passwords in a database)
USERNAME = "detomo"
PASSWORD = "itweek2025"
def get_file_extension(filename: str) -> str:
"""Get the file extension from a filename."""
return os.path.splitext(filename)[1].lower()
def is_valid_image(extension: str) -> bool:
"""Check if the file extension is a valid image type."""
return extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']
def authenticate(request: Request):
"""Check if user is authenticated."""
is_authenticated = request.session.get("authenticated", False)
return is_authenticated
def verify_auth(request: Request):
"""Verify authentication."""
if not authenticate(request):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Basic"},
)
return True
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
"""Render the login page."""
# If already authenticated, redirect to home
if authenticate(request):
return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
return templates.TemplateResponse(
"login.html",
{"request": request}
)
@app.post("/login")
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
"""Handle login form submission."""
if form_data.username == USERNAME and form_data.password == PASSWORD:
request.session["authenticated"] = True
return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
else:
return templates.TemplateResponse(
"login.html",
{"request": request, "error": "Invalid username or password"}
)
@app.get("/logout")
async def logout(request: Request):
"""Handle logout."""
request.session.pop("authenticated", None)
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Render the home page with authentication check."""
# Check if user is authenticated
if not authenticate(request):
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# Get all uploaded images
uploaded_images = []
if UPLOAD_DIR.exists():
for file in UPLOAD_DIR.iterdir():
if is_valid_image(get_file_extension(file.name)):
image_url = f"/static/uploads/{file.name}"
uploaded_images.append({
"name": file.name,
"url": image_url,
"embed_url": f"{request.base_url}static/uploads/{file.name}"
})
return templates.TemplateResponse(
"index.html",
{"request": request, "uploaded_images": uploaded_images}
)
@app.post("/upload/")
async def upload_image(request: Request, file: UploadFile = File(...)):
"""Handle image upload with authentication check."""
# Check if user is authenticated
if not authenticate(request):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Not authenticated"}
)
# Check if the file is an image
extension = get_file_extension(file.filename)
if not is_valid_image(extension):
raise HTTPException(status_code=400, detail="Only image files are allowed")
# Generate a unique filename to prevent overwrites
unique_filename = f"{uuid.uuid4()}{extension}"
file_path = UPLOAD_DIR / unique_filename
# Save the file
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Return the file URL and embed code
file_url = f"/static/uploads/{unique_filename}"
# For base64 encoding
file.file.seek(0) # Reset file pointer to beginning
contents = await file.read()
base64_encoded = base64.b64encode(contents).decode("utf-8")
# Determine MIME type
mime_type = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.webp': 'image/webp'
}.get(extension, 'application/octet-stream')
return {
"success": True,
"file_name": unique_filename,
"file_url": file_url,
"full_url": f"{request.base_url}static/uploads/{unique_filename}",
"embed_html": f'<img src="{request.base_url}static/uploads/{unique_filename}" alt="Uploaded Image" />',
"base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}",
"base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="Embedded Image" />'
}
@app.get("/view/{file_name}")
async def view_image(request: Request, file_name: str):
"""View a specific image with authentication check."""
# Check if user is authenticated
if not authenticate(request):
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
file_path = UPLOAD_DIR / file_name
if not file_path.exists():
raise HTTPException(status_code=404, detail="Image not found")
image_url = f"/static/uploads/{file_name}"
embed_url = f"{request.base_url}static/uploads/{file_name}"
return templates.TemplateResponse(
"view.html",
{
"request": request,
"image_url": image_url,
"file_name": file_name,
"embed_url": embed_url
}
)
@app.delete("/delete/{file_name}")
async def delete_image(request: Request, file_name: str):
"""Delete an image with authentication check."""
# Check if user is authenticated
if not authenticate(request):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Not authenticated"}
)
file_path = UPLOAD_DIR / file_name
if not file_path.exists():
raise HTTPException(status_code=404, detail="Image not found")
os.remove(file_path)
return {"success": True, "message": f"Image {file_name} has been deleted"}
# Health check endpoint for Hugging Face Spaces
@app.get("/healthz")
async def health_check():
return {"status": "ok"}
if __name__ == "__main__":
# For local development
uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True)
# For production/Hugging Face (uncomment when deploying)
# uvicorn.run("app:app", host="0.0.0.0", port=7860) |