|
|
import json |
|
|
import argparse |
|
|
import os |
|
|
from collections import Counter |
|
|
import math |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import seaborn as sns |
|
|
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
sns.set_theme(style="whitegrid", context="paper") |
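# Publication-style defaults: 300 dpi output, serif body text, and Computer Modern math fonts.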
|
|
plt.rcParams.update({ |
|
|
"figure.dpi": 300, |
|
|
"savefig.dpi": 300, |
|
|
"font.size": 11, |
|
|
"axes.titlesize": 12, |
|
|
"axes.labelsize": 11, |
|
|
|
|
|
"font.family": "serif", |
|
|
|
|
|
"mathtext.fontset": "cm", |
|
|
"axes.unicode_minus": False, |
|
|
}) |
|
|
|
|
|
|
|
|
def get_longest_common_prefix(str_list: list[str]) -> str: |
|
|
"""Calculates the longest common prefix for a list of strings.""" |
|
|
if not str_list: |
|
|
return "" |
|
|
prefix = str_list[0] |
|
|
for s in str_list[1:]: |
|
|
while not s.startswith(prefix): |
|
|
prefix = prefix[:-1] |
|
|
if not prefix: |
|
|
return "" |
|
|
return prefix |
|
|
|
|
|
|
|
|
def get_character_entropy(s: str) -> float: |
|
|
"""Calculates the Shannon entropy for a string.""" |
|
|
if not s: |
|
|
return 0.0 |
|
|
counts = Counter(s) |
|
|
total_len = len(s) |
|
|
entropy = 0.0 |
|
|
for count in counts.values(): |
|
|
p = count / total_len |
|
|
entropy -= p * math.log2(p) |
|
|
return entropy |
|
|
|
|
|
|
|
|
def build_case_record(case: dict, tag: str | None = None) -> dict: |
|
|
"""Create a lightweight JSON-friendly summary of a collision case.""" |
|
|
record = { |
|
|
"num_raw_variants": case["num_raw_variants"], |
|
|
"raw_chunk_variants_preview": [v[:80] for v in case.get("raw_chunk_variants", [])], |
|
|
"analysis_plus": case.get("analysis_plus", {}), |
|
|
} |
|
|
if tag is not None: |
|
|
record["tag"] = tag |
|
|
return record |
|
|
|
|
|
|
|
|
|
|
|
def analyze_collision_report(report_path: str, output_dir: str):
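    """
    Run the deep analysis of a token-collision report.

    Enriches each collision case with longest-common-prefix (LCP) and
    character-entropy statistics, writes a JSON summary of representative
    cases, and renders aggregate figures into output_dir.

    Each entry of the input report is expected to look roughly like the
    following (inferred from the fields read below; the authoritative schema
    is defined by the upstream analyzer):

        {
          "num_raw_variants": 3,
          "raw_chunk_variants": ["...", "...", "..."],
          "levenshtein_analysis": {"average_distance": 12.4}  # optional
        }
    """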
|
|
if not os.path.exists(report_path): |
|
|
print(f"❌ Error: Report file not found at '{report_path}'") |
|
|
return |
|
|
|
|
|
print(f"🔍 Reading report file: {report_path}") |
|
|
with open(report_path, "r", encoding="utf-8") as f: |
|
|
all_collisions = json.load(f) |
|
|
|
|
|
if not all_collisions: |
|
|
print("🎉 No collisions found in the report. Nothing to analyze.") |
|
|
return |
|
|
|
|
|
print(f"Report contains {len(all_collisions)} colliding token sequences.") |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
print("\n--- 1. Enriching data with LCP and entropy statistics ---") |
|
|
enriched_collisions = [] |
|
|
for collision in tqdm(all_collisions, desc="Analyzing content features"): |
|
|
variants = collision["raw_chunk_variants"] |
|
|
lcp = get_longest_common_prefix(variants) |
|
|
avg_len = np.mean([len(v) for v in variants]) if variants else 0 |
|
|
lcp_ratio = len(lcp) / avg_len if avg_len > 0 else 0.0 |
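        # lcp_ratio: shared-prefix length relative to the mean variant length
        # (0 = no common prefix, values near 1 = nearly identical variants).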
|
|
lengths = [len(v) for v in variants] |
|
|
entropies = [get_character_entropy(v) for v in variants] |
|
|
|
|
|
collision["analysis_plus"] = { |
|
|
"lcp_ratio": float(lcp_ratio), |
|
|
"length_stats": { |
|
|
"min": int(min(lengths)), |
|
|
"max": int(max(lengths)), |
|
|
"mean": float(np.mean(lengths)), |
|
|
"std": float(np.std(lengths)), |
|
|
}, |
|
|
"entropy_stats": { |
|
|
"min": float(min(entropies)), |
|
|
"max": float(max(entropies)), |
|
|
"mean": float(np.mean(entropies)), |
|
|
}, |
|
|
} |
|
|
enriched_collisions.append(collision) |
|
|
|
|
|
|
|
|
num_variants_list = [c["num_raw_variants"] for c in enriched_collisions] |
|
|
lcp_ratios = [c["analysis_plus"]["lcp_ratio"] for c in enriched_collisions] |
|
|
entropy_means = [c["analysis_plus"]["entropy_stats"]["mean"] for c in enriched_collisions] |
|
|
|
|
|
|
|
|
print("\n--- 2. Selecting representative collision cases and generating text-based previews ---") |
|
|
|
|
|
|
|
|
max_collision_case = max(enriched_collisions, key=lambda c: c["num_raw_variants"]) |
|
|
min_collision_case = min(enriched_collisions, key=lambda c: c["num_raw_variants"]) |
|
|
|
|
|
|
|
|
high_lcp_case = max(enriched_collisions, key=lambda c: c["analysis_plus"]["lcp_ratio"]) |
|
|
low_lcp_case = min(enriched_collisions, key=lambda c: c["analysis_plus"]["lcp_ratio"]) |
|
|
|
|
|
analysis_summary = { |
|
|
"total_colliding_sequences": len(all_collisions), |
|
|
"representative_cases": { |
|
|
"max_collision": build_case_record(max_collision_case, "Maximum collision scale"), |
|
|
"min_collision": build_case_record(min_collision_case, "Minimum collision scale"), |
|
|
"high_lcp": build_case_record(high_lcp_case, "Highest LCP ratio"), |
|
|
"low_lcp": build_case_record(low_lcp_case, "Lowest LCP ratio"), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
summary_report_path = os.path.join(output_dir, "final_analysis_summary.json") |
|
|
with open(summary_report_path, "w", encoding="utf-8") as f: |
|
|
json.dump(analysis_summary, f, indent=2, ensure_ascii=False) |
|
|
print(f"\n💾 Final structured analysis summary saved to: {summary_report_path}") |
|
|
print("\n--- 2. Aggregate visualization of collision patterns ---") |
|
|
|
|
|
|
|
|
print("Plotting collision scale histogram (Figure 10.1)...") |
|
|
fig1, ax1 = plt.subplots(figsize=(6.2, 4.0)) |
|
|
|
|
sns.histplot( |
|
|
num_variants_list, |
|
|
        discrete=True,
|
|
shrink=0.8, |
|
|
ax=ax1, |
|
|
) |
|
|
ax1.set_yscale("log") |
|
|
|
|
|
ax1.set_xlabel("Raw chunks per compressed segment") |
|
|
ax1.set_ylabel("Compressed segments (log scale)") |
|
|
ax1.grid(True, which="both", linestyle="--", alpha=0.5) |
|
|
fig1.tight_layout() |
|
|
save_figure(fig1, output_dir, "1_collision_scale") |
|
|
|
|
|
|
|
|
print("Plotting LCP ratio histogram (Figure 10.2)...") |
|
|
fig2, ax2 = plt.subplots(figsize=(6.2, 4.0)) |
|
|
sns.histplot( |
|
|
lcp_ratios, |
|
|
bins=50, |
|
|
binrange=(0.0, 1.0), |
|
|
kde=False, |
|
|
ax=ax2, |
|
|
) |
|
|
|
|
|
ax2.set_xlabel("LCP ratio") |
|
|
ax2.set_ylabel("Compressed symbols") |
|
|
ax2.set_xlim(0.0, 1.0) |
|
|
ax2.grid(True, which="both", linestyle="--", alpha=0.5) |
|
|
fig2.tight_layout() |
|
|
save_figure(fig2, output_dir, "2_lcp_ratio") |
|
|
|
|
|
|
|
|
if len(lcp_ratios) > 1: |
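        # A 2D kernel density estimate needs at least two (LCP ratio, entropy) points.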
|
|
print("Plotting 2D density of LCP ratio vs entropy (Figure 10.3)...") |
|
|
fig3, ax3 = plt.subplots(figsize=(6.2, 4.2)) |
|
|
|
|
|
sns.kdeplot( |
|
|
x=lcp_ratios, |
|
|
y=entropy_means, |
|
|
fill=True, |
|
|
thresh=0.01, |
|
|
levels=40, |
|
|
cmap="mako", |
|
|
ax=ax3, |
|
|
) |
|
|
ax3.set_title("Joint density of LCP ratio and character entropy") |
|
|
ax3.set_xlabel("LCP ratio") |
|
|
ax3.set_ylabel("Mean character entropy") |
|
|
ax3.set_xlim(0.0, 1.0) |
|
|
ax3.grid(True, which="both", linestyle="--", alpha=0.4) |
|
|
fig3.tight_layout() |
|
|
save_figure(fig3, output_dir, "3_lcp_vs_entropy") |
|
|
else: |
|
|
print("Not enough points to plot 2D KDE, skipping Figure 10.3.") |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
print("Plotting auxiliary edit-distance based figures (optional)...") |
|
|
avg_distances = [c["levenshtein_analysis"]["average_distance"] for c in enriched_collisions] |
|
|
|
|
|
|
|
|
fig4, ax4 = plt.subplots(figsize=(6.0, 4.0)) |
|
|
scatter = ax4.scatter( |
|
|
avg_distances, |
|
|
lcp_ratios, |
|
|
c=num_variants_list, |
|
|
cmap="viridis", |
|
|
alpha=0.6, |
|
|
s=np.log1p(num_variants_list) * 18, |
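            # Marker area scales with log1p(number of variants) so huge collisions do not dominate.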
|
|
) |
|
|
cbar = fig4.colorbar(scatter, ax=ax4) |
|
|
cbar.set_label("Number of raw variants") |
|
|
ax4.set_title("Average Levenshtein distance vs LCP ratio") |
|
|
ax4.set_xlabel("Average Levenshtein distance") |
|
|
ax4.set_ylabel("LCP ratio") |
|
|
ax4.grid(True, linestyle="--", alpha=0.4) |
|
|
fig4.tight_layout() |
|
|
save_figure(fig4, output_dir, "4_distance_vs_lcp_scatter") |
|
|
|
|
|
|
|
|
len_stds = [c["analysis_plus"]["length_stats"]["std"] for c in enriched_collisions] |
|
|
fig5, ax5 = plt.subplots(figsize=(6.0, 4.0)) |
|
|
scatter2 = ax5.scatter( |
|
|
len_stds, |
|
|
entropy_means, |
|
|
c=lcp_ratios, |
|
|
cmap="plasma", |
|
|
alpha=0.7, |
|
|
s=np.log1p(num_variants_list) * 18, |
|
|
) |
|
|
cbar2 = fig5.colorbar(scatter2, ax=ax5) |
|
|
cbar2.set_label("LCP ratio") |
|
|
ax5.set_title("Length std. deviation vs mean character entropy") |
|
|
ax5.set_xlabel("Std. deviation of raw chunk length") |
|
|
ax5.set_ylabel("Mean character entropy") |
|
|
ax5.set_xscale("log") |
|
|
ax5.grid(True, which="both", linestyle="--", alpha=0.4) |
|
|
fig5.tight_layout() |
|
|
save_figure(fig5, output_dir, "5_length_std_vs_entropy_scatter") |
|
|
except KeyError: |
|
|
print("Some entries do not contain 'levenshtein_analysis'; skipping auxiliary edit-distance plots.") |
|
|
|
|
|
print("\n✅ All analyses complete! Please check the output directory for the summary JSON.") |
|
|
|
|
|
|
|
|
|
|
|
def save_figure(fig, output_dir: str, filename: str): |
|
|
""" |
|
|
Save a Matplotlib figure as both PNG and PDF with a common base filename. |
|
|
""" |
|
|
base = os.path.join(output_dir, filename) |
|
|
for ext in ("png", "pdf"): |
|
|
fig.savefig(f"{base}.{ext}", bbox_inches="tight") |
|
|
plt.close(fig) |
|
|
print(f"📁 Saved figure: {base}.png / .pdf") |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
parser = argparse.ArgumentParser( |
|
|
description="Perform an in-depth, multi-dimensional, and visual analysis of a token collision report.", |
|
|
formatter_class=argparse.RawTextHelpFormatter, |
|
|
) |
|
|
parser.add_argument( |
|
|
"report_json", |
|
|
type=str, |
|
|
help="Path to the token_sequence_collision_report.json file generated by the main analyzer.", |
|
|
) |
|
|
parser.add_argument( |
|
|
"-o", |
|
|
"--output_dir", |
|
|
type=str, |
|
|
default="final_deep_analysis", |
|
|
help="Output directory to store all analysis plots and summaries.", |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
analyze_collision_report(args.report_json, args.output_dir) |
|
|
|
|
|
|
|
|
""" |
|
|
python deep_visual_analysis.py analysis_output_token_collision/token_collision_report.json |
|
|
pip install numpy matplotlib seaborn tqdm |
|
|
""" |