Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from huggingface_hub import upload_file | |
| from huggingface_hub import list_repo_files | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| import os | |
| import os | |
| import zipfile | |
| import tempfile # β Add this! | |
| app = FastAPI() | |
| # CORS setup to allow requests from your frontend | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Replace "*" with your frontend domain in production | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def health_check(): | |
| return {"status": "β FastAPI running on Hugging Face Spaces!"} | |
| REPO_ID = "rahul7star/ohamlab" | |
| FOLDER = "demo" | |
| BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/" | |
| def list_images(): | |
| try: | |
| all_files = list_repo_files(REPO_ID) | |
| folder_prefix = FOLDER.rstrip("/") + "/" | |
| files_in_folder = [ | |
| f for f in all_files | |
| if f.startswith(folder_prefix) | |
| and "/" not in f[len(folder_prefix):] # no subfolder files | |
| and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) | |
| ] | |
| urls = [BASE_URL + f for f in files_in_folder] | |
| return {"images": urls} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| from datetime import datetime | |
| import tempfile | |
| import uuid | |
| async def upload_zip(file: UploadFile = File(...)): | |
| if not file.filename.endswith(".zip"): | |
| return {"error": "Please upload a .zip file"} | |
| # Save the ZIP to /tmp | |
| temp_zip_path = f"/tmp/{file.filename}" | |
| with open(temp_zip_path, "wb") as f: | |
| f.write(await file.read()) | |
| # Create a unique subfolder name inside 'demo/' | |
| timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") | |
| unique_id = uuid.uuid4().hex[:6] | |
| folder_name = f"upload_{timestamp}_{unique_id}" | |
| hf_folder_prefix = f"demo/{folder_name}" | |
| try: | |
| with tempfile.TemporaryDirectory() as extract_dir: | |
| # Extract zip | |
| with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: | |
| zip_ref.extractall(extract_dir) | |
| uploaded_files = [] | |
| # Upload all extracted files | |
| for root_dir, _, files in os.walk(extract_dir): | |
| for name in files: | |
| file_path = os.path.join(root_dir, name) | |
| relative_path = os.path.relpath(file_path, extract_dir) | |
| repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/") | |
| upload_file( | |
| path_or_fileobj=file_path, | |
| path_in_repo=repo_path, | |
| repo_id="rahul7star/ohamlab", | |
| repo_type="model", | |
| commit_message=f"Upload {relative_path} to {folder_name}", | |
| token=True, | |
| ) | |
| uploaded_files.append(repo_path) | |
| return { | |
| "message": f"β Uploaded {len(uploaded_files)} files", | |
| "folder": folder_name, | |
| "files": uploaded_files, | |
| } | |
| except Exception as e: | |
| return {"error": f"β Failed to process zip: {str(e)}"} | |
| async def upload_image(file: UploadFile = File(...)): | |
| filename = file.filename | |
| contents = await file.read() | |
| # Save temporarily | |
| temp_path = f"/tmp/{filename}" | |
| with open(temp_path, "wb") as f: | |
| f.write(contents) | |
| try: | |
| upload_file( | |
| path_or_fileobj=temp_path, | |
| path_in_repo=f"demo/{filename}", | |
| repo_id="rahul7star/ohamlab", | |
| repo_type="model", | |
| commit_message=f"Upload {filename} to demo folder", | |
| token=True, # uses HF_TOKEN from Space secrets | |
| ) | |
| return {"message": f"β {filename} uploaded successfully!"} | |
| except Exception as e: | |
| return {"error": f"β Upload failed: {str(e)}"} | |
| #Tranining Data set start fitering data for traninig | |
| T_REPO_ID = "rahul7star/ohamlab" | |
| T_FOLDER = "demo" | |
| T_BASE_URL = f"https://huggingface.co/{T_REPO_ID}/resolve/main/" | |
| DESCRIPTION_TEXT = ( | |
| "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. " | |
| "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background." | |
| ) | |
| def filter_and_rename_images(): | |
| try: | |
| all_files = list_repo_files(T_REPO_ID) | |
| folder_prefix = T_FOLDER.rstrip("/") + "/" | |
| filter_folder = f"filter-{T_FOLDER}" | |
| filter_prefix = filter_folder + "/" | |
| image_files = [ | |
| f for f in all_files | |
| if f.startswith(folder_prefix) | |
| and "/" not in f[len(folder_prefix):] | |
| and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) | |
| ] | |
| if not image_files: | |
| return {"error": f"No images found in folder '{T_FOLDER}'"} | |
| uploaded_files = [] | |
| for idx, orig_path in enumerate(image_files, start=1): | |
| # Download file content into bytes (no local disk) | |
| local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path) | |
| with open(local_path, "rb") as f: | |
| file_bytes = f.read() | |
| new_image_name = f"image{idx}.jpeg" # rename all to .jpeg | |
| # Upload image bytes directly from memory | |
| upload_file( | |
| path_or_fileobj=io.BytesIO(file_bytes), | |
| path_in_repo=filter_prefix + new_image_name, | |
| repo_id=T_REPO_ID, | |
| repo_type="model", | |
| commit_message=f"Upload renamed image {new_image_name} to {filter_folder}", | |
| token=True, | |
| ) | |
| uploaded_files.append(filter_prefix + new_image_name) | |
| # Create and upload text file from string bytes | |
| txt_filename = f"image{idx}.txt" | |
| upload_file( | |
| path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")), | |
| path_in_repo=filter_prefix + txt_filename, | |
| repo_id=T_REPO_ID, | |
| repo_type="model", | |
| commit_message=f"Upload text file {txt_filename} to {filter_folder}", | |
| token=True, | |
| ) | |
| uploaded_files.append(filter_prefix + txt_filename) | |
| return {"message": f"Processed and uploaded {len(image_files)} images and text files.", "files": uploaded_files} | |
| except Exception as e: | |
| return {"error": str(e)} | |