Spaces:
Sleeping
Sleeping
import hmac
import os
import shutil
import uuid
from pathlib import Path
from typing import List

from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks, HTTPException, Depends
from fastapi.responses import HTMLResponse, JSONResponse

from uploader import HuggingFaceUploader
# FastAPI application object.
app = FastAPI()

# The Hugging Face token and target repository id are read from the
# environment and handed to the uploader helper.
hf_token = os.environ.get("HF_TOKEN")
repo_id = os.environ.get("REPO_ID")
uploader = HuggingFaceUploader(hf_token, repo_id)

# Working directories: one for staged uploads, one for per-upload status logs.
# Both are created at import time if they do not exist yet.
UPLOAD_FOLDER = Path("temp_uploads")
STATUS_FOLDER = Path("status")
UPLOAD_FOLDER.mkdir(exist_ok=True)
STATUS_FOLDER.mkdir(exist_ok=True)

# Secret that upload_files compares against the client's access_key form field.
ACCESS_KEY = os.environ.get("PASSWORD")
def save_progress(status_file, filename, status):
    """Append a single ``<filename>: <status>`` line to the status log.

    Args:
        status_file: Path object of the log file; it is opened in append
            mode, so it is created on first use and never truncated.
        filename: Label written before the colon (a file name, or a tag
            such as ``Progress`` / ``error``).
        status: Text written after the colon.
    """
    with status_file.open(mode="a", encoding="utf-8") as log:
        entry = f"{filename}: {status}\n"
        log.write(entry)
def upload_file_task(temp_folder_path: Path, destination_folder: str, status_file: Path):
    """Background task to upload staged files and log progress.

    Every regular file found (recursively) under ``temp_folder_path`` is
    uploaded to the repository identified by ``uploader.repo_id``, placed
    under ``destination_folder`` with its path relative to the staging
    folder preserved. After each upload an ``Uploaded`` line and a
    ``Progress`` line are appended to ``status_file``.

    Args:
        temp_folder_path: Staging directory holding the files to upload.
        destination_folder: Folder inside the repository to upload into.
        status_file: Log file that receives progress and error lines.

    Any exception raised while uploading is recorded in ``status_file`` as
    an ``error`` line rather than propagated. The staging directory is
    removed afterwards whether or not the upload succeeded.
    """
    try:
        # Walk the tree once and keep only regular files, so the reported
        # total matches exactly what the loop uploads (directories used to
        # be counted in the total but never uploaded).
        pending_files = [
            candidate
            for candidate in temp_folder_path.rglob('*')
            if candidate.is_file()
        ]
        total_files = len(pending_files)

        for uploaded_files, local_path in enumerate(pending_files, start=1):
            relative_path = local_path.relative_to(temp_folder_path)
            # Repository paths always use forward slashes, even on Windows.
            path_in_repo = str(Path(destination_folder) / relative_path).replace("\\", "/")

            uploader.api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=path_in_repo,
                repo_id=uploader.repo_id,
                repo_type="model"
            )
            save_progress(status_file, str(local_path.name), "Uploaded")

            # Log current progress
            save_progress(status_file, "Progress", f"{uploaded_files}/{total_files} files uploaded.")
    except Exception as e:
        # Task boundary: surface the failure to the client through the
        # status file instead of letting the background task die silently.
        save_progress(status_file, "error", str(e))
    finally:
        # Remove the whole staging tree. rmtree copes with nested
        # directories and with a folder that is already gone, both of which
        # made the previous unlink()/rmdir() cleanup raise.
        shutil.rmtree(temp_folder_path, ignore_errors=True)
async def main():
    """Return the contents of ``index.html`` as an HTML response.

    Returns:
        HTMLResponse whose body is the text of ``index.html``, looked up
        relative to the current working directory.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file; confirm it is registered on ``app``.
    # Decode explicitly as UTF-8 so the page does not depend on the host's
    # locale encoding (the rest of this module also reads/writes UTF-8).
    with open("index.html", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
async def upload_files(
    background_tasks: BackgroundTasks,
    access_key: str = Form(...),
    destination_folder: str = Form(...),
    files: List[UploadFile] = File(...)
):
    """Stage the uploaded files and schedule their upload in the background.

    Args:
        background_tasks: FastAPI task queue used to run ``upload_file_task``
            after the response is sent.
        access_key: Secret supplied by the client; must equal ``ACCESS_KEY``.
        destination_folder: Folder inside the repository to upload into.
        files: The uploaded files.

    Returns:
        JSONResponse ``{"status_id": <id>}`` where ``<id>`` is the name of
        the staging folder and of the matching status file.

    Raises:
        HTTPException: 403 if the access key does not match (or no key is
            configured); 400 if an uploaded file has no usable file name.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file; confirm it is registered on ``app``.

    # Verify access key. compare_digest avoids leaking, through timing, how
    # much of the key matched; bytes are used because it rejects non-ASCII
    # str. An unset ACCESS_KEY rejects every request, as before.
    if ACCESS_KEY is None or not hmac.compare_digest(
        access_key.encode("utf-8"), ACCESS_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid access key")

    # The file name is client-controlled. Keep only its final component so
    # values such as "../../x" or "/etc/x" cannot write outside the staging
    # folder; backslashes are normalised so Windows-style paths are reduced
    # too. Validate everything before touching the disk.
    safe_names = []
    for file in files:
        safe_name = Path((file.filename or "").replace("\\", "/")).name
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid file name")
        safe_names.append(safe_name)

    temp_folder_path = UPLOAD_FOLDER / uuid.uuid4().hex
    temp_folder_path.mkdir(parents=True, exist_ok=True)

    # Save uploaded files temporarily
    try:
        for file, safe_name in zip(files, safe_names):
            file_path = temp_folder_path / safe_name
            with file_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
    except Exception:
        # Do not leave a half-filled staging folder behind on failure.
        shutil.rmtree(temp_folder_path, ignore_errors=True)
        raise

    # Create a status file for tracking progress. It is created right away
    # so a client polling immediately after this response does not get a
    # 404 for a valid id before the background task writes its first line.
    status_file = STATUS_FOLDER / f"{temp_folder_path.name}.txt"
    status_file.touch()

    background_tasks.add_task(upload_file_task, temp_folder_path, destination_folder, status_file)
    return JSONResponse(content={"status_id": temp_folder_path.name})
async def get_progress(status_id: str):
    """Get the progress of the upload from the status file.

    Args:
        status_id: Identifier returned by ``upload_files``; these are
            generated with ``uuid.uuid4().hex`` (32 lowercase hex digits).

    Returns:
        JSONResponse ``{"progress": [...]}`` holding the lines currently in
        the status file, in the order they were written.

    Raises:
        HTTPException: 404 if ``status_id`` is malformed or no status file
            exists for it.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file; confirm it is registered on ``app``.

    # status_id is client-controlled and ends up in a filesystem path.
    # Accept only the uuid4().hex shape so a crafted value (e.g. one
    # containing "../") cannot read files outside STATUS_FOLDER.
    well_formed = len(status_id) == 32 and all(
        ch in "0123456789abcdef" for ch in status_id
    )
    if not well_formed:
        raise HTTPException(status_code=404, detail="Status not found")

    status_file = STATUS_FOLDER / f"{status_id}.txt"
    # Open directly instead of exists()-then-open, which could race with the
    # file disappearing between the two calls.
    try:
        with status_file.open("r", encoding="utf-8") as f:
            progress = f.readlines()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Status not found") from None
    return JSONResponse(content={"progress": progress})