from typing import List, Dict
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, snapshot_download, login
import base64
import io
import zipfile
import asyncio
import aiohttp
import tempfile
import os
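# Dependencies (for the Space's requirements.txt): gradio, pandas,
# huggingface_hub, aiohttp. A missing entry there is a common cause of a
# Space failing at startup.
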
# Initialize HuggingFace with an access token
def init_huggingface(token: str):
    """Initialize HuggingFace with an access token."""
    try:
        login(token=token)
        return True
    except Exception as e:
        print(f"Error logging in: {str(e)}")
        return False

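# Note: init_huggingface is not wired into the UI below; the token from the
# textbox is instead passed per call to HfApi, snapshot_download, and the raw
# README requests. Call init_huggingface(token) manually if you want the
# credentials cached for the whole process.
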
def format_link(item: Dict, number: int, search_type: str) -> str:
    """Format a link for display in the UI."""
    link = item['link']
    readme_link = f"{link}/blob/main/README.md"
    title = f"{number}. {item['id']}"
    metadata = f"Author: {item['author']}"
    if 'downloads' in item:
        metadata += f", Downloads: {item['downloads']}"
    html = f"""
    <div style="margin-bottom: 10px;">
        <strong>{title}</strong><br>
        <a href="{link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View {search_type[:-1]}</a> |
        <a href="{readme_link}" target="_blank" style="color: #4a90e2; text-decoration: none;">View README</a><br>
        <small>{metadata}</small>
    </div>
    """
    return html

def display_results(df: pd.DataFrame):
    """Display search results in HTML format."""
    if df is not None and not df.empty:
        html = "<div style='max-height: 400px; overflow-y: auto;'>"
        for _, row in df.iterrows():
            html += row['formatted_link']
        html += "</div>"
        return html
    else:
        return "<p>No results found.</p>"

def SwarmyTime(data: List[Dict]) -> Dict:
    """Aggregate summary statistics over the search results."""
    aggregated = {
        "total_items": len(data),
        "unique_authors": set(),
        "total_downloads": 0,
        "item_types": {"Models": 0, "Datasets": 0, "Spaces": 0}
    }
    for item in data:
        aggregated["unique_authors"].add(item.get("author", "Unknown"))
        aggregated["total_downloads"] += item.get("downloads") or 0
        # The result dicts built by search_hub only carry an "id" key, so
        # classify items by their hub URL rather than a non-existent
        # "modelId" key.
        link = item.get("link", "")
        if "/datasets/" in link:
            aggregated["item_types"]["Datasets"] += 1
        elif "/spaces/" in link:
            aggregated["item_types"]["Spaces"] += 1
        else:
            aggregated["item_types"]["Models"] += 1
    aggregated["unique_authors"] = len(aggregated["unique_authors"])
    return aggregated

def search_hub(query: str, search_type: str, token: str = None) -> pd.DataFrame:
    """Search the Hugging Face Hub for models, datasets, or spaces."""
    api = HfApi(token=token)
    if search_type == "Models":
        results = api.list_models(search=query)
        data = [{"id": model.id, "author": model.author, "downloads": model.downloads,
                 "link": f"https://huggingface.co/{model.id}"} for model in results]
    elif search_type == "Datasets":
        results = api.list_datasets(search=query)
        data = [{"id": dataset.id, "author": dataset.author, "downloads": dataset.downloads,
                 "link": f"https://huggingface.co/datasets/{dataset.id}"} for dataset in results]
    elif search_type == "Spaces":
        results = api.list_spaces(search=query)
        data = [{"id": space.id, "author": space.author,
                 "link": f"https://huggingface.co/spaces/{space.id}"} for space in results]
    else:
        data = []
    for i, item in enumerate(data, 1):
        item['number'] = i
        item['formatted_link'] = format_link(item, i, search_type)
    return pd.DataFrame(data)

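# Quick sanity check outside the UI (assumes network access; "bert" is just an
# illustrative query, not one used by the app):
#   df = search_hub("bert", "Models")
#   print(df[["id", "author", "downloads"]].head())
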
async def download_readme(session: aiohttp.ClientSession, item: Dict, token: str) -> tuple[str, str]:
    """Download the README.md file for a given item."""
    item_id = item['id']
    # Different base URLs for different repository types
    if 'datasets' in item['link']:
        raw_url = f"https://huggingface.co/datasets/{item_id}/raw/main/README.md"
        alt_url = f"https://huggingface.co/datasets/{item_id}/raw/master/README.md"
    elif 'spaces' in item['link']:
        raw_url = f"https://huggingface.co/spaces/{item_id}/raw/main/README.md"
        alt_url = f"https://huggingface.co/spaces/{item_id}/raw/master/README.md"
    else:  # Models
        raw_url = f"https://huggingface.co/{item_id}/raw/main/README.md"
        alt_url = f"https://huggingface.co/{item_id}/raw/master/README.md"
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    try:
        # Try the main branch first
        async with session.get(raw_url, headers=headers) as response:
            if response.status == 200:
                content = await response.text()
                return item_id.replace('/', '_'), content
            # If the main branch fails, fall back to the master branch
            if response.status in [401, 404]:
                async with session.get(alt_url, headers=headers) as alt_response:
                    if alt_response.status == 200:
                        content = await alt_response.text()
                        return item_id.replace('/', '_'), content
            # If both attempts fail, return an error message
            error_msg = f"# Error downloading README for {item_id}\n"
            if response.status == 401:
                error_msg += "Authentication required. Please provide a valid HuggingFace token."
            else:
                error_msg += f"Status code: {response.status}"
            return item_id.replace('/', '_'), error_msg
    except Exception as e:
        return item_id.replace('/', '_'), f"# Error downloading README for {item_id}\nError: {str(e)}"

async def download_all_readmes(data: List[Dict], token: str) -> tuple[str, str]:
    """Download all README files and create a zip archive."""
    if not data:
        return "", "No results to download"
    zip_buffer = io.BytesIO()
    failed_downloads = []
    async with aiohttp.ClientSession() as session:
        tasks = [download_readme(session, item, token) for item in data]
        results = await asyncio.gather(*tasks)
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for filename, content in results:
            if "Error downloading README" in content:
                failed_downloads.append(filename)
            zip_file.writestr(f"{filename}.md", content)
    zip_buffer.seek(0)
    base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
    status = "READMEs ready for download!"
    if failed_downloads:
        status += f" (Failed to download {len(failed_downloads)} READMEs)"
    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="readmes.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download READMEs Archive
        </a>
        {'<p style="color: #ff6b6b; margin-top: 10px;">Note: Some READMEs could not be downloaded. Please check the zip file for details.</p>' if failed_downloads else ''}
    </div>
    """
    return download_link, status

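# Design note: the zip is returned inline as a base64 data: URI so nothing has
# to be written server-side. Browsers cap data-URI sizes, so for very large
# result sets a gr.File output backed by a temporary file would likely be
# more robust.
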
def download_repository(repo_id: str, repo_type: str, temp_dir: str, token: str) -> str:
    """Download a single repository."""
    try:
        repo_path = snapshot_download(
            repo_id=repo_id,
            repo_type=repo_type.lower()[:-1],  # "Models"/"Datasets"/"Spaces" -> "model"/"dataset"/"space"
            local_dir=os.path.join(temp_dir, repo_id.replace('/', '_')),
            ignore_patterns=["*.bin", "*.pt", "*.pth", "*.ckpt", "*.safetensors"],  # Skip large binary weight files
            token=token
        )
        return repo_path
    except Exception as e:
        print(f"Error downloading {repo_id}: {str(e)}")
        return None

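# Minimal standalone sketch (illustrative repo id, requires network access):
#   with tempfile.TemporaryDirectory() as tmp:
#       path = download_repository("gpt2", "Models", tmp, token=None)
#       print(path)  # local snapshot path on success, None on failure
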
def create_repo_zip(data: List[Dict], search_type: str, token: str) -> tuple[str, str]:
    """Download repositories and create a zip archive."""
    if not data:
        return "", "No repositories to download"
    # Create a temporary directory
    with tempfile.TemporaryDirectory() as temp_dir:
        successful_downloads = []
        # Download each repository
        for item in data:
            repo_path = download_repository(item['id'], search_type, temp_dir, token)
            if repo_path:
                successful_downloads.append(repo_path)
        if not successful_downloads:
            return "", "No repositories were successfully downloaded"
        # Create the zip file in memory
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for repo_path in successful_downloads:
                repo_name = os.path.basename(repo_path)
                for root, _, files in os.walk(repo_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        arcname = os.path.join(repo_name, os.path.relpath(file_path, repo_path))
                        zip_file.write(file_path, arcname)
        # Convert to base64
        zip_buffer.seek(0)
        base64_zip = base64.b64encode(zip_buffer.getvalue()).decode()
    download_link = f"""
    <div style="margin-top: 10px;">
        <a href="data:application/zip;base64,{base64_zip}"
           download="repositories.zip"
           style="display: inline-block; padding: 10px 20px;
                  background-color: #4CAF50; color: white;
                  text-decoration: none; border-radius: 5px;">
            📥 Download Repositories Archive
        </a>
    </div>
    """
    return download_link, f"Successfully downloaded {len(successful_downloads)} repositories"

# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # Search the Hugging Face Hub
    Search and download models, datasets, and spaces from Hugging Face.
    """)

    with gr.Row():
        with gr.Column(scale=3):
            hf_token = gr.Textbox(
                label="HuggingFace Access Token (optional)",
                type="password",
                placeholder="Enter your HuggingFace access token...",
            )

    with gr.Row():
        with gr.Column(scale=3):
            search_query = gr.Textbox(
                label="Search Query",
                value="awacke1",
                placeholder="Enter search term..."
            )
        with gr.Column(scale=2):
            search_type = gr.Radio(
                ["Models", "Datasets", "Spaces"],
                label="Search Type",
                value="Models",
                container=True
            )
        with gr.Column(scale=1):
            search_button = gr.Button("🔍 Search", variant="primary", scale=1)

    with gr.Row(variant="panel"):
        with gr.Column(scale=1):
            gr.Markdown("### Download Options")
            with gr.Row():
                download_readme_button = gr.Button(
                    "📚 Download READMEs",
                    variant="secondary",
                )
                download_repo_button = gr.Button(
                    "📦 Download Repositories",
                    variant="secondary",
                )
            download_status = gr.Markdown("Status: Ready to download", label="Status")
            download_area = gr.HTML("", label="Download Link")

    with gr.Row():
        with gr.Column(scale=2):
            results_html = gr.HTML(label="Search Results")
        with gr.Column(scale=1):
            aggregated_output = gr.JSON(label="Search Statistics")

    search_type_state = gr.State("")
    current_results = gr.State([])

    def search_and_aggregate(query, search_type, token):
        df = search_hub(query, search_type, token)
        data = df.to_dict('records')
        aggregated = SwarmyTime(data)
        html_results = display_results(df)
        return [
            html_results,
            "Status: Ready to download",
            "",
            aggregated,
            search_type,
            data
        ]

    async def handle_readme_download(data, token):
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = await download_all_readmes(data, token)
        return [f"Status: {status}", download_link]

    def handle_repo_download(data, search_type, token):
        if not data:
            return ["Status: No results to download", ""]
        download_link, status = create_repo_zip(data, search_type, token)
        return [f"Status: {status}", download_link]

    search_button.click(
        search_and_aggregate,
        inputs=[search_query, search_type, hf_token],
        outputs=[
            results_html,
            download_status,
            download_area,
            aggregated_output,
            search_type_state,
            current_results
        ]
    )

    download_readme_button.click(
        handle_readme_download,
        inputs=[current_results, hf_token],
        outputs=[download_status, download_area]
    )

    download_repo_button.click(
        handle_repo_download,
        inputs=[current_results, search_type_state, hf_token],
        outputs=[download_status, download_area]
    )

demo.launch(debug=True)
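# launch(debug=True) surfaces tracebacks in the console; add share=True for a
# temporary public URL when running outside of Spaces.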