AMOP / app.py
broadfield-dev's picture
Update app.py
8a1a33a verified
import gradio as gr
import torch
import os
import logging
import time
import tempfile
import shutil
import subprocess
from datetime import datetime
from pathlib import Path
from huggingface_hub import HfApi
from transformers import AutoConfig, AutoModel, AutoTokenizer
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoQuantizationConfig
import torch.nn.utils.prune as prune
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.")
api = HfApi()
OUTPUT_DIR = "/tmp/optimized_models"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Use an absolute path to the pre-built location in /opt
LLAMA_CPP_DIR = Path("/opt/llama.cpp")
# Binaries are in the 'build/bin' subdirectory from our out-of-source build
LLAMA_CPP_QUANTIZE_SCRIPT = LLAMA_CPP_DIR / "build" / "bin" / "quantize"
LLAMA_CPP_CONVERT_SCRIPT = LLAMA_CPP_DIR / "convert.py"
if not LLAMA_CPP_QUANTIZE_SCRIPT.exists():
error_msg = "FATAL ERROR: llama.cpp binaries not found. The Docker build may have failed."
logging.error(error_msg)
raise RuntimeError(error_msg)
def stage_1_analyze_model(model_id: str):
log_stream = "[STAGE 1] Analyzing model...\n"
try:
config = AutoConfig.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
model_type = config.model_type
analysis_report = f"""### Model Analysis Report\n- **Model ID:** `{model_id}`\n- **Architecture:** `{model_type}`"""
recommendation = ""
if 'llama' in model_type or 'gpt' in model_type or 'mistral' in model_type or 'gemma' in model_type:
recommendation = "**Recommendation:** This is a Large Language Model (LLM). For the best CPU performance, the **GGUF Pipeline** (using llama.cpp) is highly recommended."
else:
recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX Pipeline** is recommended."
log_stream += f"Analysis complete. Architecture: {model_type}.\n"
return log_stream, analysis_report + "\n" + recommendation, gr.Accordion(open=True)
except Exception as e:
error_msg = f"Failed to analyze model '{model_id}'. Error: {e}"
logging.error(error_msg)
return log_stream + error_msg, "Could not analyze model.", gr.Accordion(open=False)
def stage_2_prune_model(model, prune_percentage: float):
if prune_percentage == 0:
return model, "Skipped pruning as percentage was 0."
log_stream = "[STAGE 2] Pruning model...\n"
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
prune.l1_unstructured(module, name='weight', amount=prune_percentage / 100.0)
prune.remove(module, 'weight')
log_stream += f"Pruning complete with {prune_percentage}% target.\n"
return model, log_stream
def stage_3_4_onnx_quantize(model_path_or_id: str, onnx_quant_type: str, calibration_data_path: str):
log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n"
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
model_name = model_path_or_id.split('/')[-1]
onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx")
try:
export_command = ["optimum-cli", "export", "onnx", "--model", model_path_or_id, "--trust-remote-code", onnx_path]
process = subprocess.run(export_command, check=True, capture_output=True, text=True)
log_stream += f"Executing `optimum-cli export onnx` for '{model_path_or_id}'...\n{process.stdout}\n"
if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n"
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}")
try:
quantizer = ORTQuantizer.from_pretrained(onnx_path)
log_stream += "Performing DYNAMIC quantization...\n"
dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
quantized_path = os.path.join(onnx_path, "quantized-dynamic")
quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig)
log_stream += f"Successfully quantized model to: {quantized_path}\n"
if not os.path.exists(os.path.join(quantized_path, 'tokenizer_config.json')):
AutoTokenizer.from_pretrained(model_path_or_id, trust_remote_code=True).save_pretrained(quantized_path)
log_stream += "Saved new tokenizer files.\n"
return quantized_path, log_stream
except Exception as e:
raise RuntimeError(f"Failed during ONNX quantization step. Error: {e}")
def stage_3_4_gguf_quantize(model_path_or_id: str, original_model_id: str, quantization_strategy: str):
log_stream = "[STAGE 3 & 4] Converting to GGUF using llama.cpp...\n"
run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
model_name = original_model_id.replace('/', '_')
gguf_path = os.path.abspath(os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf"))
os.makedirs(gguf_path, exist_ok=True)
f16_gguf_path = os.path.join(gguf_path, "model-f16.gguf")
quantized_gguf_path = os.path.join(gguf_path, "model.gguf")
absolute_model_path = os.path.abspath(model_path_or_id) if os.path.exists(model_path_or_id) else model_path_or_id
try:
# The python script can be called directly using its absolute path.
convert_command = ["python3", str(LLAMA_CPP_CONVERT_SCRIPT), absolute_model_path, "--outfile", f16_gguf_path, "--outtype", "f16"]
process = subprocess.run(convert_command, check=True, capture_output=True, text=True)
log_stream += f"Executing llama.cpp conversion script...\n{process.stdout}\n"
if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n"
quantize_map = {"q4_k_m": "Q4_K_M", "q5_k_m": "Q5_K_M", "q8_0": "Q8_0", "f16": "F16"}
target_quant_name = quantize_map.get(quantization_strategy.lower(), "Q4_K_M")
if target_quant_name == "F16":
log_stream += "Target is F16, renaming file...\n"
os.rename(f16_gguf_path, quantized_gguf_path)
else:
log_stream += f"Quantizing FP16 GGUF to {target_quant_name}...\n"
quantize_command = [str(LLAMA_CPP_QUANTIZE_SCRIPT), f16_gguf_path, quantized_gguf_path, target_quant_name]
process = subprocess.run(quantize_command, check=True, capture_output=True, text=True)
log_stream += f"{process.stdout}\n"
if process.stderr: log_stream += f"[STDERR]\n{process.stderr}\n"
os.remove(f16_gguf_path)
return gguf_path, log_stream
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed during llama.cpp execution. Error:\n{e.stderr}")
except Exception as e:
raise RuntimeError(f"An unexpected error occurred during GGUF conversion. Error: {e}")
def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict):
log_stream = "[STAGE 5] Packaging and Uploading...\n"
if not HF_TOKEN:
return "Skipping upload: HF_TOKEN not found.", log_stream + "Skipping upload: HF_TOKEN not found."
try:
repo_name = f"{model_id.split('/')[-1]}-amop-cpu-{options['pipeline_type'].lower()}"
repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN)
template_file = "model_card_template_gguf.md" if options['pipeline_type'] == "GGUF" else "model_card_template.md"
with open(template_file, "r", encoding="utf-8") as f: template_content = f.read()
model_card_content = template_content.format(repo_name=repo_name, model_id=model_id, optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), pruning_status="Enabled" if options.get('prune', False) else "Disabled", pruning_percent=options.get('prune_percent', 0), quant_type=options.get('quant_type', 'N/A'), repo_id=repo_url.repo_id, pipeline_log=pipeline_log)
with open(os.path.join(optimized_model_path, "README.md"), "w", encoding="utf-8") as f: f.write(model_card_content)
api.upload_folder(folder_path=optimized_model_path, repo_id=repo_url.repo_id, repo_type="model", token=HF_TOKEN)
log_stream += "Upload complete.\n"
return f"Success! Your optimized model is available at: huggingface.co/{repo_url.repo_id}", log_stream
except Exception as e:
raise RuntimeError(f"Failed to upload to the Hub. Error: {e}")
def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float, onnx_quant_type: str, calibration_file, gguf_quant_type: str):
if not model_id:
yield {log_output: "Please enter a Model ID.", final_output: "Idle"}
return
initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated for '{model_id}'.\n"
yield {run_button: gr.Button(interactive=False, value="πŸš€ Running..."), analyze_button: gr.Button(interactive=False), final_output: f"RUNNING ({pipeline_type})", log_output: initial_log}
full_log = initial_log
temp_model_dir = None
model_path_or_id = model_id
try:
whoami = api.whoami(token=HF_TOKEN)
if not whoami: raise RuntimeError("Could not authenticate with Hugging Face Hub. Check your HF_TOKEN.")
repo_id_for_link = f"{whoami['name']}/{model_id.split('/')[-1]}-amop-cpu-{pipeline_type.lower()}"
if do_prune and prune_percent > 0:
full_log += f"\n[WARNING] Pruning is memory-intensive and may fail for large models.\n"
yield {final_output: "Loading model (1/5)", log_output: full_log}
model = AutoModel.from_pretrained(model_id, trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
full_log += f"Successfully loaded '{model_id}'.\n"
yield {final_output: "Pruning model (2/5)", log_output: full_log}
model, log = stage_2_prune_model(model, prune_percent)
full_log += log
temp_model_dir = tempfile.mkdtemp()
model.save_pretrained(temp_model_dir)
tokenizer.save_pretrained(temp_model_dir)
model_path_or_id = temp_model_dir
full_log += f"Saved intermediate pruned model to {temp_model_dir}\n"
else:
full_log += "Pruning skipped.\n"
if pipeline_type == "ONNX":
yield {final_output: "Converting to ONNX (3/5)", log_output: full_log}
optimized_path, log = stage_3_4_onnx_quantize(model_path_or_id, onnx_quant_type, calibration_file.name if onnx_quant_type == "Static" and calibration_file else None)
options = {'pipeline_type': 'ONNX', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': onnx_quant_type}
elif pipeline_type == "GGUF":
yield {final_output: "Converting to GGUF (3/5)", log_output: full_log}
optimized_path, log = stage_3_4_gguf_quantize(model_path_or_id, model_id, gguf_quant_type)
options = {'pipeline_type': 'GGUF', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': gguf_quant_type}
else:
raise ValueError("Invalid pipeline type selected.")
full_log += log
yield {final_output: "Packaging & Uploading (4/5)", log_output: full_log}
final_message, log = stage_5_package_and_upload(model_id, optimized_model_path, full_log, options)
full_log += log
yield {final_output: gr.update(value="SUCCESS", label="Status"), log_output: full_log, success_box: gr.Markdown(f"βœ… **Success!** Model available: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")}
except Exception as e:
logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True)
full_log += f"\n[ERROR] Pipeline failed: {e}"
yield {final_output: gr.update(value="ERROR", label="Status"), log_output: full_log, success_box: gr.Markdown(f"❌ **An error occurred.** Check logs for details.", visible=True), run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"), analyze_button: gr.Button(interactive=True, value="Analyze Model")}
finally:
if temp_model_dir and os.path.exists(temp_model_dir):
shutil.rmtree(temp_model_dir)
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# πŸš€ AMOP: Adaptive Model Optimization Pipeline")
if not HF_TOKEN: gr.Warning("HF_TOKEN not set! The final 'upload' step will be skipped.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Select a Model")
model_id_input = gr.Textbox(label="Hugging Face Model ID", placeholder="e.g., gpt2, google/gemma-2b")
analyze_button = gr.Button("πŸ” Analyze Model", variant="secondary")
with gr.Accordion("βš™οΈ 2. Configure Optimization", open=False) as optimization_accordion:
analysis_report_output = gr.Markdown()
pipeline_type_radio = gr.Radio(["ONNX", "GGUF"], label="Select Optimization Pipeline")
gr.Warning("Pruning requires high RAM and may fail for models >2B parameters on free Spaces.")
prune_checkbox = gr.Checkbox(label="Enable Pruning (Optional)", value=False, info="Removes redundant weights before quantization.")
prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)", visible=True)
with gr.Group(visible=False) as onnx_options:
gr.Markdown("#### ONNX Options")
onnx_quant_radio = gr.Radio(["Dynamic"], label="Quantization Type", value="Dynamic", info="Static quantization via UI is not supported.")
calibration_file_upload = gr.File(visible=False)
with gr.Group(visible=False) as gguf_options:
gr.Markdown("#### GGUF Options")
gguf_quant_dropdown = gr.Dropdown(["q4_k_m", "q5_k_m", "q8_0", "f16"], label="Quantization Strategy", value="q4_k_m")
run_button = gr.Button("πŸš€ Run Optimization Pipeline", variant="primary")
with gr.Column(scale=2):
gr.Markdown("### Pipeline Status & Logs")
final_output = gr.Label(value="Idle", label="Status")
success_box = gr.Markdown(visible=False)
log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False)
def update_ui_for_pipeline(pipeline_type):
return {onnx_options: gr.Group(visible=pipeline_type=="ONNX"), gguf_options: gr.Group(visible=pipeline_type=="GGUF")}
pipeline_type_radio.change(fn=update_ui_for_pipeline, inputs=pipeline_type_radio, outputs=[onnx_options, gguf_options])
analyze_button.click(fn=stage_1_analyze_model, inputs=[model_id_input], outputs=[log_output, analysis_report_output, optimization_accordion])
run_button.click(fn=run_amop_pipeline,
inputs=[model_id_input, pipeline_type_radio, prune_checkbox, prune_slider, onnx_quant_radio, calibration_file_upload, gguf_quant_dropdown],
outputs=[run_button, analyze_button, final_output, log_output, success_box])
if __name__ == "__main__":
demo.queue().launch(debug=True)