|  | from fastapi import FastAPI, UploadFile, File, HTTPException, Depends | 
					
						
						|  | from fastapi.responses import FileResponse | 
					
						
						|  | from fastapi.security import OAuth2PasswordBearer | 
					
						
						|  | from pathlib import Path | 
					
						
						|  | import psutil | 
					
						
						|  | import shutil | 
					
						
						|  | import os | 
					
						
						|  |  | 
					
						
						|  | app = FastAPI() | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | UPLOAD_DIRECTORY = Path("uploads") | 
					
						
						|  | UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | VALID_TOKEN = os.environ.get("SECRET_TOKEN") | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | MAX_CPU_CORES = 2 | 
					
						
						|  | MAX_MEMORY_GB = 16 | 
					
						
						|  | MAX_DISK_GB = 50 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_directory_size(directory: Path) -> int: | 
					
						
						|  | total_size = 0 | 
					
						
						|  | for dirpath, dirnames, filenames in os.walk(directory): | 
					
						
						|  | for f in filenames: | 
					
						
						|  | fp = os.path.join(dirpath, f) | 
					
						
						|  | total_size += os.path.getsize(fp) | 
					
						
						|  | return total_size | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_current_token(token: str = Depends(oauth2_scheme)): | 
					
						
						|  | if token != VALID_TOKEN: | 
					
						
						|  | raise HTTPException(status_code=401, detail="Invalid token") | 
					
						
						|  | return token | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @app.get("/health") | 
					
						
						|  | def health_check(token: str = Depends(get_current_token)): | 
					
						
						|  | return { | 
					
						
						|  | "status": "healthy" | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @app.get("/metrics") | 
					
						
						|  | def get_metrics(token: str = Depends(get_current_token)): | 
					
						
						|  |  | 
					
						
						|  | cpu_percent = round((psutil.cpu_percent(interval=1) / 100) * MAX_CPU_CORES, 2) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | memory = psutil.virtual_memory() | 
					
						
						|  | memory_used_gb = (memory.total - memory.available) / (1024 ** 3) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | memory_used_gb_relative = min(memory_used_gb, MAX_MEMORY_GB) | 
					
						
						|  | memory_usage_percent = (memory_used_gb_relative / MAX_MEMORY_GB) * 100 | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | uploads_size_bytes = get_directory_size(UPLOAD_DIRECTORY) | 
					
						
						|  | uploads_size_gb = uploads_size_bytes / (1024 ** 3) | 
					
						
						|  | uploads_usage_percent = (uploads_size_gb / MAX_DISK_GB) * 100 | 
					
						
						|  |  | 
					
						
						|  | return { | 
					
						
						|  | "cpu_usage_cores": cpu_percent, | 
					
						
						|  | "memory": { | 
					
						
						|  | "used_gb": round(memory_used_gb_relative, 2), | 
					
						
						|  | "used_percent_of_16gb": round(memory_usage_percent, 2) | 
					
						
						|  | }, | 
					
						
						|  | "disk": { | 
					
						
						|  | "uploads_folder_size_gb": round(uploads_size_gb, 2), | 
					
						
						|  | "uploads_usage_percent_of_50gb": round(uploads_usage_percent, 2) | 
					
						
						|  | } | 
					
						
						|  | } | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @app.post("/uploadfile/") | 
					
						
						|  | async def upload_file(file: UploadFile = File(...), token: str = Depends(get_current_token)): | 
					
						
						|  | file_location = UPLOAD_DIRECTORY / file.filename | 
					
						
						|  |  | 
					
						
						|  | with file_location.open("wb") as buffer: | 
					
						
						|  | shutil.copyfileobj(file.file, buffer) | 
					
						
						|  |  | 
					
						
						|  | return {"info": f"file '{file.filename}' saved at '{file_location}'"} | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @app.get("/downloadfile/{filename}") | 
					
						
						|  | def download_file(filename: str, token: str = Depends(get_current_token)): | 
					
						
						|  | file_location = UPLOAD_DIRECTORY / filename | 
					
						
						|  |  | 
					
						
						|  | if not file_location.exists(): | 
					
						
						|  | raise HTTPException(status_code=404, detail="File not found") | 
					
						
						|  |  | 
					
						
						|  | return FileResponse(path=file_location, filename=filename) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @app.get("/files/") | 
					
						
						|  | def list_files(token: str = Depends(get_current_token)): | 
					
						
						|  | files = [f for f in os.listdir(UPLOAD_DIRECTORY) if os.path.isfile(UPLOAD_DIRECTORY / f)] | 
					
						
						|  | return {"files": files} | 
					
						
						|  |  |