import os
import subprocess
import signal
import sys
import shutil
import gradio as gr
import tempfile
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from pathlib import Path
from textwrap import dedent
from apscheduler.schedulers.background import BackgroundScheduler

# --- CONFIGURATION & CONSTANTS ---
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
HF_TOKEN = os.environ.get("HF_TOKEN")
CONVERSION_SCRIPT = "./llama.cpp/convert_hf_to_gguf.py"
# --- HELPER FUNCTIONS ---
def escape_html(s: str) -> str:
    # Escapes a string for safe HTML rendering.
    s = str(s)
    s = s.replace("&", "&amp;")  # Must be done first!
    s = s.replace("<", "&lt;")
    s = s.replace(">", "&gt;")
    s = s.replace('"', "&quot;")
    s = s.replace("\n", "<br/>")
    return s
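# Illustrative example of the escaping above (input value is hypothetical):
#   escape_html('<b>"A & B"</b>')  ->  '&lt;b&gt;&quot;A &amp; B&quot;&lt;/b&gt;'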
def get_platform_executable(base_name: str) -> str:
    # Returns the platform-specific executable name and path.
    executable = f"{base_name}.exe" if sys.platform == "win32" else base_name
    return os.path.join(".", "llama.cpp", executable)
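# Illustrative behaviour (assumes llama.cpp binaries live in ./llama.cpp, as this app expects):
#   get_platform_executable("llama-quantize")
#     -> ".\\llama.cpp\\llama-quantize.exe" on Windows
#     -> "./llama.cpp/llama-quantize"       on other platforms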
def generate_importance_matrix(model_path: str, train_data_path: str, output_path: str):
    # Generates the importance matrix using llama-imatrix.
    imatrix_executable = get_platform_executable("llama-imatrix")
    imatrix_command = [imatrix_executable, "-m", model_path, "-f", train_data_path, "-o", output_path, "-ngl", "0"]

    # --- START OF DLL FIX ---
    # Temporarily rename the problematic RPC DLL to prevent it from being loaded.
    dll_path = os.path.join(".", "llama.cpp", "ggml-rpc.dll")
    hidden_dll_path = os.path.join(".", "llama.cpp", "ggml-rpc.dll.hidden")
    rpc_dll_exists = os.path.exists(dll_path)
    try:
        if rpc_dll_exists:
            print(f"Temporarily hiding {dll_path} to force CPU backend...")
            os.rename(dll_path, hidden_dll_path)
        print("Running imatrix command...")
        process = subprocess.run(imatrix_command, capture_output=True, text=True)
        if process.returncode != 0:
            # Raise with stdout and stderr attached for easier debugging.
            raise Exception(f"Imatrix generation failed:\nSTDOUT:\n{process.stdout}\n\nSTDERR:\n{process.stderr}")
        print("Importance matrix generation completed.")
    finally:
        # CRITICAL: Always rename the DLL back, even if the process fails.
        if rpc_dll_exists:
            print(f"Restoring {dll_path}...")
            os.rename(hidden_dll_path, dll_path)
    # --- END OF DLL FIX ---
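# For reference, the subprocess call above corresponds roughly to this CLI invocation
# (paths are placeholders; exact flags can vary between llama.cpp releases):
#   ./llama.cpp/llama-imatrix -m <model.fp16.gguf> -f <train_data.txt> -o <imatrix.dat> -ngl 0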
def split_and_upload_shards(model_path: str, outdir: str, repo_id: str, oauth_token: str, split_max_tensors=256, split_max_size=None):
    # Splits a GGUF model and uploads the shards.
    split_executable = get_platform_executable("llama-gguf-split")
    model_path_prefix = '.'.join(model_path.split('.')[:-1])

    split_cmd = [split_executable, "--split"]
    if split_max_size:
        split_cmd.extend(["--split-max-size", split_max_size])
    else:
        split_cmd.extend(["--split-max-tensors", str(split_max_tensors)])
    split_cmd.extend([model_path, model_path_prefix])

    print(f"Running split command: {split_cmd}")
    result = subprocess.run(split_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception(f"Error splitting the model: {result.stderr}")
    print("Model split successfully!")

    if os.path.exists(model_path):
        os.remove(model_path)

    model_file_prefix = os.path.basename(model_path_prefix)
    sharded_files = [f for f in os.listdir(outdir) if f.startswith(model_file_prefix) and f.endswith(".gguf")]
    if not sharded_files:
        raise Exception("No sharded files found after splitting.")

    api = HfApi(token=oauth_token)
    for file in sharded_files:
        file_path = os.path.join(outdir, file)
        print(f"Uploading shard: {file_path}")
        api.upload_file(path_or_fileobj=file_path, path_in_repo=file, repo_id=repo_id)
    print("All sharded model files have been uploaded successfully!")
def upload_and_cleanup(temp_dir: str, oauth_token: gr.OAuthToken | None):
    # Handles the final upload process and cleans up the temporary directory.
    if not temp_dir or not os.path.exists(temp_dir):
        return "Error: No files found to upload.", "error.png", None, None, gr.update(visible=False), gr.update(visible=False)
    try:
        if oauth_token is None or oauth_token.token is None:
            raise gr.Error("Authentication token is missing. Please log in.")
        api = HfApi(token=oauth_token.token)
        username = whoami(token=oauth_token.token)["name"]

        quantized_gguf_path = next((os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if f.endswith('.gguf')), None)
        imatrix_path = os.path.join(temp_dir, "imatrix.dat")
        readme_path = os.path.join(temp_dir, "README.md")
        private_repo_flag_path = os.path.join(temp_dir, "private_repo.flag")
        split_model_flag_path = os.path.join(temp_dir, "split_model.flag")
        split_tensors_path = os.path.join(temp_dir, "split_tensors.dat")
        split_size_path = os.path.join(temp_dir, "split_size.dat")

        if not quantized_gguf_path:
            raise FileNotFoundError("Could not find the quantized GGUF file.")

        quantized_gguf_name = os.path.basename(quantized_gguf_path)
        model_name = quantized_gguf_name.split('-')[0]
        quant_method_str = quantized_gguf_name.split('-')[1]
        is_private = os.path.exists(private_repo_flag_path)

        new_repo_id = f"{username}/{model_name}-{quant_method_str}-GGUF"
        new_repo_url = api.create_repo(repo_id=new_repo_id, exist_ok=True, private=is_private)
        print(f"Repo created/retrieved: {new_repo_url}")

        if os.path.exists(split_model_flag_path):
            max_tensors = int(open(split_tensors_path).read()) if os.path.exists(split_tensors_path) else 256
            max_size = open(split_size_path).read() if os.path.exists(split_size_path) else None
            split_and_upload_shards(quantized_gguf_path, temp_dir, new_repo_id, oauth_token.token, max_tensors, max_size)
        else:
            print(f"Uploading single file: {quantized_gguf_path}")
            api.upload_file(path_or_fileobj=quantized_gguf_path, path_in_repo=quantized_gguf_name, repo_id=new_repo_id)

        if os.path.exists(imatrix_path):
            api.upload_file(path_or_fileobj=imatrix_path, path_in_repo="imatrix.dat", repo_id=new_repo_id)
        if os.path.exists(readme_path):
            api.upload_file(path_or_fileobj=readme_path, path_in_repo="README.md", repo_id=new_repo_id)

        final_message = f'<h1>✅ UPLOAD COMPLETE</h1><br/>Find your repo here: <a href="{new_repo_url}" target="_blank" style="text-decoration:underline">{new_repo_id}</a>'
        final_image = "llama.png"
    except Exception as e:
        final_message = f'<h1>❌ UPLOAD ERROR</h1><br/><pre style="white-space:pre-wrap;">{escape_html(str(e))}</pre>'
        final_image = "error.png"
    finally:
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            print(f"Cleaned up temporary directory: {temp_dir}")
    return final_message, final_image, None, None, gr.update(visible=False), gr.update(visible=False)
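# Note on the handoff: upload_and_cleanup() only reads files that process_model() stages in
# temp_dir -- the quantized *.gguf, an optional imatrix.dat and README.md, plus the marker
# files private_repo.flag, split_model.flag, split_tensors.dat and split_size.dat.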
def delete_files(temp_dir: str):
    # Deletes the temporary directory and resets the UI.
    if temp_dir and os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
        message = "Local files have been deleted."
        print(f"User deleted temporary directory: {temp_dir}")
    else:
        message = "No local files to delete."
    return message, "llama.png", None, None, gr.update(visible=False), gr.update(visible=False)
def process_model(model_id, q_method, use_imatrix, imatrix_q_method, private_repo, train_data_file, split_model, split_max_tensors, split_max_size, oauth_token: gr.OAuthToken | None):
    # Main function to download, convert, and quantize the model.
    # Unconditionally use the gr.OAuthToken object from the Login Button.
    if oauth_token is None or oauth_token.token is None:
        raise gr.Error("Authentication failed. Please log in to Hugging Face.")
    try:
        # Use the .token attribute directly
        whoami(token=oauth_token.token)
    except Exception as e:
        raise gr.Error(f"Authentication failed. Is your token valid? Error: {e}")

    model_name = model_id.split('/')[-1]

    # Ensure the outputs directory exists before trying to use it
    os.makedirs("outputs", exist_ok=True)
    outdir = tempfile.mkdtemp(dir="outputs")

    try:
        api = HfApi(token=oauth_token.token)
        dl_pattern = ["*.md", "*.json", "*.model"]
        try:
            repo_tree = api.list_repo_tree(repo_id=model_id, recursive=True)
            pattern = "*.safetensors" if any(f.path.endswith(".safetensors") for f in repo_tree) else "*.bin"
        except Exception:
            print("Could not determine primary file type, downloading both .safetensors and .bin")
            pattern = ["*.safetensors", "*.bin"]
        dl_pattern.extend(pattern if isinstance(pattern, list) else [pattern])

        if not os.path.exists("downloads"): os.makedirs("downloads")
        if not os.path.exists("outputs"): os.makedirs("outputs")

        fp16 = str(Path(outdir) / f"{model_name}.fp16.gguf")

        # --- START OF CACHING LOGIC ---
        # Define a permanent cache directory path
        model_cache_root = Path("./model_cache")
        # Sanitize the model_id to create a valid directory name (e.g., "google/gemma-2b" -> "google__gemma-2b")
        sanitized_model_id = model_id.replace("/", "__")
        local_dir = model_cache_root / sanitized_model_id
        # Check if the model is already cached by looking for a sentinel file
        sentinel_file = local_dir / ".download_complete"
        if local_dir.exists() and sentinel_file.exists():
            print(f"Model '{model_id}' found in cache. Skipping download.")
        else:
            print(f"Model '{model_id}' not found in cache. Starting download...")
            local_dir.mkdir(parents=True, exist_ok=True)
            api.snapshot_download(repo_id=model_id, local_dir=str(local_dir), local_dir_use_symlinks=False, allow_patterns=dl_pattern)
            # Create a sentinel file to mark the download as complete
            sentinel_file.touch()
            print("Download complete and cached.")
        # --- END OF CACHING LOGIC ---

        result = subprocess.run(["python", CONVERSION_SCRIPT, str(local_dir), "--outtype", "f16", "--outfile", fp16], capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Error converting to fp16: {result.stderr}")
        print(f"Model converted to fp16 successfully: {fp16}")

        imatrix_path = Path(outdir) / "imatrix.dat"
        if use_imatrix:
            train_data_path = train_data_file.name if train_data_file else "llama.cpp/groups_merged.txt"
            if not os.path.isfile(train_data_path):
                raise Exception(f"Training data file not found: {train_data_path}")
            generate_importance_matrix(fp16, train_data_path, str(imatrix_path))

        quant_method_str = (imatrix_q_method if use_imatrix else q_method).upper()
        quantized_gguf_name = f"{model_name.lower()}-{quant_method_str}.gguf"
        quantized_gguf_path = str(Path(outdir) / quantized_gguf_name)

        quantize_executable = get_platform_executable("llama-quantize")
        quantise_ggml = [quantize_executable]
        if use_imatrix:
            quantise_ggml.extend(["--imatrix", str(imatrix_path)])
        quantise_ggml.extend([fp16, quantized_gguf_path, quant_method_str])
        result = subprocess.run(quantise_ggml, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Error quantizing: {result.stderr}")
        print(f"Quantized successfully: {quantized_gguf_path}")

        if private_repo: open(os.path.join(outdir, "private_repo.flag"), 'a').close()
        if split_model:
            open(os.path.join(outdir, "split_model.flag"), 'a').close()
            with open(os.path.join(outdir, "split_tensors.dat"), 'w') as f: f.write(str(split_max_tensors))
            if split_max_size:
                with open(os.path.join(outdir, "split_size.dat"), 'w') as f: f.write(split_max_size)

        username = whoami(token=oauth_token.token)["name"]
        new_repo_id = f"{username}/{model_name}-{quant_method_str}-GGUF"
        space_id = os.environ.get("HF_SPACE_ID", "naphula/gguf-repo-suite")
        space_link = f"[{space_id.split('/')[-1]}](https://huggingface.co/spaces/{space_id})"
        card = ModelCard("")
        card.data.base_model = model_id
        card.text = f"# GGUF Model Card for {new_repo_id}\nConverted from [{model_id}](https://huggingface.co/{model_id}) via {space_link}."
        card.save(os.path.join(outdir, "README.md"))

        return (
            "Files generated successfully. You can now download them locally or choose an action below.",
            "llama.png",
            quantized_gguf_path,
            str(imatrix_path) if use_imatrix and os.path.exists(imatrix_path) else None,
            gr.update(visible=True),
            gr.update(visible=True),
            outdir,
        )
    except Exception as e:
        # Kept commented out so the outputs folder is not deleted automatically on failure:
        # if os.path.exists(outdir):
        #     shutil.rmtree(outdir)
        return (
            f'<h1>❌ ERROR</h1><br/><pre style="white-space:pre-wrap;">{escape_html(str(e))}</pre>',  # 1. output_markdown
            "error.png",                 # 2. output_image
            None,                        # 3. gguf_download_link
            None,                        # 4. imatrix_download_link
            gr.update(visible=False),    # 5. download_row
            gr.update(visible=False),    # 6. action_row
            None                         # 7. temp_dir_state
        )
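# Rough end-to-end flow of process_model(), expressed as the equivalent CLI steps it drives
# (paths are illustrative; the app stages everything under outputs/<tmpdir>):
#   1. snapshot_download(model_id)                                   -> ./model_cache/<user>__<model>/
#   2. python ./llama.cpp/convert_hf_to_gguf.py <cache_dir> --outtype f16 --outfile <model>.fp16.gguf
#   3. (optional) llama-imatrix -m <model>.fp16.gguf -f <train_data> -o imatrix.dat -ngl 0
#   4. llama-quantize [--imatrix imatrix.dat] <model>.fp16.gguf <model>-<METHOD>.gguf <METHOD>
#   5. write README.md plus the *.flag / *.dat markers read later by upload_and_cleanup()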
# --- GRADIO UI DEFINITION ---
with gr.Blocks(css=".gradio-container {overflow-y: auto;}") as demo:
    gr.Markdown("# Create your own GGUF Quants, blazingly fast ⚡!")
    gr.Markdown(
        "This space takes a Hugging Face repo as input, quantizes it, and creates a repo (public unless you check 'Private Repo') containing the selected quant under your HF user namespace.\n\n"
        "This space (originally by ggml-org) was modified by Fentible/Naphula to support lower IQ quants and local execution.\n\n"
        "See the README here for more information: https://huggingface.co/spaces/Naphula/gguf-repo-suite/blob/main/README.md\n\n"
        "The 16GB CPU Basic tier does not work on Hugging Face Spaces, and this has not been tested on a higher-capacity rented Space either.\n\n"
        "This modified suite is only confirmed to work on Windows, so you should clone the repo and host it locally via a Python venv."
    )
    # Create the Login Button, which will be visible in all environments.
    # Locally, it will use your cached hf_token. On a Space, it provides the full login flow.
    gr.Markdown("You must be logged in to upload to the Hub.")
    oauth_token_state = gr.LoginButton(min_width=250)
| gr.Markdown("## 1. Select Model and Quantization Options") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| # Attempt to use the search component everywhere | |
| model_id = HuggingfaceHubSearch( | |
| label="Hub Model ID", | |
| placeholder="Search for model id on Huggingface", | |
| search_type="model", | |
| ) | |
| with gr.Row(): | |
| use_imatrix = gr.Checkbox(label="Use Imatrix Quantization", info="Use importance matrix for quantization.") | |
| private_repo = gr.Checkbox(label="Private Repo", info="Create a private repo under your username.") | |
| split_model = gr.Checkbox(label="Split Model", info="Shard the model using gguf-split.") | |
| with gr.Column(scale=1): | |
| q_method = gr.Dropdown(["TQ1_0", "TQ2_0", "Q2_K", "Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_0", "Q4_K_S", "Q4_K_M", "Q5_0", "Q5_K_S", "Q5_K_M", "Q6_K", "Q8_0"], label="Quantization Method", value="Q4_K_M", filterable=False) | |
| imatrix_q_method = gr.Dropdown(["IQ1_S", "IQ1_M", "IQ2_XXS", "IQ2_XS", "IQ2_S", "IQ2_M", "IQ3_XXS", "IQ3_XS", "IQ3_S", "IQ3_M", "Q4_K_M", "Q4_K_S", "IQ4_NL", "IQ4_XS", "Q5_K_M", "Q5_K_S"], label="Imatrix Quantization Method", value="IQ4_NL", filterable=False, visible=False) | |
| train_data_file = gr.File(label="Training Data File", visible=False) | |
| split_max_tensors = gr.Number(label="Max Tensors per File", value=256, visible=False) | |
| split_max_size = gr.Textbox(label="Max File Size", info="Accepted suffixes: M, G. Example: 256M, 5G", visible=False) | |
| quantize_btn = gr.Button("Quantize Model", variant="primary") | |
| gr.Markdown("## 2. Results") | |
| with gr.Row(): | |
| output_markdown = gr.Markdown(label="Output") | |
| output_image = gr.Image(show_label=False, value="llama.png") | |
| with gr.Row(visible=False) as download_row: | |
| gguf_download_link = gr.File(label="Download Quantized GGUF", interactive=False) | |
| imatrix_download_link = gr.File(label="Download imatrix.dat", interactive=False, visible=False) | |
| with gr.Row(visible=False) as action_row: | |
| proceed_to_upload_btn = gr.Button("Proceed to Upload", variant="primary") | |
| delete_local_files_btn = gr.Button("Delete Local Files", variant="stop") | |
| temp_dir_state = gr.State() | |
    # --- Event Handlers ---
    quantize_btn.click(
        fn=process_model,
        inputs=[model_id, q_method, use_imatrix, imatrix_q_method, private_repo, train_data_file, split_model, split_max_tensors, split_max_size],  # oauth_token_state NOW PASSED IMPLICITLY
        outputs=[output_markdown, output_image, gguf_download_link, imatrix_download_link, download_row, action_row, temp_dir_state]
    )
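    # Note on the implicit token: process_model() and upload_and_cleanup() declare a parameter
    # annotated as gr.OAuthToken | None, so Gradio injects the logged-in user's token into that
    # argument automatically; oauth_token_state is therefore not listed in `inputs` here or below.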
    proceed_to_upload_btn.click(
        fn=upload_and_cleanup,
        inputs=[temp_dir_state],  # oauth_token_state NOW PASSED IMPLICITLY
        outputs=[output_markdown, output_image, gguf_download_link, imatrix_download_link, download_row, action_row]
    )
    delete_local_files_btn.click(
        fn=delete_files,
        inputs=[temp_dir_state],
        outputs=[output_markdown, output_image, gguf_download_link, imatrix_download_link, download_row, action_row]
    )
    split_model.change(lambda x: (gr.update(visible=x), gr.update(visible=x)), split_model, [split_max_tensors, split_max_size])
    use_imatrix.change(lambda x: (gr.update(visible=not x), gr.update(visible=x), gr.update(visible=x), gr.update(visible=x)), use_imatrix, [q_method, imatrix_q_method, train_data_file, imatrix_download_link])
# --- SCHEDULER & LAUNCH ---
space_id = os.environ.get("HF_SPACE_ID")
if space_id and HF_TOKEN:
    print(f"Running on HF Space: {space_id}. Scheduling a restart every 3 hours.")

    def restart_space():
        try:
            HfApi().restart_space(repo_id=space_id, token=HF_TOKEN, factory_reboot=True)
        except Exception as e:
            print(f"Error scheduling space restart: {e}")

    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", seconds=10800)
    scheduler.start()
else:
    print("Not running on a Hugging Face Space or HF_TOKEN not set. Skipping space restart schedule.")

demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)
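# To run locally (as suggested in the intro text above), something like the following is assumed
# to work; the dependency list and the actual script name live in the Space repo itself:
#   python -m venv venv
#   venv\Scripts\activate           (Windows)   /   source venv/bin/activate   (Linux/macOS)
#   pip install -r requirements.txt             (or whatever the repo's requirements file is named)
#   python app.py                               (or this file's name in the repo)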