| | """ |
| | NER Anonymization Evaluator — نسخه بهبودیافته |
| | بر اساس app (70).py با این بهبودها: |
| | ✅ fix-1 : رفع باگ ایندکس — AMOUNT-1 == AMOUNT-01 |
| | ✅ fix-2 : تحلیل FP/FN به تفکیک COMPANY/PERSON/AMOUNT/PERCENT |
| | ✅ fix-3 : جدول سطرهای ناقص (F1 < 1) جداگانه |
| | ✅ fix-4 : نمودار ستونی توزیع F1 |
| | ✅ fix-5 : مقایسه چند فایل CSV کنار هم |
| | ✅ fix-6 : نمایش سیر پیشرفت (تاریخچه اجراها) |
| | ✅ fix-7 : نمایش بهتر missed/extra در جدول خطا |
| | """ |
| |
|
| | import pandas as pd |
| | import re |
| | from typing import Dict, List, Set, Tuple |
| | import gradio as gr |
| | from datetime import datetime |
| | import tempfile |
| | import os |
| | from collections import Counter |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class NERBenchmarkEvaluator: |
| |
|
| | ENTITY_TYPES = ["COMPANY", "PERSON", "AMOUNT", "PERCENT"] |
| |
|
| | def __init__(self): |
| | self.results_df = None |
| | self.run_history: List[Dict] = [] |
| |
|
| | |
| | self.pattern = re.compile( |
| | r'\b(COMPANY|company|PERSON|person|AMOUNT|amount|PERCENT|percent|GROUP|group|STOCK|stock)-(\d+)\b' |
| | ) |
| |
|
| | |
| |
|
| | def extract_entities(self, text: str) -> Set[Tuple[str, str]]: |
| | """ |
| | استخراج entities. |
| | ✅ fix-1: ایندکس نرمال میشود: '1' → '01', '5' → '05' |
| | """ |
| | if pd.isna(text) or not isinstance(text, str): |
| | return set() |
| | entities = set() |
| | for m in self.pattern.finditer(text): |
| | etype = m.group(1).upper() |
| | eidx = f"{int(m.group(2)):02d}" |
| | entities.add((etype, eidx)) |
| | return entities |
| |
|
| | |
| |
|
| | def calc_metrics(self, ref: Set, pred: Set) -> Dict: |
| | if len(ref) == 0 and len(pred) == 0: |
| | return dict(tp=0, fp=0, fn=0, precision=1.0, recall=1.0, f1=1.0, |
| | matched=[], missed=[], extra=[]) |
| | tp = len(ref & pred) |
| | fp = len(pred - ref) |
| | fn = len(ref - pred) |
| | p = tp / (tp + fp) if (tp + fp) > 0 else 0.0 |
| | r = tp / (tp + fn) if (tp + fn) > 0 else 0.0 |
| | f1 = 2*p*r / (p+r) if (p+r) > 0 else 0.0 |
| | return dict( |
| | tp=tp, fp=fp, fn=fn, |
| | precision=round(p,4), recall=round(r,4), f1=round(f1,4), |
| | matched=sorted(ref & pred), |
| | missed =sorted(ref - pred), |
| | extra =sorted(pred - ref), |
| | ) |
| |
|
| | |
| |
|
| | def evaluate_dataset(self, file_path: str) -> Tuple[bool, str, pd.DataFrame, pd.DataFrame, pd.DataFrame]: |
| | """ |
| | Returns: (success, summary_md, full_df, bad_df, chart_df) |
| | """ |
| | try: |
| | df = pd.read_csv(file_path, encoding='utf-8-sig') |
| | except Exception as e: |
| | return False, f"❌ خطا در خواندن فایل:\n{e}", *[pd.DataFrame()]*3 |
| |
|
| | |
| | ref_col = next((c for c in df.columns if 'ref' in c.lower()), None) |
| | pred_col = next((c for c in df.columns if 'anon' in c.lower()), None) |
| | if not ref_col or not pred_col: |
| | return False, f"❌ ستونهای مورد نیاز یافت نشد\nستونها: {list(df.columns)}", \ |
| | *[pd.DataFrame()]*3 |
| |
|
| | |
| | rows_out = [] |
| | fp_by_type = Counter() |
| | fn_by_type = Counter() |
| |
|
| | for i, row in df.iterrows(): |
| | ref_ents = self.extract_entities(str(row[ref_col])) |
| | pred_ents = self.extract_entities(str(row[pred_col])) |
| | m = self.calc_metrics(ref_ents, pred_ents) |
| |
|
| | |
| | for t, _ in m['extra']: fp_by_type[t] += 1 |
| | for t, _ in m['missed']: fn_by_type[t] += 1 |
| |
|
| | rows_out.append({ |
| | 'ردیف' : i + 1, |
| | 'F1' : m['f1'], |
| | 'Precision' : m['precision'], |
| | 'Recall' : m['recall'], |
| | 'TP' : m['tp'], |
| | 'FP' : m['fp'], |
| | 'FN' : m['fn'], |
| | 'missed' : ", ".join(f"{t}-{idx}" for t,idx in m['missed']) or "—", |
| | 'extra' : ", ".join(f"{t}-{idx}" for t,idx in m['extra']) or "—", |
| | 'ref_entities' : str(sorted(list(ref_ents))), |
| | 'pred_entities' : str(sorted(list(pred_ents))), |
| | }) |
| |
|
| | results_df = pd.DataFrame(rows_out) |
| | self.results_df = results_df |
| |
|
| | |
| | f1s = results_df['F1'] |
| | avg_f1 = f1s.mean() |
| | avg_p = results_df['Precision'].mean() |
| | avg_r = results_df['Recall'].mean() |
| | total_tp = results_df['TP'].sum() |
| | total_fp = results_df['FP'].sum() |
| | total_fn = results_df['FN'].sum() |
| | perfect = (f1s == 1.0).sum() |
| |
|
| | micro_p = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0 |
| | micro_r = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0 |
| | micro_f1 = 2*micro_p*micro_r/(micro_p+micro_r) if (micro_p+micro_r) > 0 else 0 |
| |
|
| | dist = { |
| | "عالی (=1.0)" : int((f1s == 1.0).sum()), |
| | "خوب (≥0.9)" : int(((f1s >= 0.9)&(f1s<1.0)).sum()), |
| | "متوسط(≥0.7)" : int(((f1s >= 0.7)&(f1s<0.9)).sum()), |
| | "ضعیف (<0.7)" : int((f1s < 0.7).sum()), |
| | } |
| |
|
| | |
| | entity_rows = "" |
| | for et in self.ENTITY_TYPES: |
| | fp = fp_by_type.get(et, 0) |
| | fn = fn_by_type.get(et, 0) |
| | bar_fp = "🔴" * min(fp, 10) |
| | bar_fn = "🔵" * min(fn, 10) |
| | entity_rows += f"| {et:8s} | {fp:3d} {bar_fp} | {fn:3d} {bar_fn} |\n" |
| |
|
| | |
| | fname = os.path.basename(file_path) |
| | self.run_history.append({ |
| | 'فایل' : fname, |
| | 'زمان' : datetime.now().strftime("%H:%M:%S"), |
| | 'Macro F1' : round(avg_f1, 4), |
| | 'Micro F1' : round(micro_f1, 4), |
| | 'Precision' : round(avg_p, 4), |
| | 'Recall' : round(avg_r, 4), |
| | 'کامل' : f"{perfect}/{len(df)}", |
| | }) |
| |
|
| | summary = f"""## 📊 نتایج ارزیابی — {fname} |
| | |
| | | متریک | مقدار | |
| | |-------|-------| |
| | | **Macro F1** | `{avg_f1:.4f}` ({avg_f1*100:.2f}%) | |
| | | **Micro F1** | `{micro_f1:.4f}` ({micro_f1*100:.2f}%) | |
| | | Precision | `{avg_p:.4f}` | |
| | | Recall | `{avg_r:.4f}` | |
| | | کامل (F1=1) | **{perfect}/{len(df)}** | |
| | | FP کل | {total_fp} | |
| | | FN کل | {total_fn} | |
| | |
| | --- |
| | ### 📊 توزیع F1 |
| | | سطح | تعداد | درصد | |
| | |-----|-------|------| |
| | | عالی (F1=1.0) | {dist['عالی (=1.0)']} | {dist['عالی (=1.0)']/len(df)*100:.0f}% | |
| | | خوب (≥0.9) | {dist['خوب (≥0.9)']} | {dist['خوب (≥0.9)']/len(df)*100:.0f}% | |
| | | متوسط (≥0.7) | {dist['متوسط(≥0.7)']} | {dist['متوسط(≥0.7)']/len(df)*100:.0f}% | |
| | | ضعیف (<0.7) | {dist['ضعیف (<0.7)']} | {dist['ضعیف (<0.7)']/len(df)*100:.0f}% | |
| | |
| | --- |
| | ### 🔍 خطا به تفکیک نوع Entity |
| | | نوع | FP (اضافی) | FN (جاافتاده) | |
| | |-----|-----------|--------------| |
| | {entity_rows} |
| | """ |
| | |
| | bad_df = results_df[results_df['F1'] < 1.0][ |
| | ['ردیف', 'F1', 'TP', 'FP', 'FN', 'missed', 'extra'] |
| | ].reset_index(drop=True) |
| |
|
| | |
| | chart_df = pd.DataFrame({ |
| | 'سطح F1': list(dist.keys()), |
| | 'تعداد' : list(dist.values()), |
| | }) |
| |
|
| | return True, summary, results_df, bad_df, chart_df |
| |
|
| | def save_csv(self) -> str: |
| | if self.results_df is None or self.results_df.empty: |
| | return None |
| | ts = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | path = os.path.join(tempfile.gettempdir(), f"benchmark_{ts}.csv") |
| | self.results_df.to_csv(path, index=False, encoding='utf-8-sig') |
| | return path |
| |
|
| | def get_history_df(self) -> pd.DataFrame: |
| | if not self.run_history: |
| | return pd.DataFrame() |
| | return pd.DataFrame(self.run_history) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def compare_files(files) -> Tuple[str, pd.DataFrame]: |
| | if not files: |
| | return "❌ هیچ فایلی انتخاب نشده", pd.DataFrame() |
| |
|
| | ev = NERBenchmarkEvaluator() |
| | rows = [] |
| | for f in files: |
| | path = f if isinstance(f, str) else f.name |
| | ok, _, df, _, _ = ev.evaluate_dataset(path) |
| | if not ok or df.empty: |
| | continue |
| | f1s = df['F1'] |
| | tp = df['TP'].sum(); fp = df['FP'].sum(); fn = df['FN'].sum() |
| | mp = tp/(tp+fp) if (tp+fp)>0 else 0 |
| | mr = tp/(tp+fn) if (tp+fn)>0 else 0 |
| | mf1= 2*mp*mr/(mp+mr) if (mp+mr)>0 else 0 |
| | rows.append({ |
| | 'فایل' : os.path.basename(path), |
| | 'Macro F1' : round(f1s.mean(), 4), |
| | 'Micro F1' : round(mf1, 4), |
| | 'Precision' : round(df['Precision'].mean(), 4), |
| | 'Recall' : round(df['Recall'].mean(), 4), |
| | 'کامل (F1=1)' : int((f1s==1.0).sum()), |
| | 'ضعیف (F1<0.7)' : int((f1s<0.7).sum()), |
| | 'تعداد ردیف' : len(df), |
| | }) |
| |
|
| | if not rows: |
| | return "❌ هیچ فایل معتبری پردازش نشد", pd.DataFrame() |
| |
|
| | cdf = pd.DataFrame(rows) |
| | best = cdf['Macro F1'].max() |
| | summary = f"### مقایسه {len(rows)} فایل — بهترین Macro F1: `{best:.4f}` ({best*100:.2f}%)" |
| | return summary, cdf |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def create_interface(): |
| | evaluator = NERBenchmarkEvaluator() |
| |
|
| | with gr.Blocks(title="NER Benchmark Evaluator", theme=gr.themes.Soft()) as demo: |
| |
|
| | gr.Markdown(""" |
| | # 🎯 ارزیاب بنچمارک ناشناسسازی NER |
| | نسخه بهبودیافته با رفع باگ ایندکس، تحلیل تفکیکی entity و مقایسه چند فایل |
| | """) |
| |
|
| | |
| | with gr.Tab("📊 ارزیابی فایل"): |
| |
|
| | with gr.Row(): |
| | file_input = gr.File(label="فایل CSV", file_types=[".csv"]) |
| | evaluate_btn = gr.Button("🚀 شروع ارزیابی", variant="primary", size="lg") |
| |
|
| | summary_md = gr.Markdown("فایل CSV را بارگذاری کنید...") |
| |
|
| | |
| | chart_plot = gr.BarPlot( |
| | x="سطح F1", y="تعداد", |
| | title="توزیع F1", |
| | visible=False, |
| | color="سطح F1", |
| | ) |
| |
|
| | |
| | gr.Markdown("### ❌ سطرهای ناقص (F1 < 1.0)") |
| | bad_table = gr.Dataframe( |
| | label="سطرهای با F1 کمتر از 1", |
| | visible=False, |
| | wrap=True, |
| | ) |
| |
|
| | gr.Markdown("### 📋 جدول کامل نتایج (10 سطر اول)") |
| | full_table = gr.Dataframe( |
| | label="همه نتایج", |
| | visible=False, |
| | wrap=True, |
| | ) |
| |
|
| | with gr.Row(): |
| | download_btn = gr.Button("💾 دانلود CSV", visible=False, variant="secondary") |
| | download_file = gr.File(visible=False) |
| |
|
| | |
| | with gr.Tab("🔄 مقایسه چند فایل"): |
| | gr.Markdown("چند فایل نتایج را با هم مقایسه کنید (مثلاً n4، n5، n6)") |
| | multi_files = gr.File( |
| | label="فایلهای CSV", |
| | file_count="multiple", |
| | file_types=[".csv"] |
| | ) |
| | compare_btn = gr.Button("🔄 مقایسه", variant="primary") |
| | compare_md = gr.Markdown() |
| | compare_tbl = gr.Dataframe(label="جدول مقایسه", visible=False) |
| |
|
| | |
| | with gr.Tab("📈 سیر پیشرفت"): |
| | gr.Markdown("هر بار که یک فایل ارزیابی میکنید، نتیجه اینجا ذخیره میشود") |
| | history_btn = gr.Button("🔄 بهروزرسانی", variant="secondary") |
| | history_tbl = gr.Dataframe(label="تاریخچه اجراها", visible=False) |
| |
|
| | |
| | with gr.Tab("📖 راهنما"): |
| | gr.Markdown(""" |
| | ## نحوه استفاده |
| | |
| | فایل CSV باید این ستونها را داشته باشد: |
| | - **`Reference_text`** — متن مرجع (انسانی) |
| | - **`anonymized_text`** — خروجی مدل |
| | |
| | --- |
| | ## بهبودها نسبت به نسخه قبلی |
| | |
| | | # | بهبود | توضیح | |
| | |---|-------|-------| |
| | | 1 | **رفع باگ ایندکس** | `AMOUNT-1` = `AMOUNT-01` — دیگر F1=0 نمیشود | |
| | | 2 | **تحلیل entity تفکیکی** | FP/FN جداگانه برای COMPANY / PERSON / AMOUNT / PERCENT | |
| | | 3 | **جدول سطرهای ناقص** | فیلتر F1<1 به صورت جدول جداگانه | |
| | | 4 | **نمودار توزیع F1** | ویژوال سریع از وضعیت کلی | |
| | | 5 | **مقایسه چند فایل** | نتایج n4، n5، n6 کنار هم | |
| | | 6 | **سیر پیشرفت** | تاریخچه اجراها در یک جلسه | |
| | | 7 | **missed/extra خوانا** | به جای لیستهای طولانی، نمایش سادهتر | |
| | |
| | --- |
| | ## تفاوت Macro و Micro F1 |
| | |
| | - **Macro F1** = میانگین F1 سطر به سطر (هر سطر وزن برابر) |
| | - **Micro F1** = F1 بر اساس مجموع TP/FP/FN کل (سطرهای بزرگتر وزن بیشتر) |
| | """) |
| |
|
| | |
| |
|
| | def do_evaluate(file): |
| | if file is None: |
| | return ( |
| | "❌ فایل بارگذاری نشده", |
| | gr.BarPlot(visible=False), |
| | gr.Dataframe(visible=False), |
| | gr.Dataframe(visible=False), |
| | gr.Button(visible=False), |
| | gr.File(visible=False), |
| | ) |
| | path = file if isinstance(file, str) else file.name |
| | ok, summary, full_df, bad_df, chart_df = evaluator.evaluate_dataset(path) |
| |
|
| | if not ok: |
| | return ( |
| | summary, |
| | gr.BarPlot(visible=False), |
| | gr.Dataframe(visible=False), |
| | gr.Dataframe(visible=False), |
| | gr.Button(visible=False), |
| | gr.File(visible=False), |
| | ) |
| |
|
| | return ( |
| | summary, |
| | gr.BarPlot(value=chart_df, x="سطح F1", y="تعداد", |
| | title="توزیع F1", visible=True, color="سطح F1"), |
| | gr.Dataframe(value=bad_df, visible=not bad_df.empty), |
| | gr.Dataframe(value=full_df.head(10), visible=True), |
| | gr.Button(visible=True), |
| | gr.File(visible=False), |
| | ) |
| |
|
| | def do_download(): |
| | path = evaluator.save_csv() |
| | if path: |
| | return "✅ آماده دانلود", gr.File(value=path, visible=True) |
| | return "❌ خطا", gr.File(visible=False) |
| |
|
| | def do_compare(files): |
| | if not files: |
| | return "❌ فایلی انتخاب نشده", gr.Dataframe(visible=False) |
| | summary, cdf = compare_files(files) |
| | return summary, gr.Dataframe(value=cdf, visible=not cdf.empty) |
| |
|
| | def do_history(): |
| | hdf = evaluator.get_history_df() |
| | if hdf.empty: |
| | return gr.Dataframe(value=None, visible=False) |
| | return gr.Dataframe(value=hdf, visible=True) |
| |
|
| | evaluate_btn.click( |
| | fn=do_evaluate, |
| | inputs=[file_input], |
| | outputs=[summary_md, chart_plot, bad_table, full_table, download_btn, download_file], |
| | ) |
| | download_btn.click( |
| | fn=do_download, |
| | outputs=[summary_md, download_file], |
| | ) |
| | compare_btn.click( |
| | fn=do_compare, |
| | inputs=[multi_files], |
| | outputs=[compare_md, compare_tbl], |
| | ) |
| | history_btn.click( |
| | fn=do_history, |
| | outputs=[history_tbl], |
| | ) |
| |
|
| | return demo |
| |
|
| |
|
| | if __name__ == "__main__": |
| | demo = create_interface() |
| | demo.launch(server_name="0.0.0.0", server_port=7860, share=False) |
| |
|