Abhaykoul's picture
Update app.py
e9c1ec4 verified
import gradio as gr
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import tempfile
import os
import hashlib
import time
from gradio import Progress
# Function to get OID from a raw Hugging Face LFS file URL
def get_lfs_oid(raw_url: str) -> str | None:
"""
Fetches the content of a raw Hugging Face LFS file URL and extracts the SHA256 OID.
"""
try:
response = requests.get(raw_url, timeout=10)
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
content = response.text
for line in content.splitlines():
if line.startswith("oid sha256:"):
return line.split("sha256:")[1].strip()
return None
except requests.exceptions.RequestException as e:
print(f"Error fetching OID from {raw_url}: {e}")
return None
# Function to get .safetensors file info (file list and OIDs) using only HTTP requests
def get_model_safetensors_info(model_id: str) -> tuple[dict, str]:
"""
Fetches safetensors file information for a Hugging Face model using HTTP requests.
Returns {filename: oid} and error_message.
"""
safetensors_oids = {}
error_message = ""
try:
# Use Hugging Face Hub REST API to get file list
api_url = f"https://huggingface.co/api/models/{model_id}"
resp = requests.get(api_url, timeout=10)
if resp.status_code != 200:
error_message += f"Could not fetch file list for {model_id}: HTTP {resp.status_code}\n"
return safetensors_oids, error_message
data = resp.json()
files = [f['rfilename'] for f in data.get('siblings', []) if f['rfilename'].endswith('.safetensors')]
if not files:
error_message += f"No .safetensors files found for {model_id}.\n"
return safetensors_oids, error_message
# Parallel OID fetch
def fetch_oid(f):
raw_url = f"https://huggingface.co/{model_id}/raw/main/{f}"
oid = get_lfs_oid(raw_url)
return f, oid
with ThreadPoolExecutor(max_workers=min(8, len(files))) as executor:
future_to_file = {executor.submit(fetch_oid, f): f for f in files}
for future in as_completed(future_to_file):
f, oid = future.result()
if oid:
safetensors_oids[f] = oid
else:
error_message += f"Could not get OID for {f} in {model_id}.\n"
except Exception as e:
error_message += f"Error fetching info for {model_id}: {e}\n"
return safetensors_oids, error_message
# Main comparison function (no config, only file structure and OIDs)
def compare_hf_models(model_id1: str, model_id2: str) -> str:
"""
Compares two Hugging Face models based on their safetensors OIDs.
"""
if not model_id1 or not model_id2:
return "Please provide both model IDs."
output = []
output.append(f"--- Fetching info for Model 1: {model_id1} ---")
oids1, err1 = get_model_safetensors_info(model_id1)
if err1: output.append(err1)
output.append(f"Found {len(oids1)} .safetensors files for {model_id1}.")
output.append(f"\n--- Fetching info for Model 2: {model_id2} ---")
oids2, err2 = get_model_safetensors_info(model_id2)
if err2: output.append(err2)
output.append(f"Found {len(oids2)} .safetensors files for {model_id2}.")
# 1. Compare Safetensors OIDs
output.append("\n--- Safetensors Weight File Comparison (via OID) ---")
if not oids1 and not oids2:
output.append("No .safetensors files found for either model. Cannot compare weights.")
weights_identical = False
elif not oids1:
output.append(f"No .safetensors files found for {model_id1}. Cannot compare weights.")
weights_identical = False
elif not oids2:
output.append(f"No .safetensors files found for {model_id2}. Cannot compare weights.")
weights_identical = False
else:
# Check if file lists are identical
files1_set = set(oids1.keys())
files2_set = set(oids2.keys())
if files1_set != files2_set:
output.append("The set of .safetensors files differs between models.")
output.append(f"Files in {model_id1} but not {model_id2}: {files1_set - files2_set}")
output.append(f"Files in {model_id2} but not {model_id1}: {files2_set - files1_set}")
weights_identical = False
else:
output.append("The models have the same set of .safetensors files.")
all_oids_match = True
diff_files = []
for filename in files1_set:
if oids1[filename] != oids2[filename]:
all_oids_match = False
diff_files.append(filename)
if all_oids_match:
output.append("All corresponding .safetensors OIDs are IDENTICAL.")
output.append(f"This strongly suggests '{model_id1}' and '{model_id2}' are 'copy-paste' models at the weight level.")
weights_identical = True
else:
output.append(f"Some .safetensors OIDs DIFFER. Differing files: {', '.join(diff_files)}")
output.append(f"This indicates different weights. If file structure is identical, '{model_id2}' could be a 'fine-tuned' version of '{model_id1}' (or vice-versa, or both fine-tuned from a common base).")
weights_identical = False
output.append("\n--- Summary ---")
if weights_identical:
output.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' are IDENTICAL (copy-paste).")
else:
output.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' have different weights or file structures. They are distinct or fine-tuned models.")
return "\n".join(output)
def multi_compare_hf_models(model_ids: list) -> tuple:
if not model_ids or len(model_ids) < 2:
return "Please provide at least two model IDs.", None, None
details = []
safetensors_data = {}
errors = {}
# Fetch all model info in parallel
with ThreadPoolExecutor(max_workers=min(8, len(model_ids))) as executor:
future_to_model = {executor.submit(get_model_safetensors_info, mid): mid for mid in model_ids}
for future in as_completed(future_to_model):
mid = future_to_model[future]
oids, err = future.result()
safetensors_data[mid] = oids
errors[mid] = err
# Build summary
summary = []
all_files = set()
for mid, oids in safetensors_data.items():
all_files.update(oids.keys())
all_files = sorted(all_files)
# Table header
table = [["File"] + model_ids + ["Match"]]
for f in all_files:
row = [f]
oids_for_file = []
for mid in model_ids:
oid = safetensors_data.get(mid, {}).get(f, "-")
oids_for_file.append(oid if oid else "-")
row.append(oid if oid else "-")
# Determine if all OIDs for this file match (ignoring missing)
present_oids = [oid for oid in oids_for_file if oid != "-"]
if len(present_oids) > 1 and all(oid == present_oids[0] for oid in present_oids):
row.append("Match")
else:
row.append("Unmatch")
table.append(row)
# Per-model details
for mid in model_ids:
oids = safetensors_data.get(mid, {})
summary.append(f"{mid}: {len(oids)} .safetensors files.")
if errors[mid]:
summary.append(f"Errors for {mid}: {errors[mid]}")
# File presence summary
for f in all_files:
present = [mid for mid in model_ids if f in safetensors_data.get(mid, {})]
if len(present) != len(model_ids):
summary.append(f"File '{f}' missing in: {set(model_ids) - set(present)}")
return "\n".join(summary), table, safetensors_data
def download_file(url, dest):
try:
r = requests.get(url, stream=True, timeout=30)
r.raise_for_status()
with open(dest, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return True, ""
except Exception as e:
return False, str(e)
def download_file_with_progress(url, dest, progress: Progress = None, progress_offset=0, progress_scale=1):
try:
r = requests.get(url, stream=True, timeout=30)
r.raise_for_status()
total = int(r.headers.get('content-length', 0))
downloaded = 0
start_time = time.time()
last_update_time = start_time
update_interval = 1.0 # Update every 1 second for HF Spaces compatibility
if progress and total:
mb_total = total // 1024 // 1024
progress(progress_offset, desc=f"🎯 Starting: {os.path.basename(dest)} ({mb_total}MB)")
with open(dest, 'wb') as f:
for chunk in r.iter_content(chunk_size=65536): # 64KB chunks for better performance on HF Spaces
if chunk:
f.write(chunk)
downloaded += len(chunk)
current_time = time.time()
# Update progress less frequently for HF Spaces
if progress and total and (current_time - last_update_time) >= update_interval:
file_progress = downloaded / total
overall_progress = progress_offset + (file_progress * progress_scale)
# Calculate download speed
elapsed_time = current_time - start_time
if elapsed_time > 0:
speed_bps = downloaded / elapsed_time
speed_mbps = speed_bps / (1024 * 1024)
if speed_mbps >= 1:
speed_str = f"{speed_mbps:.1f}MB/s"
else:
speed_kbps = speed_bps / 1024
speed_str = f"{speed_kbps:.0f}KB/s"
else:
speed_str = "calculating..."
# Calculate ETA
if speed_bps > 0:
remaining_bytes = total - downloaded
eta_seconds = remaining_bytes / speed_bps
if eta_seconds < 60:
eta_str = f"{eta_seconds:.0f}s"
else:
eta_minutes = eta_seconds / 60
eta_str = f"{eta_minutes:.1f}min"
else:
eta_str = "calculating..."
mb_downloaded = downloaded // 1024 // 1024
mb_total = total // 1024 // 1024
# Simplified progress message for HF Spaces
progress(overall_progress,
desc=f"⬇️ {mb_downloaded}/{mb_total}MB ({file_progress*100:.0f}%) • {speed_str} • ETA: {eta_str}")
last_update_time = current_time
if progress:
final_time = time.time()
total_time = final_time - start_time
avg_speed = (downloaded / total_time) / (1024 * 1024) if total_time > 0 else 0
mb_total = total // 1024 // 1024
progress(progress_offset + progress_scale,
desc=f"✅ Complete: {mb_total}MB downloaded (avg {avg_speed:.1f}MB/s)")
return True, ""
except Exception as e:
if progress:
progress(progress_offset + progress_scale, desc=f"❌ Download failed: {str(e)[:50]}...")
return False, str(e)
def file_similarity(file1, file2, chunk_size=1024*1024):
"""
Compares two files byte-by-byte and returns percent similarity (by identical bytes).
"""
size1 = os.path.getsize(file1)
size2 = os.path.getsize(file2)
if size1 != size2:
return 0.0, f"File sizes differ: {size1} vs {size2} bytes."
total = size1
same = 0
with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
while True:
b1 = f1.read(chunk_size)
b2 = f2.read(chunk_size)
if not b1:
break
for x, y in zip(b1, b2):
if x == y:
same += 1
percent = (same / total) * 100 if total else 0.0
return percent, None
# Gradio Interface
with gr.Blocks(theme="soft") as demo:
gr.Markdown(
"""
# 🤖 Hugging Face Model Cross-Checker
Easily check if two Hugging Face models are **identical (copy-paste)**, **fine-tuned**, or **completely different**—without downloading any weights!
- Enter two model IDs below (e.g. `deepseek-ai/DeepSeek-R1-0528` and `Parveshiiii/DeepSeek-R1-0528-MathX`).
- Click **Compare** to see a clear verdict and detailed breakdown.
"""
)
with gr.Row():
model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528")
model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX")
compare_btn = gr.Button("Compare")
verdict = gr.HighlightedText(label="Result Verdict", color_map={"Copy-Paste":"green","Fine-Tuned":"orange","Different":"red","Error":"gray"})
details = gr.Dataframe(headers=["File","Model 1 OID","Model 2 OID","Match"], label="File-by-File Comparison", interactive=False)
summary = gr.Textbox(label="Summary Details", lines=8, interactive=False)
def crosscheck_ui(m1, m2):
if not m1 or not m2:
return [("Error: Please provide both model IDs.", "Error")], [], ""
oids1, err1 = get_model_safetensors_info(m1)
oids2, err2 = get_model_safetensors_info(m2)
if err1 or err2:
return [(f"Error: {err1 or ''} {err2 or ''}", "Error")], [], ""
files = sorted(set(oids1.keys()) | set(oids2.keys()))
table = []
all_match = True
all_present = True
diff_count = 0
for f in files:
oid1 = oids1.get(f, "-")
oid2 = oids2.get(f, "-")
if oid1 == oid2 and oid1 != "-":
match = "Match"
else:
match = "Unmatch"
all_match = False
if oid1 != "-" and oid2 != "-":
diff_count += 1
if oid1 == "-" or oid2 == "-":
all_present = False
table.append([f, oid1, oid2, match])
# Verdict logic
if all_match and all_present and files:
verdict_text = [("Copy-Paste: Models are identical at the safetensors level!", "Copy-Paste")]
elif all_present and diff_count > 0:
verdict_text = [("Fine-Tuned: Same file structure, but weights differ.", "Fine-Tuned")]
else:
verdict_text = [("Different: File structure or weights are different.", "Different")]
# Summary
summary_lines = [
f"Model 1: {m1} ({len(oids1)} .safetensors files)",
f"Model 2: {m2} ({len(oids2)} .safetensors files)",
f"Files compared: {len(files)}",
f"Matching files: {sum(1 for row in table if row[3]=='Match')}",
f"Unmatched files: {sum(1 for row in table if row[3]=='Unmatch')}",
]
missing1 = [f for f in files if oids1.get(f) is None]
missing2 = [f for f in files if oids2.get(f) is None]
if missing1:
summary_lines.append(f"Files missing in Model 1: {', '.join(missing1)}")
if missing2:
summary_lines.append(f"Files missing in Model 2: {', '.join(missing2)}")
return verdict_text, table, "\n".join(summary_lines)
compare_btn.click(
fn=crosscheck_ui,
inputs=[model1, model2],
outputs=[verdict, details, summary]
)
with gr.Accordion("Advanced: Compare File Shards Bitwise", open=False):
gr.Markdown("""
## Compare a specific file (shard) from both models, byte-by-byte
- Enter the file name (e.g. `model-00001-of-00010.safetensors`).
- The tool will download this file from both models and compare their contents.
- Shows the percent of identical bytes (100% = exact copy).
""")
adv_model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528")
adv_model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX")
adv_filename = gr.Textbox(label="File Name", placeholder="e.g. model-00001-of-00010.safetensors")
adv_btn = gr.Button("Download & Compare File")
adv_result = gr.Textbox(label="Bitwise Comparison Result", lines=3, interactive=False)
def adv_compare(m1, m2, fname, progress=gr.Progress()):
if not m1 or not m2 or not fname:
return "Please provide both model IDs and the file name."
progress(0.0, desc="🚀 Initializing comparison...")
url1 = f"https://huggingface.co/{m1}/resolve/main/{fname}?download=true"
url2 = f"https://huggingface.co/{m2}/resolve/main/{fname}?download=true"
with tempfile.TemporaryDirectory() as tmp:
f1 = os.path.join(tmp, f"model1_{fname}")
f2 = os.path.join(tmp, f"model2_{fname}")
# Download first file (5% to 47.5%)
progress(0.05, desc=f"📡 Connecting to {m1.split('/')[-1]}...")
ok1, err1 = download_file_with_progress(url1, f1, progress, progress_offset=0.05, progress_scale=0.425)
if not ok1:
return f"❌ Download failed from {m1}: {err1}"
# Download second file (50% to 92.5%)
progress(0.5, desc=f"📡 Connecting to {m2.split('/')[-1]}...")
ok2, err2 = download_file_with_progress(url2, f2, progress, progress_offset=0.5, progress_scale=0.425)
if not ok2:
return f"❌ Download failed from {m2}: {err2}"
# Compare files (95% to 100%)
progress(0.95, desc="🔍 Analyzing files byte-by-byte...")
percent, err = file_similarity(f1, f2)
if err:
return f"❌ Comparison error: {err}"
progress(1.0, desc="✅ Analysis complete!")
# Get file info
size1 = os.path.getsize(f1)
size2 = os.path.getsize(f2)
size_mb = size1 // 1024 // 1024
# Enhanced result formatting
if percent == 100:
result_icon = "🟢"
result_text = "IDENTICAL"
elif percent >= 99:
result_icon = "🟡"
result_text = "NEARLY IDENTICAL"
elif percent >= 90:
result_icon = "🟠"
result_text = "SIMILAR"
else:
result_icon = "🔴"
result_text = "DIFFERENT"
return f"{result_icon} **{result_text}** ({percent:.3f}% similarity)\n📁 File size: {size_mb}MB\n🔗 Models: {m1.split('/')[-1]} vs {m2.split('/')[-1]}"
adv_btn.click(
fn=adv_compare,
inputs=[adv_model1, adv_model2, adv_filename],
outputs=[adv_result]
)
demo.launch()