import os
import subprocess
import signal
import tempfile
from pathlib import Path
import logging
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import numpy as np
import shutil
from copy import deepcopy
HF_TOKEN = os.environ.get("HF_TOKEN")
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
CONVERSION_SCRIPT = "./llama.cpp/convert_hf_to_gguf.py"
log_dir = "/data/logs"
downloads_dir = "/data/downloads"
outputs_dir = "/data/outputs"
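# Working directories for logs, downloaded checkpoints, and quantized outputs.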
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
filename=os.path.join(log_dir, "app.log"),
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def get_llama_cpp_version():
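    """Return the llama.cpp release tag via `git describe`, or None if it cannot be determined."""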
try:
result = subprocess.run(
["git", "-C", "./llama.cpp", "describe", "--tags", "--always"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True,
)
version = result.stdout.strip().split("-")[0]
return version
except subprocess.CalledProcessError as e:
logger.error("Error getting llama.cpp version: %s", e.stderr.strip())
return None
def get_repo_namespace(repo_owner: str, username: str, user_orgs: list) -> str:
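    """Resolve the namespace to publish under: the user themselves or one of their organizations."""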
if repo_owner == "self":
return username
for org in user_orgs:
if org["name"] == repo_owner:
return org["name"]
raise ValueError(f"Invalid repo_owner: {repo_owner}")
def escape(s: str) -> str:
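    """Escape HTML special characters (and newlines) so error text renders safely in the output panel."""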
return (
s.replace("&", "&")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("\n", "<br/>")
)
def toggle_repo_owner(export_to_org: bool, oauth_token: gr.OAuthToken | None) -> tuple:
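    """Show or hide the organization dropdown and org-token textbox based on the export checkbox."""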
if oauth_token is None or oauth_token.token is None:
raise gr.Error("You must be logged in to use quantize-my-repo")
if not export_to_org:
return gr.update(visible=False, choices=["self"], value="self"), gr.update(
visible=False, value=""
)
info = whoami(oauth_token.token)
orgs = [org["name"] for org in info.get("orgs", [])]
return gr.update(visible=True, choices=["self"] + orgs, value="self"), gr.update(
visible=True
)
def generate_importance_matrix(
model_path: str, train_data_path: str, output_path: str
) -> None:
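    """Run llama-imatrix on the f16 model with the given training data, writing the matrix to output_path."""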
imatrix_command = [
"./llama.cpp/llama-imatrix",
"-m",
model_path,
"-f",
train_data_path,
"-ngl",
"99",
"--output-frequency",
"10",
"-o",
output_path,
]
if not os.path.isfile(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
logger.info("Running imatrix command...")
process = subprocess.Popen(imatrix_command, shell=False)
try:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
logger.warning(
"Imatrix computation timed out. Sending SIGINT to allow graceful termination..."
)
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.error(
"Imatrix proc still didn't term. Forecfully terming process..."
)
process.kill()
logger.info("Importance matrix generation completed.")
def split_upload_model(
model_path: str,
outdir: str,
repo_id: str,
oauth_token: gr.OAuthToken | None,
split_max_tensors: int = 256,
split_max_size: str | None = None,
org_token: str | None = None,
export_to_org: bool = False,
) -> None:
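    """Split a GGUF file into shards with llama-gguf-split and upload each shard to the target repo."""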
logger.info("Model path: %s", model_path)
logger.info("Output dir: %s", outdir)
if oauth_token is None or oauth_token.token is None:
raise ValueError("You have to be logged in.")
split_cmd = ["./llama.cpp/llama-gguf-split", "--split"]
if split_max_size:
split_cmd.extend(["--split-max-size", split_max_size])
else:
split_cmd.extend(["--split-max-tensors", str(split_max_tensors)])
model_path_prefix = ".".join(model_path.split(".")[:-1])
split_cmd.extend([model_path, model_path_prefix])
logger.info("Split command: %s", split_cmd)
result = subprocess.run(split_cmd, shell=False, capture_output=True, text=True)
logger.info("Split command stdout: %s", result.stdout)
logger.info("Split command stderr: %s", result.stderr)
if result.returncode != 0:
raise RuntimeError(f"Error splitting the model: {result.stderr}")
logger.info("Model split successfully!")
if os.path.exists(model_path):
os.remove(model_path)
model_file_prefix = model_path_prefix.split("/")[-1]
logger.info("Model file name prefix: %s", model_file_prefix)
sharded_model_files = [
f
for f in os.listdir(outdir)
if f.startswith(model_file_prefix) and f.endswith(".gguf")
]
if not sharded_model_files:
raise RuntimeError("No sharded files found.")
logger.info("Sharded model files: %s", sharded_model_files)
api = HfApi(token=org_token if (export_to_org and org_token) else oauth_token.token)
for file in sharded_model_files:
file_path = os.path.join(outdir, file)
logger.info("Uploading file: %s", file_path)
try:
api.upload_file(
path_or_fileobj=file_path,
path_in_repo=file,
repo_id=repo_id,
)
except Exception as e:
raise RuntimeError(f"Error uploading file {file_path}: {e}") from e
logger.info("Sharded model has been uploaded successfully!")
def get_new_model_card(
original_card: ModelCard,
original_model_id: str,
gguf_files: list,
new_repo_url: str,
split_model: bool,
) -> ModelCard:
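    """Build the model card for the quantized repo: quant table, download instructions, and the original card text."""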
version = get_llama_cpp_version()
model_card = deepcopy(original_card)
model_card.data.tags = (model_card.data.tags or []) + [
"antigma",
"quantize-my-repo",
]
model_card.data.base_model = original_model_id
# Format the table rows
table_rows = []
for file_info in gguf_files:
name, _, size, method = file_info
if split_model:
display_name = name[:-5]
else:
display_name = f"[{name}]({new_repo_url}/blob/main/{name})"
table_rows.append(f"{display_name}|{method}|{size:.2f} GB|{split_model}|\n")
model_card.text = f"""
*Produced by [Antigma Labs](https://antigma.ai), [Antigma Quantize Space](https://huggingface.co/spaces/Antigma/quantize-my-repo)*
*Follow Antigma Labs on X: [https://x.com/antigma_labs](https://x.com/antigma_labs)*
*Antigma's GitHub Homepage [https://github.com/AntigmaLabs](https://github.com/AntigmaLabs)*
## Quantization Format (GGUF)
We use <a href="https://github.com/ggml-org/llama.cpp">llama.cpp</a> release <a href="https://github.com/ggml-org/llama.cpp/releases/tag/{version}">{version}</a> for quantization.
Original model: https://huggingface.co/{original_model_id}
## Download a file (not the whole branch) from below:
| Filename | Quant type | File Size | Split |
| -------- | ---------- | --------- | ----- |
| {'|'.join(table_rows)}
## Original Model Card
{original_card.text}
## Downloading using huggingface-cli
<details>
<summary>Click to view download instructions</summary>
First, make sure you have huggingface-cli installed:
```
pip install -U "huggingface_hub[cli]"
```
Then, you can target the specific file you want:
```
huggingface-cli download {new_repo_url} --include "{gguf_files[0][0]}" --local-dir ./
```
If the model is bigger than 50GB, it will have been split into multiple files. In order to download them all to a local folder, run:
```
huggingface-cli download {new_repo_url} --include "{gguf_files[0][0]}/*" --local-dir ./
```
You can either specify a new local-dir (e.g. deepseek-ai_DeepSeek-V3-0324-Q8_0) or omit --local-dir, in which case the files go to the default Hugging Face cache.
</details>
"""
return model_card
def process_model(
model_id: str,
q_method: str | list,
use_imatrix: bool,
imatrix_q_method: str,
private_repo: bool,
train_data_file: gr.File | None,
split_model: bool,
split_max_tensors: int,
split_max_size: str | None,
export_to_org: bool,
repo_owner: str,
org_token: str | None,
oauth_token: gr.OAuthToken | None,
) -> tuple[str, str]:
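    """End-to-end pipeline: download the model, convert it to f16 GGUF, quantize with the selected methods, and upload everything to a new -GGUF repo."""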
if oauth_token is None or oauth_token.token is None:
raise gr.Error("You must be logged in to use quantize-my-repo")
try:
whoami(oauth_token.token)
except Exception as e:
raise gr.Error("You must be logged in to use quantize-my-repo") from e
user_info = whoami(oauth_token.token)
username = user_info["name"]
user_orgs = user_info.get("orgs", [])
if not export_to_org:
repo_owner = "self"
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(
"Time %s, Username %s, Model_ID %s, q_method %s",
current_time,
username,
model_id,
",".join(q_method) if isinstance(q_method, list) else q_method,
)
repo_namespace = get_repo_namespace(repo_owner, username, user_orgs)
model_name = model_id.split("/")[-1]
try:
api_token = org_token if (export_to_org and org_token) else oauth_token.token
api = HfApi(token=api_token)
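        # Always fetch metadata files, plus whichever weight format (safetensors or .bin) the repo contains.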
dl_pattern = ["*.md", "*.json", "*.model"]
pattern = (
"*.safetensors"
if any(
f.path.endswith(".safetensors")
for f in api.list_repo_tree(repo_id=model_id, recursive=True)
)
else "*.bin"
)
dl_pattern.append(pattern)
os.makedirs(downloads_dir, exist_ok=True)
os.makedirs(outputs_dir, exist_ok=True)
with tempfile.TemporaryDirectory(dir=outputs_dir) as outdir:
fp16 = str(Path(outdir) / f"{model_name}.fp16.gguf")
with tempfile.TemporaryDirectory(dir=downloads_dir) as tmpdir:
logger.info("Start download")
local_dir = Path(tmpdir) / model_name
api.snapshot_download(
repo_id=model_id,
local_dir=local_dir,
local_dir_use_symlinks=False,
allow_patterns=dl_pattern,
)
config_dir = local_dir / "config.json"
adapter_config_dir = local_dir / "adapter_config.json"
if os.path.exists(adapter_config_dir) and not os.path.exists(
config_dir
):
raise RuntimeError(
"adapter_config.json is present. If converting LoRA, use GGUF-my-lora."
)
logger.info("Download successfully")
result = subprocess.run(
[
"python",
CONVERSION_SCRIPT,
local_dir,
"--outtype",
"f16",
"--outfile",
fp16,
],
shell=False,
capture_output=True,
)
logger.info("Converted to f16")
if result.returncode != 0:
raise RuntimeError(
f"Error converting to fp16: {result.stderr.decode()}"
)
shutil.rmtree(downloads_dir)
imatrix_path = Path(outdir) / "imatrix.dat"
if use_imatrix:
train_data_path = (
train_data_file.name
if train_data_file
else "llama.cpp/groups_merged.txt"
)
if not os.path.isfile(train_data_path):
raise FileNotFoundError(
f"Training data not found: {train_data_path}"
)
generate_importance_matrix(fp16, train_data_path, imatrix_path)
quant_methods = (
[imatrix_q_method]
if use_imatrix
else (q_method if isinstance(q_method, list) else [q_method])
)
suffix = "imat" if use_imatrix else None
gguf_files = []
for method in quant_methods:
logger.info("Begin quantize")
name = (
f"{model_name.lower()}-{method.lower()}-{suffix}.gguf"
if suffix
else f"{model_name.lower()}-{method.lower()}.gguf"
)
path = str(Path(outdir) / name)
quant_cmd = (
[
"./llama.cpp/llama-quantize",
"--imatrix",
imatrix_path,
fp16,
path,
method,
]
if use_imatrix
else ["./llama.cpp/llama-quantize", fp16, path, method]
)
result = subprocess.run(quant_cmd, shell=False, capture_output=True)
if result.returncode != 0:
raise RuntimeError(
f"Quantization failed ({method}): {result.stderr.decode()}"
)
size = os.path.getsize(path) / 1024 / 1024 / 1024
gguf_files.append((name, path, size, method))
logger.info("Quantize successfully!")
suffix_for_repo = (
f"{imatrix_q_method}-imat" if use_imatrix else "-".join(quant_methods)
)
repo_id = f"{repo_namespace}/{model_name}-GGUF"
new_repo_url = api.create_repo(
repo_id=repo_id, exist_ok=True, private=private_repo
)
try:
original_card = ModelCard.load(model_id, token=oauth_token.token)
except Exception:
original_card = ModelCard("")
card = get_new_model_card(
original_card, model_id, gguf_files, new_repo_url, split_model
)
readme_path = Path(outdir) / "README.md"
card.save(readme_path)
for name, path, _, _ in gguf_files:
if split_model:
split_upload_model(
path,
outdir,
repo_id,
oauth_token,
split_max_tensors,
split_max_size,
org_token,
export_to_org,
)
else:
api.upload_file(
path_or_fileobj=path, path_in_repo=name, repo_id=repo_id
)
if use_imatrix and os.path.isfile(imatrix_path):
api.upload_file(
path_or_fileobj=imatrix_path,
path_in_repo="imatrix.dat",
repo_id=repo_id,
)
api.upload_file(
path_or_fileobj=readme_path, path_in_repo="README.md", repo_id=repo_id
)
return (
f'<h1>✅ DONE</h1><br/>Repo: <a href="{new_repo_url}" target="_blank" style="text-decoration:underline">{repo_id}</a>',
f"llama{np.random.randint(9)}.png",
)
except Exception as e:
return (
f'<h1>❌ ERROR</h1><br/><pre style="white-space:pre-wrap;">{escape(str(e))}</pre>',
"error.png",
)
css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""
model_id = HuggingfaceHubSearch(
label="Hub Model ID",
placeholder="Search for model id on Huggingface",
search_type="model",
)
export_to_org = gr.Checkbox(
label="Export to Organization Repository",
value=False,
info="If checked, you can select an organization to export to.",
)
repo_owner = gr.Dropdown(
choices=["self"], value="self", label="Repository Owner", visible=False
)
org_token = gr.Textbox(label="Org Access Token", type="password", visible=False)
q_method = gr.Dropdown(
[
"Q2_K",
"Q3_K_S",
"Q3_K_M",
"Q3_K_L",
"Q4_0",
"Q4_K_S",
"Q4_K_M",
"Q5_0",
"Q5_K_S",
"Q5_K_M",
"Q6_K",
"Q8_0",
],
label="Quantization Method",
info="GGML quantization type",
value="Q4_K_M",
filterable=False,
visible=True,
multiselect=True,
)
imatrix_q_method = gr.Dropdown(
["IQ3_M", "IQ3_XXS", "Q4_K_M", "Q4_K_S", "IQ4_NL", "IQ4_XS", "Q5_K_M", "Q5_K_S"],
label="Imatrix Quantization Method",
info="GGML imatrix quants type",
value="IQ4_NL",
filterable=False,
visible=False,
)
use_imatrix = gr.Checkbox(
value=False,
label="Use Imatrix Quantization",
info="Use importance matrix for quantization.",
)
private_repo = gr.Checkbox(
value=False, label="Private Repo", info="Create a private repo under your username."
)
train_data_file = gr.File(label="Training Data File", file_types=["txt"], visible=False)
split_model = gr.Checkbox(
value=False, label="Split Model", info="Shard the model using gguf-split."
)
split_max_tensors = gr.Number(
value=256,
label="Max Tensors per File",
info="Maximum number of tensors per file when splitting model.",
visible=False,
)
split_max_size = gr.Textbox(
label="Max File Size",
info="Maximum file size when splitting model (--split-max-size). May leave empty to use the default. Accepted suffixes: M, G. Example: 256M, 5G",
visible=False,
)
iface = gr.Interface(
fn=process_model,
inputs=[
model_id,
q_method,
use_imatrix,
imatrix_q_method,
private_repo,
train_data_file,
split_model,
split_max_tensors,
split_max_size,
export_to_org,
repo_owner,
org_token,
],
outputs=[gr.Markdown(label="Output"), gr.Image(show_label=False)],
title="Make your own GGUF Quants — faster than ever before, believe me.",
description="We take your Hugging Face repo — a terrific repo — we quantize it, we package it beautifully, and we give you your very own repo. It's smart. It's efficient. It's huge. You're gonna love it.",
api_name=False,
)
with gr.Blocks(css=css) as demo:
gr.Markdown("Logged in, you must be. Classy, secure, and victorious, it keeps us.")
gr.LoginButton(min_width=250)
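    # Wire visibility toggles: org export fields, split options, and imatrix-related inputs.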
export_to_org.change(
fn=toggle_repo_owner, inputs=[export_to_org], outputs=[repo_owner, org_token]
)
split_model.change(
fn=lambda sm: (gr.update(visible=sm), gr.update(visible=sm)),
inputs=split_model,
outputs=[split_max_tensors, split_max_size],
)
use_imatrix.change(
fn=lambda use: (
gr.update(visible=not use),
gr.update(visible=use),
gr.update(visible=use),
),
inputs=use_imatrix,
outputs=[q_method, imatrix_q_method, train_data_file],
)
iface.render()
def restart_space():
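    """Factory-reboot the Space via the Hub API; scheduled below to run once a day."""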
HfApi().restart_space(
repo_id="Antigma/quantize-my-repo", token=HF_TOKEN, factory_reboot=True
)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=86400)
scheduler.start()
demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)