Spaces:
Running
Running
import gradio as gr | |
import requests | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import tempfile | |
import os | |
import hashlib | |
import time | |
from gradio import Progress | |
# Function to get OID from a raw Hugging Face LFS file URL | |
def get_lfs_oid(raw_url: str) -> str | None: | |
""" | |
Fetches the content of a raw Hugging Face LFS file URL and extracts the SHA256 OID. | |
""" | |
try: | |
response = requests.get(raw_url, timeout=10) | |
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx) | |
content = response.text | |
for line in content.splitlines(): | |
if line.startswith("oid sha256:"): | |
return line.split("sha256:")[1].strip() | |
return None | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching OID from {raw_url}: {e}") | |
return None | |
# Function to get .safetensors file info (file list and OIDs) using only HTTP requests | |
def get_model_safetensors_info(model_id: str) -> tuple[dict, str]: | |
""" | |
Fetches safetensors file information for a Hugging Face model using HTTP requests. | |
Returns {filename: oid} and error_message. | |
""" | |
safetensors_oids = {} | |
error_message = "" | |
try: | |
# Use Hugging Face Hub REST API to get file list | |
api_url = f"https://huggingface.co/api/models/{model_id}" | |
resp = requests.get(api_url, timeout=10) | |
if resp.status_code != 200: | |
error_message += f"Could not fetch file list for {model_id}: HTTP {resp.status_code}\n" | |
return safetensors_oids, error_message | |
data = resp.json() | |
files = [f['rfilename'] for f in data.get('siblings', []) if f['rfilename'].endswith('.safetensors')] | |
if not files: | |
error_message += f"No .safetensors files found for {model_id}.\n" | |
return safetensors_oids, error_message | |
# Parallel OID fetch | |
def fetch_oid(f): | |
raw_url = f"https://huggingface.co/{model_id}/raw/main/{f}" | |
oid = get_lfs_oid(raw_url) | |
return f, oid | |
with ThreadPoolExecutor(max_workers=min(8, len(files))) as executor: | |
future_to_file = {executor.submit(fetch_oid, f): f for f in files} | |
for future in as_completed(future_to_file): | |
f, oid = future.result() | |
if oid: | |
safetensors_oids[f] = oid | |
else: | |
error_message += f"Could not get OID for {f} in {model_id}.\n" | |
except Exception as e: | |
error_message += f"Error fetching info for {model_id}: {e}\n" | |
return safetensors_oids, error_message | |
# Main comparison function (no config, only file structure and OIDs) | |
def compare_hf_models(model_id1: str, model_id2: str) -> str: | |
""" | |
Compares two Hugging Face models based on their safetensors OIDs. | |
""" | |
if not model_id1 or not model_id2: | |
return "Please provide both model IDs." | |
output = [] | |
output.append(f"--- Fetching info for Model 1: {model_id1} ---") | |
oids1, err1 = get_model_safetensors_info(model_id1) | |
if err1: output.append(err1) | |
output.append(f"Found {len(oids1)} .safetensors files for {model_id1}.") | |
output.append(f"\n--- Fetching info for Model 2: {model_id2} ---") | |
oids2, err2 = get_model_safetensors_info(model_id2) | |
if err2: output.append(err2) | |
output.append(f"Found {len(oids2)} .safetensors files for {model_id2}.") | |
# 1. Compare Safetensors OIDs | |
output.append("\n--- Safetensors Weight File Comparison (via OID) ---") | |
if not oids1 and not oids2: | |
output.append("No .safetensors files found for either model. Cannot compare weights.") | |
weights_identical = False | |
elif not oids1: | |
output.append(f"No .safetensors files found for {model_id1}. Cannot compare weights.") | |
weights_identical = False | |
elif not oids2: | |
output.append(f"No .safetensors files found for {model_id2}. Cannot compare weights.") | |
weights_identical = False | |
else: | |
# Check if file lists are identical | |
files1_set = set(oids1.keys()) | |
files2_set = set(oids2.keys()) | |
if files1_set != files2_set: | |
output.append("The set of .safetensors files differs between models.") | |
output.append(f"Files in {model_id1} but not {model_id2}: {files1_set - files2_set}") | |
output.append(f"Files in {model_id2} but not {model_id1}: {files2_set - files1_set}") | |
weights_identical = False | |
else: | |
output.append("The models have the same set of .safetensors files.") | |
all_oids_match = True | |
diff_files = [] | |
for filename in files1_set: | |
if oids1[filename] != oids2[filename]: | |
all_oids_match = False | |
diff_files.append(filename) | |
if all_oids_match: | |
output.append("All corresponding .safetensors OIDs are IDENTICAL.") | |
output.append(f"This strongly suggests '{model_id1}' and '{model_id2}' are 'copy-paste' models at the weight level.") | |
weights_identical = True | |
else: | |
output.append(f"Some .safetensors OIDs DIFFER. Differing files: {', '.join(diff_files)}") | |
output.append(f"This indicates different weights. If file structure is identical, '{model_id2}' could be a 'fine-tuned' version of '{model_id1}' (or vice-versa, or both fine-tuned from a common base).") | |
weights_identical = False | |
output.append("\n--- Summary ---") | |
if weights_identical: | |
output.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' are IDENTICAL (copy-paste).") | |
else: | |
output.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' have different weights or file structures. They are distinct or fine-tuned models.") | |
return "\n".join(output) | |
def multi_compare_hf_models(model_ids: list) -> tuple: | |
if not model_ids or len(model_ids) < 2: | |
return "Please provide at least two model IDs.", None, None | |
details = [] | |
safetensors_data = {} | |
errors = {} | |
# Fetch all model info in parallel | |
with ThreadPoolExecutor(max_workers=min(8, len(model_ids))) as executor: | |
future_to_model = {executor.submit(get_model_safetensors_info, mid): mid for mid in model_ids} | |
for future in as_completed(future_to_model): | |
mid = future_to_model[future] | |
oids, err = future.result() | |
safetensors_data[mid] = oids | |
errors[mid] = err | |
# Build summary | |
summary = [] | |
all_files = set() | |
for mid, oids in safetensors_data.items(): | |
all_files.update(oids.keys()) | |
all_files = sorted(all_files) | |
# Table header | |
table = [["File"] + model_ids + ["Match"]] | |
for f in all_files: | |
row = [f] | |
oids_for_file = [] | |
for mid in model_ids: | |
oid = safetensors_data.get(mid, {}).get(f, "-") | |
oids_for_file.append(oid if oid else "-") | |
row.append(oid if oid else "-") | |
# Determine if all OIDs for this file match (ignoring missing) | |
present_oids = [oid for oid in oids_for_file if oid != "-"] | |
if len(present_oids) > 1 and all(oid == present_oids[0] for oid in present_oids): | |
row.append("Match") | |
else: | |
row.append("Unmatch") | |
table.append(row) | |
# Per-model details | |
for mid in model_ids: | |
oids = safetensors_data.get(mid, {}) | |
summary.append(f"{mid}: {len(oids)} .safetensors files.") | |
if errors[mid]: | |
summary.append(f"Errors for {mid}: {errors[mid]}") | |
# File presence summary | |
for f in all_files: | |
present = [mid for mid in model_ids if f in safetensors_data.get(mid, {})] | |
if len(present) != len(model_ids): | |
summary.append(f"File '{f}' missing in: {set(model_ids) - set(present)}") | |
return "\n".join(summary), table, safetensors_data | |
def download_file(url, dest): | |
try: | |
r = requests.get(url, stream=True, timeout=30) | |
r.raise_for_status() | |
with open(dest, 'wb') as f: | |
for chunk in r.iter_content(chunk_size=8192): | |
f.write(chunk) | |
return True, "" | |
except Exception as e: | |
return False, str(e) | |
def download_file_with_progress(url, dest, progress: Progress = None, progress_offset=0, progress_scale=1): | |
try: | |
r = requests.get(url, stream=True, timeout=30) | |
r.raise_for_status() | |
total = int(r.headers.get('content-length', 0)) | |
downloaded = 0 | |
start_time = time.time() | |
last_update_time = start_time | |
update_interval = 1.0 # Update every 1 second for HF Spaces compatibility | |
if progress and total: | |
mb_total = total // 1024 // 1024 | |
progress(progress_offset, desc=f"🎯 Starting: {os.path.basename(dest)} ({mb_total}MB)") | |
with open(dest, 'wb') as f: | |
for chunk in r.iter_content(chunk_size=65536): # 64KB chunks for better performance on HF Spaces | |
if chunk: | |
f.write(chunk) | |
downloaded += len(chunk) | |
current_time = time.time() | |
# Update progress less frequently for HF Spaces | |
if progress and total and (current_time - last_update_time) >= update_interval: | |
file_progress = downloaded / total | |
overall_progress = progress_offset + (file_progress * progress_scale) | |
# Calculate download speed | |
elapsed_time = current_time - start_time | |
if elapsed_time > 0: | |
speed_bps = downloaded / elapsed_time | |
speed_mbps = speed_bps / (1024 * 1024) | |
if speed_mbps >= 1: | |
speed_str = f"{speed_mbps:.1f}MB/s" | |
else: | |
speed_kbps = speed_bps / 1024 | |
speed_str = f"{speed_kbps:.0f}KB/s" | |
else: | |
speed_str = "calculating..." | |
# Calculate ETA | |
if speed_bps > 0: | |
remaining_bytes = total - downloaded | |
eta_seconds = remaining_bytes / speed_bps | |
if eta_seconds < 60: | |
eta_str = f"{eta_seconds:.0f}s" | |
else: | |
eta_minutes = eta_seconds / 60 | |
eta_str = f"{eta_minutes:.1f}min" | |
else: | |
eta_str = "calculating..." | |
mb_downloaded = downloaded // 1024 // 1024 | |
mb_total = total // 1024 // 1024 | |
# Simplified progress message for HF Spaces | |
progress(overall_progress, | |
desc=f"⬇️ {mb_downloaded}/{mb_total}MB ({file_progress*100:.0f}%) • {speed_str} • ETA: {eta_str}") | |
last_update_time = current_time | |
if progress: | |
final_time = time.time() | |
total_time = final_time - start_time | |
avg_speed = (downloaded / total_time) / (1024 * 1024) if total_time > 0 else 0 | |
mb_total = total // 1024 // 1024 | |
progress(progress_offset + progress_scale, | |
desc=f"✅ Complete: {mb_total}MB downloaded (avg {avg_speed:.1f}MB/s)") | |
return True, "" | |
except Exception as e: | |
if progress: | |
progress(progress_offset + progress_scale, desc=f"❌ Download failed: {str(e)[:50]}...") | |
return False, str(e) | |
def file_similarity(file1, file2, chunk_size=1024*1024): | |
""" | |
Compares two files byte-by-byte and returns percent similarity (by identical bytes). | |
""" | |
size1 = os.path.getsize(file1) | |
size2 = os.path.getsize(file2) | |
if size1 != size2: | |
return 0.0, f"File sizes differ: {size1} vs {size2} bytes." | |
total = size1 | |
same = 0 | |
with open(file1, 'rb') as f1, open(file2, 'rb') as f2: | |
while True: | |
b1 = f1.read(chunk_size) | |
b2 = f2.read(chunk_size) | |
if not b1: | |
break | |
for x, y in zip(b1, b2): | |
if x == y: | |
same += 1 | |
percent = (same / total) * 100 if total else 0.0 | |
return percent, None | |
# Gradio Interface | |
with gr.Blocks(theme="soft") as demo: | |
gr.Markdown( | |
""" | |
# 🤖 Hugging Face Model Cross-Checker | |
Easily check if two Hugging Face models are **identical (copy-paste)**, **fine-tuned**, or **completely different**—without downloading any weights! | |
- Enter two model IDs below (e.g. `deepseek-ai/DeepSeek-R1-0528` and `Parveshiiii/DeepSeek-R1-0528-MathX`). | |
- Click **Compare** to see a clear verdict and detailed breakdown. | |
""" | |
) | |
with gr.Row(): | |
model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528") | |
model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX") | |
compare_btn = gr.Button("Compare") | |
verdict = gr.HighlightedText(label="Result Verdict", color_map={"Copy-Paste":"green","Fine-Tuned":"orange","Different":"red","Error":"gray"}) | |
details = gr.Dataframe(headers=["File","Model 1 OID","Model 2 OID","Match"], label="File-by-File Comparison", interactive=False) | |
summary = gr.Textbox(label="Summary Details", lines=8, interactive=False) | |
def crosscheck_ui(m1, m2): | |
if not m1 or not m2: | |
return [("Error: Please provide both model IDs.", "Error")], [], "" | |
oids1, err1 = get_model_safetensors_info(m1) | |
oids2, err2 = get_model_safetensors_info(m2) | |
if err1 or err2: | |
return [(f"Error: {err1 or ''} {err2 or ''}", "Error")], [], "" | |
files = sorted(set(oids1.keys()) | set(oids2.keys())) | |
table = [] | |
all_match = True | |
all_present = True | |
diff_count = 0 | |
for f in files: | |
oid1 = oids1.get(f, "-") | |
oid2 = oids2.get(f, "-") | |
if oid1 == oid2 and oid1 != "-": | |
match = "Match" | |
else: | |
match = "Unmatch" | |
all_match = False | |
if oid1 != "-" and oid2 != "-": | |
diff_count += 1 | |
if oid1 == "-" or oid2 == "-": | |
all_present = False | |
table.append([f, oid1, oid2, match]) | |
# Verdict logic | |
if all_match and all_present and files: | |
verdict_text = [("Copy-Paste: Models are identical at the safetensors level!", "Copy-Paste")] | |
elif all_present and diff_count > 0: | |
verdict_text = [("Fine-Tuned: Same file structure, but weights differ.", "Fine-Tuned")] | |
else: | |
verdict_text = [("Different: File structure or weights are different.", "Different")] | |
# Summary | |
summary_lines = [ | |
f"Model 1: {m1} ({len(oids1)} .safetensors files)", | |
f"Model 2: {m2} ({len(oids2)} .safetensors files)", | |
f"Files compared: {len(files)}", | |
f"Matching files: {sum(1 for row in table if row[3]=='Match')}", | |
f"Unmatched files: {sum(1 for row in table if row[3]=='Unmatch')}", | |
] | |
missing1 = [f for f in files if oids1.get(f) is None] | |
missing2 = [f for f in files if oids2.get(f) is None] | |
if missing1: | |
summary_lines.append(f"Files missing in Model 1: {', '.join(missing1)}") | |
if missing2: | |
summary_lines.append(f"Files missing in Model 2: {', '.join(missing2)}") | |
return verdict_text, table, "\n".join(summary_lines) | |
compare_btn.click( | |
fn=crosscheck_ui, | |
inputs=[model1, model2], | |
outputs=[verdict, details, summary] | |
) | |
with gr.Accordion("Advanced: Compare File Shards Bitwise", open=False): | |
gr.Markdown(""" | |
## Compare a specific file (shard) from both models, byte-by-byte | |
- Enter the file name (e.g. `model-00001-of-00010.safetensors`). | |
- The tool will download this file from both models and compare their contents. | |
- Shows the percent of identical bytes (100% = exact copy). | |
""") | |
adv_model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528") | |
adv_model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX") | |
adv_filename = gr.Textbox(label="File Name", placeholder="e.g. model-00001-of-00010.safetensors") | |
adv_btn = gr.Button("Download & Compare File") | |
adv_result = gr.Textbox(label="Bitwise Comparison Result", lines=3, interactive=False) | |
def adv_compare(m1, m2, fname, progress=gr.Progress()): | |
if not m1 or not m2 or not fname: | |
return "Please provide both model IDs and the file name." | |
progress(0.0, desc="🚀 Initializing comparison...") | |
url1 = f"https://huggingface.co/{m1}/resolve/main/{fname}?download=true" | |
url2 = f"https://huggingface.co/{m2}/resolve/main/{fname}?download=true" | |
with tempfile.TemporaryDirectory() as tmp: | |
f1 = os.path.join(tmp, f"model1_{fname}") | |
f2 = os.path.join(tmp, f"model2_{fname}") | |
# Download first file (5% to 47.5%) | |
progress(0.05, desc=f"📡 Connecting to {m1.split('/')[-1]}...") | |
ok1, err1 = download_file_with_progress(url1, f1, progress, progress_offset=0.05, progress_scale=0.425) | |
if not ok1: | |
return f"❌ Download failed from {m1}: {err1}" | |
# Download second file (50% to 92.5%) | |
progress(0.5, desc=f"📡 Connecting to {m2.split('/')[-1]}...") | |
ok2, err2 = download_file_with_progress(url2, f2, progress, progress_offset=0.5, progress_scale=0.425) | |
if not ok2: | |
return f"❌ Download failed from {m2}: {err2}" | |
# Compare files (95% to 100%) | |
progress(0.95, desc="🔍 Analyzing files byte-by-byte...") | |
percent, err = file_similarity(f1, f2) | |
if err: | |
return f"❌ Comparison error: {err}" | |
progress(1.0, desc="✅ Analysis complete!") | |
# Get file info | |
size1 = os.path.getsize(f1) | |
size2 = os.path.getsize(f2) | |
size_mb = size1 // 1024 // 1024 | |
# Enhanced result formatting | |
if percent == 100: | |
result_icon = "🟢" | |
result_text = "IDENTICAL" | |
elif percent >= 99: | |
result_icon = "🟡" | |
result_text = "NEARLY IDENTICAL" | |
elif percent >= 90: | |
result_icon = "🟠" | |
result_text = "SIMILAR" | |
else: | |
result_icon = "🔴" | |
result_text = "DIFFERENT" | |
return f"{result_icon} **{result_text}** ({percent:.3f}% similarity)\n📁 File size: {size_mb}MB\n🔗 Models: {m1.split('/')[-1]} vs {m2.split('/')[-1]}" | |
adv_btn.click( | |
fn=adv_compare, | |
inputs=[adv_model1, adv_model2, adv_filename], | |
outputs=[adv_result] | |
) | |
demo.launch() |