import gradio as gr
import torch
import os
import logging
import time
import tempfile
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from huggingface_hub import HfApi
from transformers import AutoConfig, AutoModel, AutoTokenizer
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoQuantizationConfig
import torch.nn.utils.prune as prune

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.")

api = HfApi()

OUTPUT_DIR = "/tmp/optimized_models"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Use an absolute path to the pre-built location in /opt
LLAMA_CPP_DIR = Path("/opt/llama.cpp")
# Binaries are in the 'build/bin' subdirectory from our out-of-source build
LLAMA_CPP_QUANTIZE_SCRIPT = LLAMA_CPP_DIR / "build" / "bin" / "quantize"
LLAMA_CPP_CONVERT_SCRIPT = LLAMA_CPP_DIR / "convert.py"

if not LLAMA_CPP_QUANTIZE_SCRIPT.exists():
    error_msg = "FATAL ERROR: llama.cpp binaries not found. The Docker build may have failed."
    logging.error(error_msg)
    raise RuntimeError(error_msg)
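
# Stage 1: load only the model config from the Hub and recommend a pipeline
# (GGUF via llama.cpp for decoder-style LLMs, ONNX otherwise).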
def stage_1_analyze_model(model_id: str):
    log_stream = "[STAGE 1] Analyzing model...\n"
    try:
        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
        model_type = config.model_type
        analysis_report = f"""### Model Analysis Report\n- **Model ID:** `{model_id}`\n- **Architecture:** `{model_type}`"""
        if 'llama' in model_type or 'gpt' in model_type or 'mistral' in model_type or 'gemma' in model_type:
            recommendation = "**Recommendation:** This is a Large Language Model (LLM). For the best CPU performance, the **GGUF Pipeline** (using llama.cpp) is highly recommended."
        else:
            recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX Pipeline** is recommended."
        log_stream += f"Analysis complete. Architecture: {model_type}.\n"
        return log_stream, analysis_report + "\n" + recommendation, gr.Accordion(open=True)
    except Exception as e:
        error_msg = f"Failed to analyze model '{model_id}'. Error: {e}"
        logging.error(error_msg)
        return log_stream + error_msg, "Could not analyze model.", gr.Accordion(open=False)
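
# Stage 2: magnitude (L1) unstructured pruning of every nn.Linear weight,
# made permanent with prune.remove() so the sparsified weights are baked in.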
def stage_2_prune_model(model, prune_percentage: float):
    if prune_percentage == 0:
        return model, "Skipped pruning as percentage was 0."
    log_stream = "[STAGE 2] Pruning model...\n"
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear):
            prune.l1_unstructured(module, name='weight', amount=prune_percentage / 100.0)
            prune.remove(module, 'weight')
    log_stream += f"Pruning complete with {prune_percentage}% target.\n"
    return model, log_stream
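
# Stages 3 & 4 (ONNX path): export the model with `optimum-cli export onnx`,
# then apply dynamic INT8 quantization with ORTQuantizer. Only dynamic
# quantization is wired up; the static/calibration arguments are accepted
# but currently unused.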
def stage_3_4_onnx_quantize(model_path_or_id: str, onnx_quant_type: str, calibration_data_path: str):
    log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = model_path_or_id.split('/')[-1]
    onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx")
    try:
        export_command = ["optimum-cli", "export", "onnx", "--model", model_path_or_id, "--trust-remote-code", onnx_path]
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += f"Executing `optimum-cli export onnx` for '{model_path_or_id}'...\n{process.stdout}\n"
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}")
    try:
        quantizer = ORTQuantizer.from_pretrained(onnx_path)
        log_stream += "Performing DYNAMIC quantization...\n"
        dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
        quantized_path = os.path.join(onnx_path, "quantized-dynamic")
        quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig)
        log_stream += f"Successfully quantized model to: {quantized_path}\n"
        if not os.path.exists(os.path.join(quantized_path, 'tokenizer_config.json')):
            AutoTokenizer.from_pretrained(model_path_or_id, trust_remote_code=True).save_pretrained(quantized_path)
            log_stream += "Saved new tokenizer files.\n"
        return quantized_path, log_stream
    except Exception as e:
        raise RuntimeError(f"Failed during ONNX quantization step. Error: {e}")
def stage_3_4_gguf_quantize(model_path_or_id: str, original_model_id: str, quantization_strategy: str):
    log_stream = "[STAGE 3 & 4] Converting to GGUF using llama.cpp...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = original_model_id.replace('/', '_')
    gguf_path = os.path.abspath(os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf"))
    os.makedirs(gguf_path, exist_ok=True)
    f16_gguf_path = os.path.join(gguf_path, "model-f16.gguf")
    quantized_gguf_path = os.path.join(gguf_path, "model.gguf")
    absolute_model_path = os.path.abspath(model_path_or_id) if os.path.exists(model_path_or_id) else model_path_or_id
    try:
        # The Python script can be called directly using its absolute path.
        convert_command = ["python3", str(LLAMA_CPP_CONVERT_SCRIPT), absolute_model_path, "--outfile", f16_gguf_path, "--outtype", "f16"]
        process = subprocess.run(convert_command, check=True, capture_output=True, text=True)
        log_stream += f"Executing llama.cpp conversion script...\n{process.stdout}\n"
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        quantize_map = {"q4_k_m": "Q4_K_M", "q5_k_m": "Q5_K_M", "q8_0": "Q8_0", "f16": "F16"}
        target_quant_name = quantize_map.get(quantization_strategy.lower(), "Q4_K_M")
        if target_quant_name == "F16":
            log_stream += "Target is F16, renaming file...\n"
            os.rename(f16_gguf_path, quantized_gguf_path)
        else:
            log_stream += f"Quantizing FP16 GGUF to {target_quant_name}...\n"
            quantize_command = [str(LLAMA_CPP_QUANTIZE_SCRIPT), f16_gguf_path, quantized_gguf_path, target_quant_name]
            process = subprocess.run(quantize_command, check=True, capture_output=True, text=True)
            log_stream += f"{process.stdout}\n"
            if process.stderr:
                log_stream += f"[STDERR]\n{process.stderr}\n"
            os.remove(f16_gguf_path)
        return gguf_path, log_stream
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed during llama.cpp execution. Error:\n{e.stderr}")
    except Exception as e:
        raise RuntimeError(f"An unexpected error occurred during GGUF conversion. Error: {e}")
def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict):
    log_stream = "[STAGE 5] Packaging and Uploading...\n"
    if not HF_TOKEN:
        return "Skipping upload: HF_TOKEN not found.", log_stream + "Skipping upload: HF_TOKEN not found."
    try:
        repo_name = f"{model_id.split('/')[-1]}-amop-cpu-{options['pipeline_type'].lower()}"
        repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN)
        template_file = "model_card_template_gguf.md" if options['pipeline_type'] == "GGUF" else "model_card_template.md"
        with open(template_file, "r", encoding="utf-8") as f:
            template_content = f.read()
        model_card_content = template_content.format(
            repo_name=repo_name,
            model_id=model_id,
            optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            pruning_status="Enabled" if options.get('prune', False) else "Disabled",
            pruning_percent=options.get('prune_percent', 0),
            quant_type=options.get('quant_type', 'N/A'),
            repo_id=repo_url.repo_id,
            pipeline_log=pipeline_log,
        )
        with open(os.path.join(optimized_model_path, "README.md"), "w", encoding="utf-8") as f:
            f.write(model_card_content)
        api.upload_folder(folder_path=optimized_model_path, repo_id=repo_url.repo_id, repo_type="model", token=HF_TOKEN)
        log_stream += "Upload complete.\n"
        return f"Success! Your optimized model is available at: huggingface.co/{repo_url.repo_id}", log_stream
    except Exception as e:
        raise RuntimeError(f"Failed to upload to the Hub. Error: {e}")
def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float, onnx_quant_type: str, calibration_file, gguf_quant_type: str):
    if not model_id:
        yield {log_output: "Please enter a Model ID.", final_output: "Idle"}
        return
    initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated for '{model_id}'.\n"
    yield {run_button: gr.Button(interactive=False, value="🚀 Running..."), analyze_button: gr.Button(interactive=False), final_output: f"RUNNING ({pipeline_type})", log_output: initial_log}
    full_log = initial_log
    temp_model_dir = None
    model_path_or_id = model_id
    try:
        whoami = api.whoami(token=HF_TOKEN)
        if not whoami:
            raise RuntimeError("Could not authenticate with Hugging Face Hub. Check your HF_TOKEN.")
        repo_id_for_link = f"{whoami['name']}/{model_id.split('/')[-1]}-amop-cpu-{pipeline_type.lower()}"
        if do_prune and prune_percent > 0:
            full_log += "\n[WARNING] Pruning is memory-intensive and may fail for large models.\n"
            yield {final_output: "Loading model (1/5)", log_output: full_log}
            model = AutoModel.from_pretrained(model_id, trust_remote_code=True)
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
            full_log += f"Successfully loaded '{model_id}'.\n"
            yield {final_output: "Pruning model (2/5)", log_output: full_log}
            model, log = stage_2_prune_model(model, prune_percent)
            full_log += log
            temp_model_dir = tempfile.mkdtemp()
            model.save_pretrained(temp_model_dir)
            tokenizer.save_pretrained(temp_model_dir)
            model_path_or_id = temp_model_dir
            full_log += f"Saved intermediate pruned model to {temp_model_dir}\n"
        else:
            full_log += "Pruning skipped.\n"
        if pipeline_type == "ONNX":
            yield {final_output: "Converting to ONNX (3/5)", log_output: full_log}
            optimized_path, log = stage_3_4_onnx_quantize(model_path_or_id, onnx_quant_type, calibration_file.name if onnx_quant_type == "Static" and calibration_file else None)
            options = {'pipeline_type': 'ONNX', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': onnx_quant_type}
        elif pipeline_type == "GGUF":
            yield {final_output: "Converting to GGUF (3/5)", log_output: full_log}
            optimized_path, log = stage_3_4_gguf_quantize(model_path_or_id, model_id, gguf_quant_type)
            options = {'pipeline_type': 'GGUF', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': gguf_quant_type}
        else:
            raise ValueError("Invalid pipeline type selected.")
        full_log += log
        yield {final_output: "Packaging & Uploading (4/5)", log_output: full_log}
        final_message, log = stage_5_package_and_upload(model_id, optimized_path, full_log, options)
        full_log += log
        yield {final_output: gr.update(value="SUCCESS", label="Status"), log_output: full_log, success_box: gr.Markdown(f"✅ **Success!** Model available: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")}
    except Exception as e:
        logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True)
        full_log += f"\n[ERROR] Pipeline failed: {e}"
        yield {final_output: gr.update(value="ERROR", label="Status"), log_output: full_log, success_box: gr.Markdown("❌ **An error occurred.** Check logs for details.", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")}
    finally:
        if temp_model_dir and os.path.exists(temp_model_dir):
            shutil.rmtree(temp_model_dir)
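
# Gradio UI: model selection and optimization options on the left, pipeline
# status and live logs on the right. Event handlers wire the buttons to the
# stage functions above.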
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 AMOP: Adaptive Model Optimization Pipeline")
    if not HF_TOKEN:
        gr.Warning("HF_TOKEN not set! The final 'upload' step will be skipped.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Select a Model")
            model_id_input = gr.Textbox(label="Hugging Face Model ID", placeholder="e.g., gpt2, google/gemma-2b")
            analyze_button = gr.Button("🔍 Analyze Model", variant="secondary")
            with gr.Accordion("⚙️ 2. Configure Optimization", open=False) as optimization_accordion:
                analysis_report_output = gr.Markdown()
                pipeline_type_radio = gr.Radio(["ONNX", "GGUF"], label="Select Optimization Pipeline")
                gr.Warning("Pruning requires high RAM and may fail for models >2B parameters on free Spaces.")
                prune_checkbox = gr.Checkbox(label="Enable Pruning (Optional)", value=False, info="Removes redundant weights before quantization.")
                prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)", visible=True)
                with gr.Group(visible=False) as onnx_options:
                    gr.Markdown("#### ONNX Options")
                    onnx_quant_radio = gr.Radio(["Dynamic"], label="Quantization Type", value="Dynamic", info="Static quantization via UI is not supported.")
                    calibration_file_upload = gr.File(visible=False)
                with gr.Group(visible=False) as gguf_options:
                    gr.Markdown("#### GGUF Options")
                    gguf_quant_dropdown = gr.Dropdown(["q4_k_m", "q5_k_m", "q8_0", "f16"], label="Quantization Strategy", value="q4_k_m")
                run_button = gr.Button("🚀 Run Optimization Pipeline", variant="primary")
        with gr.Column(scale=2):
            gr.Markdown("### Pipeline Status & Logs")
            final_output = gr.Label(value="Idle", label="Status")
            success_box = gr.Markdown(visible=False)
            log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False)

    def update_ui_for_pipeline(pipeline_type):
        return {onnx_options: gr.Group(visible=pipeline_type == "ONNX"), gguf_options: gr.Group(visible=pipeline_type == "GGUF")}

    pipeline_type_radio.change(fn=update_ui_for_pipeline, inputs=pipeline_type_radio, outputs=[onnx_options, gguf_options])
    analyze_button.click(fn=stage_1_analyze_model, inputs=[model_id_input], outputs=[log_output, analysis_report_output, optimization_accordion])
    run_button.click(
        fn=run_amop_pipeline,
        inputs=[model_id_input, pipeline_type_radio, prune_checkbox, prune_slider, onnx_quant_radio, calibration_file_upload, gguf_quant_dropdown],
        outputs=[run_button, analyze_button, final_output, log_output, success_box],
    )

if __name__ == "__main__":
    demo.queue().launch(debug=True)