import os
import shutil
import zipfile
import json
from contextlib import asynccontextmanager
from typing import Dict, Any, Optional

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from huggingface_hub import HfApi, hf_hub_download

# --- Configuration ---
UPLOAD_DIR = "uploaded_files"
STATE_FILE_NAME = "processing_audio_state.json"  # Must match the client's filename
STATE_FILE_PATH = os.path.join(os.getcwd(), STATE_FILE_NAME)
HF_DATASET_REPO = "samfred2/AT2"
HF_TOKEN = os.getenv("HF_TOKEN")

# Hardcoded thresholds for automation; both are currently set to 100
ZIP_UPLOAD_THRESHOLD = 100
STATE_UPLOAD_THRESHOLD = 100

# Global state object (loaded from / saved to STATE_FILE_PATH)
app_state: Dict[str, Any] = {
    "total_files_uploaded": 0,
    "current_zip_version": 0,
    "files_since_last_zip": 0,
    "files_since_last_state_upload": 0,
}

# Global file states (client-managed processing state)
file_states_cache: Dict[str, Any] = {
    "next_download_index": 0,
    "file_states": {},
}

# --- Hugging Face Utility Functions ---
def get_hf_api() -> HfApi:
    """Returns an HfApi instance, raising an error if the token is missing."""
    if not HF_TOKEN:
        raise ValueError("HF_TOKEN not found in environment variables.")
    return HfApi(token=HF_TOKEN)

def download_state_file_from_hf():
    """Downloads the state file from the Hugging Face dataset to the local path."""
    try:
        get_hf_api()  # Validates that a token is available before downloading
        # Use hf_hub_download for a simple single-file download
        downloaded_path = hf_hub_download(
            repo_id=HF_DATASET_REPO,
            filename=STATE_FILE_NAME,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir=os.path.dirname(STATE_FILE_PATH),
            local_dir_use_symlinks=False,
        )
        print(f"Successfully downloaded state file to: {downloaded_path}")
        return True
    except ValueError:
        print("HF_TOKEN missing. Cannot download state file.")
        return False
    except Exception as e:
        # A missing file (404) is the common case on the first run
        if "404" in str(e):
            print("State file not found on Hugging Face. Will start with default state.")
        else:
            print(f"Error downloading state file: {e}")
        return False

def upload_file_to_huggingface(local_path: str, path_in_repo: str) -> bool:
    """Uploads a single file to the configured Hugging Face dataset."""
    try:
        api = get_hf_api()  # The returned HfApi instance already carries the token
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Sync: {path_in_repo}",
        )
        print(f"Successfully uploaded {os.path.basename(local_path)} to {HF_DATASET_REPO} as {path_in_repo}")
        return True
    except ValueError:
        print("HF_TOKEN missing. Skipping upload.")
        return False
    except Exception as e:
        print(f"Hugging Face upload failed for {local_path}: {e}")
        return False

# --- State Management Functions ---
def load_state():
    """Loads application state from the local state file."""
    global app_state, file_states_cache
    if not os.path.exists(STATE_FILE_PATH):
        print("Local state file not found. Using default state.")
        return
    try:
        with open(STATE_FILE_PATH, "r") as f:
            # Load the complete state file content
            state_content = json.load(f)
        # Load internal counters
        for key in ("total_files_uploaded", "current_zip_version",
                    "files_since_last_zip", "files_since_last_state_upload"):
            if key in state_content:
                app_state[key] = state_content[key]
        # Load file_states (client-managed processing state)
        if "file_states" in state_content:
            file_states_cache["file_states"] = state_content["file_states"]
        if "next_download_index" in state_content:
            file_states_cache["next_download_index"] = state_content["next_download_index"]
        print(f"State loaded successfully: app_state={app_state}, file_states_cache={file_states_cache}")
    except Exception as e:
        print(f"Error loading state file: {e}. Using default state.")

def save_state():
    """Saves both application state counters and file processing states to the local state file."""
    try:
        # Combine the app_state counters and the file_states cache into one JSON document
        complete_state = {
            **app_state,          # internal counters
            **file_states_cache,  # file_states and next_download_index
        }
        with open(STATE_FILE_PATH, "w") as f:
            json.dump(complete_state, f, indent=4)
        print(f"Complete state saved to {STATE_FILE_PATH}.")
    except Exception as e:
        print(f"Error saving state file: {e}")

def upload_state_file_to_hf():
    """Saves the current state and uploads the state file to Hugging Face."""
    save_state()
    # Only reset the counter once the upload has actually succeeded
    if upload_file_to_huggingface(STATE_FILE_PATH, STATE_FILE_NAME):
        app_state["files_since_last_state_upload"] = 0

# --- File Processing Functions ---
def zip_uploaded_files_versioned() -> Optional[str]:
    """Zips all files in the upload directory (excluding the state file) into a versioned zip file."""
    if not os.path.exists(UPLOAD_DIR) or not os.listdir(UPLOAD_DIR):
        print("No files to zip.")
        return None
    # Increment the version and build the archive filename
    app_state["current_zip_version"] += 1
    zip_filename = f"uploaded_files_{app_state['current_zip_version']}.zip"
    zip_path = os.path.join(os.getcwd(), zip_filename)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(UPLOAD_DIR):
            for file in files:
                # Skip the state file; everything else goes into the archive
                if file == STATE_FILE_NAME:
                    continue
                file_path = os.path.join(root, file)
                # Add the file, preserving directory structure relative to UPLOAD_DIR
                zipf.write(file_path, os.path.relpath(file_path, UPLOAD_DIR))
    print(f"Successfully created versioned zip file at: {zip_path}")
    return zip_path

def upload_zip_to_hf(zip_path: str):
    """Uploads the versioned zip file to the Hugging Face dataset."""
    if not zip_path or not os.path.exists(zip_path):
        print("Zip file not found. Skipping upload.")
        return
    # Reset the counter only after a successful upload
    if upload_file_to_huggingface(zip_path, os.path.basename(zip_path)):
        app_state["files_since_last_zip"] = 0

def cleanup_upload_dir():
    """Removes the contents of the upload directory, but keeps the state file."""
    if os.path.exists(UPLOAD_DIR):
        for item in os.listdir(UPLOAD_DIR):
            item_path = os.path.join(UPLOAD_DIR, item)
            if os.path.isdir(item_path):
                shutil.rmtree(item_path)
            # Only remove files that are NOT the state file
            elif item != STATE_FILE_NAME:
                os.remove(item_path)
        print(f"Cleaned up files in {UPLOAD_DIR} directory (state file preserved).")

# --- Application Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: 1. Ensure the upload directory exists
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    print(f"Application starting. Upload directory: {UPLOAD_DIR}")
    # Startup: 2. Download and load state (only internal counters are loaded here).
    # We rely on the /state/ endpoint to serve the file_states to clients when needed.
    download_state_file_from_hf()
    load_state()
    print(f"Initial state counters: {app_state}")
    yield
    # Shutdown: 1. Upload the state file (internal counters)
    print("Application shutting down. Uploading final state counters...")
    upload_state_file_to_hf()
    # Shutdown: 2. Perform a final zip and upload of any remaining files
    print("Performing final zip and upload...")
    zip_path = zip_uploaded_files_versioned()
    if zip_path:
        upload_zip_to_hf(zip_path)
        # Clean up the created zip file after upload
        os.remove(zip_path)
    # Shutdown: 3. Cleanup
    cleanup_upload_dir()
    print("Shutdown complete.")

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Versioned File Uploader and Dataset Sync Service",
    description="A service for file management with versioned zipping and state persistence to Hugging Face.",
    version="2.0.0",
    lifespan=lifespan,
)

# --- Endpoints ---
# NOTE: route paths other than /state/ are illustrative assumptions.
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Upload a file to the server and trigger periodic tasks."""
    global app_state, file_states_cache
    try:
        # 1. Save the file
        file_path = os.path.join(UPLOAD_DIR, file.filename)
        # Check whether the file already exists to prevent silent overwriting
        if os.path.exists(file_path):
            # Special handling for the state file: allow overwrite, as it is a sync mechanism
            if file.filename != STATE_FILE_NAME:
                raise HTTPException(status_code=409, detail=f"File '{file.filename}' already exists.")
        try:
            # Write the uploaded file to the local UPLOAD_DIR
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Could not write file to disk: {e}")

        # --- State file handling ---
        if file.filename == STATE_FILE_NAME:
            print(f"Intercepted state file upload: {STATE_FILE_NAME}. Merging with cache...")
            # Read the uploaded state file and merge it into the cache
            try:
                with open(file_path, "r") as f:
                    uploaded_state = json.load(f)
                # Merge the uploaded state into the cache
                if "file_states" in uploaded_state:
                    file_states_cache["file_states"] = uploaded_state["file_states"]
                if "next_download_index" in uploaded_state:
                    file_states_cache["next_download_index"] = uploaded_state["next_download_index"]
                # Save the merged state back to disk
                save_state()
                print(f"✅ State file merged: next_download_index={file_states_cache['next_download_index']}, file_states count={len(file_states_cache['file_states'])}")
            except Exception as e:
                print(f"Error processing uploaded state file: {e}")
            return {"filename": file.filename, "message": "State file successfully merged", "state": file_states_cache}

        # 2. Update counters only after a successful write of a non-state file (transcription JSON)
        app_state["total_files_uploaded"] += 1
        app_state["files_since_last_zip"] += 1
        app_state["files_since_last_state_upload"] += 1
        save_state()  # Save both app_state and file_states
        print(f"✅ File uploaded: {file.filename} | files_since_last_zip={app_state['files_since_last_zip']}")

        # 3. Check the state-file upload threshold (non-state file uploads)
        if app_state["files_since_last_state_upload"] >= STATE_UPLOAD_THRESHOLD:
            print(f"State upload threshold ({STATE_UPLOAD_THRESHOLD}) reached. Uploading state file...")
            upload_state_file_to_hf()

        # 4. Check the zip upload threshold (non-state file uploads)
        if app_state["files_since_last_zip"] >= ZIP_UPLOAD_THRESHOLD:
            print(f"Zip upload threshold ({ZIP_UPLOAD_THRESHOLD}) reached. Zipping and uploading files...")
            zip_path = zip_uploaded_files_versioned()
            if zip_path:
                upload_zip_to_hf(zip_path)
                os.remove(zip_path)
                cleanup_upload_dir()
        return {"filename": file.filename, "message": "File successfully uploaded", "state": app_state}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred during upload: {e}")

@app.get("/download/{filename}")
async def download_file(filename: str):
    """Download a file from the server."""
    file_path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream",
    )

@app.post("/sync/")
async def sync_dataset():
    """Manually trigger zipping of all uploaded files and uploading to the Hugging Face dataset."""
    print("Manual dataset sync triggered.")
    # 1. Upload the state file (internal counters)
    upload_state_file_to_hf()
    # 2. Zip and upload the files
    zip_path = zip_uploaded_files_versioned()
    if not zip_path:
        return {"message": "No files to sync. Upload directory is empty.", "state": app_state}
    upload_zip_to_hf(zip_path)
    # Clean up the created zip file after upload
    os.remove(zip_path)
    return {"message": "Files zipped and uploaded to the Hugging Face dataset.", "state": app_state}

@app.get("/files/")
async def list_files():
    """List all files currently available for download."""
    if not os.path.exists(UPLOAD_DIR):
        return {"files": []}
    # Exclude the state file from the list of downloadable files
    files = [f for f in os.listdir(UPLOAD_DIR) if f != STATE_FILE_NAME]
    return {"files": files, "state": app_state}

@app.get("/state/")
async def get_state():
    """
    Get the current application state (including file processing status).
    Returns both internal counters and client-managed file_states.
    """
    # Return the merged state: app_state counters + file_states cache
    return_state = {
        **app_state,
        **file_states_cache,
    }
    print(f"✅ Returning state: next_download_index={file_states_cache.get('next_download_index')}, file_states count={len(file_states_cache.get('file_states', {}))}")
    return {"state": return_state}

# --- Main execution block for testing/running ---
if __name__ == "__main__":
    import uvicorn

    # Set the token for local testing
    os.environ["HF_TOKEN"] = HF_TOKEN or "dummy_token_for_local_test"
    # Ensure UPLOAD_DIR exists before starting
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)
    # This block is for local testing and won't be used in the final sandbox
    # execution, but it is good practice for a runnable script.
    try:
        print("Starting server for local test...")
        # server.run()  # Normally we would run this, but in the sandbox we use exec
    except KeyboardInterrupt:
        print("Server stopped by user.")
    finally:
        # Cleanup is handled by the lifespan context manager when running
        # under uvicorn in a real environment.
        pass
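
# --- Example client usage (illustrative sketch) ---
# The helper below is not used by the service; it only sketches how a client
# could drive the endpoints above, assuming the service runs at
# http://localhost:8000 and the `requests` package is installed. The filename
# "segment_0001.json" is a hypothetical example.
def _example_client_roundtrip(base_url: str = "http://localhost:8000"):
    import requests

    # Upload a transcription JSON file; the response echoes the counter state
    with open("segment_0001.json", "rb") as fh:
        resp = requests.post(f"{base_url}/upload/", files={"file": fh})
    print(resp.json()["state"]["files_since_last_zip"])

    # Fetch the merged state (internal counters + client-managed file_states)
    state = requests.get(f"{base_url}/state/").json()["state"]
    print(state["next_download_index"], len(state["file_states"]))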