|
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends |
|
from fastapi.responses import FileResponse |
|
from fastapi.security import OAuth2PasswordBearer |
|
from pathlib import Path |
|
import psutil |
|
import shutil |
|
import os |
|
|
|
app = FastAPI() |
|
|
|
|
|
UPLOAD_DIRECTORY = Path("uploads") |
|
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") |
|
|
|
|
|
VALID_TOKEN = os.environ.get("SECRET_TOKEN") |
|
|
|
|
|
MAX_CPU_CORES = 2 |
|
MAX_MEMORY_GB = 16 |
|
MAX_DISK_GB = 50 |
|
|
|
|
|
def get_directory_size(directory: Path) -> int: |
|
total_size = 0 |
|
for dirpath, dirnames, filenames in os.walk(directory): |
|
for f in filenames: |
|
fp = os.path.join(dirpath, f) |
|
total_size += os.path.getsize(fp) |
|
return total_size |
|
|
|
|
|
def get_current_token(token: str = Depends(oauth2_scheme)): |
|
if token != VALID_TOKEN: |
|
raise HTTPException(status_code=401, detail="Invalid token") |
|
return token |
|
|
|
|
|
@app.get("/health") |
|
def health_check(token: str = Depends(get_current_token)): |
|
return { |
|
"status": "healthy" |
|
} |
|
|
|
|
|
@app.get("/metrics") |
|
def get_metrics(token: str = Depends(get_current_token)): |
|
|
|
cpu_percent = round((psutil.cpu_percent(interval=1) / 100) * MAX_CPU_CORES, 2) |
|
|
|
|
|
memory = psutil.virtual_memory() |
|
memory_used_gb = (memory.total - memory.available) / (1024 ** 3) |
|
|
|
|
|
memory_used_gb_relative = min(memory_used_gb, MAX_MEMORY_GB) |
|
memory_usage_percent = (memory_used_gb_relative / MAX_MEMORY_GB) * 100 |
|
|
|
|
|
uploads_size_bytes = get_directory_size(UPLOAD_DIRECTORY) |
|
uploads_size_gb = uploads_size_bytes / (1024 ** 3) |
|
uploads_usage_percent = (uploads_size_gb / MAX_DISK_GB) * 100 |
|
|
|
return { |
|
"cpu_usage_cores": cpu_percent, |
|
"memory": { |
|
"used_gb": round(memory_used_gb_relative, 2), |
|
"used_percent_of_16gb": round(memory_usage_percent, 2) |
|
}, |
|
"disk": { |
|
"uploads_folder_size_gb": round(uploads_size_gb, 2), |
|
"uploads_usage_percent_of_50gb": round(uploads_usage_percent, 2) |
|
} |
|
} |
|
|
|
|
|
@app.post("/uploadfile/") |
|
async def upload_file(file: UploadFile = File(...), token: str = Depends(get_current_token)): |
|
file_location = UPLOAD_DIRECTORY / file.filename |
|
|
|
with file_location.open("wb") as buffer: |
|
shutil.copyfileobj(file.file, buffer) |
|
|
|
return {"info": f"file '{file.filename}' saved at '{file_location}'"} |
|
|
|
|
|
@app.get("/downloadfile/{filename}") |
|
def download_file(filename: str, token: str = Depends(get_current_token)): |
|
file_location = UPLOAD_DIRECTORY / filename |
|
|
|
if not file_location.exists(): |
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
|
return FileResponse(path=file_location, filename=filename) |
|
|
|
|
|
@app.get("/files/") |
|
def list_files(token: str = Depends(get_current_token)): |
|
files = [f for f in os.listdir(UPLOAD_DIRECTORY) if os.path.isfile(UPLOAD_DIRECTORY / f)] |
|
return {"files": files} |
|
|