| """ |
| Fraud Detection Engine - Hugging Face Spaces Version |
| |
| This version is configured for deployment on Hugging Face Spaces. |
| Data files should be placed in the 'data/' directory within the Space. |
| """ |
| import gradio as gr |
| import json |
| import os |
| import re |
| import pandas as pd |
| from datetime import datetime |
| from typing import Optional, Dict, List, Any |
|
|
| |
| |
| |
|
|
| |
# Root directory for every input file. Overridable through the DATA_PATH
# environment variable; defaults to the relative "data" directory.
BASE_DATA_PATH = os.environ.get("DATA_PATH", "data")
# Input locations, all resolved relative to BASE_DATA_PATH.
FINANCIAL_DATA_CSV = os.path.join(BASE_DATA_PATH, "Financial Data.csv")
BENEISH_MSCORE_CSV = os.path.join(BASE_DATA_PATH, "Beneish M-score - Sheet1.csv")
ZSCORE_DATA_CSV = os.path.join(BASE_DATA_PATH, "Z-score data.csv")
COMPANY_TICKERS_JSON = os.path.join(BASE_DATA_PATH, "company_tickers.json")
# NOTE(review): no code in this file reads FILINGS_DIR - confirm it is still needed.
FILINGS_DIR = os.path.join(BASE_DATA_PATH, "filings")


# An M-Score above this value is flagged as a high probability of manipulation.
MSCORE_MANIPULATION_THRESHOLD = -1.78
# Altman Z-Score zone boundaries: above the safe threshold is "Safe Zone",
# above the grey threshold is "Grey Zone", anything lower is "Distress Zone".
ZSCORE_SAFE_THRESHOLD = 2.99
ZSCORE_GREY_THRESHOLD = 1.81
|
|
| |
| |
| |
|
|
class DataLoader:
    """Lazy, class-level loader for the CSV/JSON inputs of the app.

    Each file is read at most once and the result is cached on the class.
    A missing file yields an empty frame (or an empty dict for the ticker
    map), so callers test ``df.empty`` instead of handling exceptions.
    Every loaded frame is guaranteed to carry a nullable-integer
    ``cik_normalized`` column, even when the source CIK column is absent.
    """

    # Class-level caches; ``None`` means "not loaded yet".
    _financial_df: Optional[pd.DataFrame] = None
    _mscore_df: Optional[pd.DataFrame] = None
    _zscore_df: Optional[pd.DataFrame] = None
    _company_tickers: Optional[Dict] = None

    @staticmethod
    def _read_csv_with_cik(path: str, cik_column: str) -> pd.DataFrame:
        """Read *path* and add a nullable-integer ``cik_normalized`` column.

        Args:
            path: CSV file to read.
            cik_column: Name of the column holding the raw CIK values.

        Returns:
            An empty frame when *path* does not exist; otherwise the parsed
            frame.  When *cik_column* is missing, ``cik_normalized`` is filled
            with ``<NA>`` so that lookups find nothing instead of raising
            ``KeyError``.
        """
        if not os.path.exists(path):
            return pd.DataFrame()
        df = pd.read_csv(path)
        if cik_column in df.columns:
            # Unparseable CIKs become <NA> rather than aborting the load.
            df['cik_normalized'] = pd.to_numeric(
                df[cik_column], errors='coerce'
            ).astype('Int64')
        else:
            df['cik_normalized'] = pd.Series(pd.NA, index=df.index, dtype='Int64')
        return df

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Convert a CSV cell to ``float``, mapping None/NaN/blank to *default*.

        Raises:
            ValueError, TypeError: If the cell holds something non-numeric.
        """
        if isinstance(value, str):
            value = value.strip()
            if not value:
                return default
        # NaN is truthy, so an ``x or 0`` fallback would let it through.
        if pd.isna(value):
            return default
        return float(value)

    @classmethod
    def _load_financial_csv(cls) -> pd.DataFrame:
        """Return the cached financial-data frame, loading it on first use."""
        if cls._financial_df is None:
            cls._financial_df = cls._read_csv_with_cik(FINANCIAL_DATA_CSV, 'cik')
        return cls._financial_df

    @classmethod
    def _load_mscore_csv(cls) -> pd.DataFrame:
        """Return the cached Beneish M-score frame, loading it on first use."""
        if cls._mscore_df is None:
            cls._mscore_df = cls._read_csv_with_cik(BENEISH_MSCORE_CSV, 'CIK Numbers')
        return cls._mscore_df

    @classmethod
    def _load_zscore_csv(cls) -> pd.DataFrame:
        """Return the cached Z-score input frame, loading it on first use."""
        if cls._zscore_df is None:
            cls._zscore_df = cls._read_csv_with_cik(ZSCORE_DATA_CSV, 'cik')
        return cls._zscore_df

    @classmethod
    def load_company_tickers(cls) -> Dict[int, Dict[str, str]]:
        """Return the cached ``{cik: {"ticker", "title"}}`` map.

        Entries without a ``cik_str``, or whose ``cik_str`` is not an integer,
        are skipped.  A missing JSON file yields an empty map.
        """
        if cls._company_tickers is None:
            tickers: Dict[int, Dict[str, str]] = {}
            if os.path.exists(COMPANY_TICKERS_JSON):
                with open(COMPANY_TICKERS_JSON, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                for company in raw.values():
                    cik = company.get('cik_str')
                    if not cik:
                        continue
                    try:
                        key = int(cik)
                    except (TypeError, ValueError):
                        continue
                    tickers[key] = {
                        "ticker": company.get('ticker', ''),
                        "title": company.get('title', ''),
                    }
            # Publish only the finished map so a failure mid-parse cannot
            # leave a half-filled cache behind.
            cls._company_tickers = tickers
        return cls._company_tickers

    @classmethod
    def get_company_info(cls, cik: int) -> Dict[str, str]:
        """Return ticker/title for *cik*, or "Unknown" placeholders."""
        tickers = cls.load_company_tickers()
        return tickers.get(cik, {"ticker": "Unknown", "title": "Unknown"})

    @classmethod
    def get_available_companies(cls) -> List[tuple]:
        """Get list of companies with available data.

        Returns:
            ``(label, cik)`` tuples sorted by label, one per CIK present in
            the M-score file.  CIKs missing from the ticker map are labelled
            ``"UNK - Unknown"``.  When no CIK is available, a single demo
            entry with CIK ``0`` is returned.
        """
        demo_choice = [("DEMO - Sample Company", 0)]
        tickers = cls.load_company_tickers()
        mscore_df = cls._load_mscore_csv()

        if mscore_df.empty:
            return demo_choice

        available_ciks = set(mscore_df['cik_normalized'].dropna().astype(int).tolist())

        choices = []
        # Iterate in CIK order so that entries with equal labels keep a
        # deterministic position after the (stable) sort below.
        for cik in sorted(available_ciks):
            info = tickers.get(cik, {})
            ticker = info.get('ticker', 'UNK')
            name = info.get('title', 'Unknown')
            if ticker and ticker != 'Unknown':
                choices.append((f"{ticker} - {name[:40]}", cik))

        return sorted(choices, key=lambda x: x[0]) if choices else demo_choice

    @classmethod
    def get_precomputed_mscore(cls, cik: int) -> Optional[float]:
        """Return the stored M-score of *cik*, or ``None`` when unavailable.

        The first row matching *cik* is used.  A missing, NaN or non-numeric
        ``m_score`` cell yields ``None``.
        """
        df = cls._load_mscore_csv()
        if df.empty:
            return None
        matches = df[df['cik_normalized'] == cik]
        if matches.empty:
            return None
        try:
            value = float(matches.iloc[0].get('m_score'))
        except (TypeError, ValueError):
            return None
        return None if pd.isna(value) else value

    @classmethod
    def get_zscore_inputs(cls, cik: int) -> Optional[Dict[str, float]]:
        """Derive the five Altman ratios ``x1``..``x5`` for *cik*.

        The last row matching *cik* is used.
        NOTE(review): presumably the most recent period if the file is in
        chronological order - confirm against the data.

        Returns:
            ``{"x1": ..., "x5": ...}``, or ``None`` when the company is not in
            the file, total assets (``at``) are missing/NaN/zero, or a cell
            cannot be parsed as a number.  Other missing or NaN fields count
            as ``0``.
        """
        df = cls._load_zscore_csv()
        if df.empty:
            return None
        matches = df[df['cik_normalized'] == cik]
        if matches.empty:
            return None
        row = matches.iloc[-1]

        try:
            at = cls._to_float(row.get('at'))
            if at == 0:
                # Every ratio divides by total assets.
                return None

            act = cls._to_float(row.get('act'))
            lct = cls._to_float(row.get('lct'))
            re_val = cls._to_float(row.get('re'))
            ebit = cls._to_float(row.get('ebit'))
            revt = cls._to_float(row.get('revt'))
            csho = cls._to_float(row.get('csho'))
            prcc_f = cls._to_float(row.get('prcc_f'))
        except (ValueError, TypeError):
            return None

        mve = csho * prcc_f
        # ``lct`` stands in for the x4 denominator; when it is not positive,
        # half of total assets is used instead.
        # NOTE(review): Altman's X4 is defined on total liabilities - confirm
        # that this approximation is intended.
        tl = lct if lct > 0 else at * 0.5

        return {
            "x1": (act - lct) / at,
            "x2": re_val / at,
            "x3": ebit / at,
            "x4": mve / tl if tl > 0 else 0,
            "x5": revt / at,
        }
|
|
|
|
| |
| |
| |
|
|
class FinancialAgent:
    """Computes the Beneish M-Score and the Altman Z-Score from ratio inputs."""

    # Beneish (1999) eight-variable model: intercept plus one weight per index.
    BENEISH_INTERCEPT = -4.84
    BENEISH_COEFFICIENTS = {
        'dsri': 0.920,
        'gmi': 0.528,
        'aqi': 0.404,
        'sgi': 0.892,
        'depi': 0.115,
        'sgai': -0.172,
        'tata': 4.679,
        'lvgi': -0.327,
    }
    # Altman (1968) Z-Score weights for the ratios x1..x5.
    ALTMAN_COEFFICIENTS = {'x1': 1.2, 'x2': 1.4, 'x3': 3.3, 'x4': 0.6, 'x5': 1.0}

    def __init__(self):
        self.name = "Financial Analysis Agent"

    def calculate_beneish_m_score(self, data: dict) -> dict:
        """Compute the Beneish M-Score from the eight index values in *data*.

        Args:
            data: Mapping with the keys of ``BENEISH_COEFFICIENTS``; a missing
                key counts as ``0``.

        Returns:
            ``{"m_score", "risk_flag", "details"}``.  ``risk_flag`` is True
            when the score exceeds ``MSCORE_MANIPULATION_THRESHOLD``.  A NaN
            result is reported as "Data not available" with ``m_score`` set to
            ``None`` and no flag.  Any failure is returned as
            ``{"error": message}`` instead of being raised.
        """
        try:
            m_score = sum(
                (weight * data.get(key, 0)
                 for key, weight in self.BENEISH_COEFFICIENTS.items()),
                self.BENEISH_INTERCEPT,
            )
            # NaN compares False with everything and would otherwise be
            # reported as "Low probability of manipulation".
            if pd.isna(m_score):
                return {"m_score": None, "risk_flag": False, "details": "Data not available"}
            risk_flag = m_score > MSCORE_MANIPULATION_THRESHOLD
            return {
                "m_score": round(m_score, 4),
                "risk_flag": risk_flag,
                "details": "High probability of manipulation" if risk_flag else "Low probability of manipulation"
            }
        except Exception as e:
            # Deliberate best-effort contract: callers get an error dict.
            return {"error": str(e)}

    def calculate_altman_z_score(self, data: dict) -> dict:
        """Compute the Altman Z-Score from the ratios ``x1``..``x5`` in *data*.

        Args:
            data: Mapping with the keys ``x1``..``x5``; a missing key counts
                as ``0``.

        Returns:
            ``{"z_score", "status"}`` where status is "Safe Zone" (above
            ``ZSCORE_SAFE_THRESHOLD``), "Grey Zone" (from
            ``ZSCORE_GREY_THRESHOLD`` up to the safe threshold, both
            inclusive) or "Distress Zone".  A NaN result yields
            ``{"z_score": None, "status": "Unknown"}``.  Any failure is
            returned as ``{"error": message}`` instead of being raised.
        """
        try:
            z_score = sum(
                weight * data.get(key, 0)
                for key, weight in self.ALTMAN_COEFFICIENTS.items()
            )
            # NaN fails both comparisons below and would be misclassified as
            # "Distress Zone".
            if pd.isna(z_score):
                return {"z_score": None, "status": "Unknown"}
            if z_score > ZSCORE_SAFE_THRESHOLD:
                status = "Safe Zone"
            elif z_score >= ZSCORE_GREY_THRESHOLD:
                status = "Grey Zone"
            else:
                status = "Distress Zone"
            return {"z_score": round(z_score, 4), "status": status}
        except Exception as e:
            # Deliberate best-effort contract: callers get an error dict.
            return {"error": str(e)}
|
|
|
|
class RiskAgent:
    """Combines financial and text signals into a single 0-100 risk score."""

    def __init__(self):
        self.name = "Risk Assessment Agent"

    def calculate_final_risk(self, financial_results: dict, text_results: dict) -> dict:
        """Score the overall fraud risk.

        Points: 40 for a Beneish manipulation flag; 30 for an Altman
        "Distress Zone" or 15 for a "Grey Zone"; 30 for an obfuscation score
        above 0.7 or 10 for one above 0.4.

        Args:
            financial_results: May contain ``"beneish_m"`` (dict with
                ``"risk_flag"``), ``"altman_z"`` (dict with ``"status"``) and
                a top-level ``"risk_flag"``.  The Beneish flag is taken from
                ``beneish_m`` when present; the top-level flag is only a
                fallback, because callers may fold other signals into it.
            text_results: May contain ``"obfuscation_score"``; missing or
                ``None`` counts as ``0``.

        Returns:
            ``{"total_risk_score", "risk_level", "key_factors"}`` where the
            level is CRITICAL (>70), HIGH (>40), MODERATE (>20) or LOW.
        """
        risk_score = 0
        reasons = []

        beneish = financial_results.get("beneish_m")
        if isinstance(beneish, dict) and "risk_flag" in beneish:
            manipulation_flag = beneish["risk_flag"]
        else:
            manipulation_flag = financial_results.get("risk_flag")
        if manipulation_flag:
            risk_score += 40
            reasons.append("Beneish M-Score indicates manipulation risk")

        z_status = (financial_results.get("altman_z") or {}).get("status")
        if z_status == "Distress Zone":
            risk_score += 30
            reasons.append("Altman Z-Score indicates financial distress")
        elif z_status == "Grey Zone":
            risk_score += 15
            reasons.append("Altman Z-Score in Grey Zone")

        obfuscation = text_results.get("obfuscation_score") or 0
        if obfuscation > 0.7:
            risk_score += 30
            reasons.append(f"High managerial obfuscation (Score: {obfuscation:.2f})")
        elif obfuscation > 0.4:
            risk_score += 10
            # Every score contribution is explained in the key factors.
            reasons.append(f"Moderate managerial obfuscation (Score: {obfuscation:.2f})")

        if risk_score > 70:
            risk_level = "CRITICAL"
        elif risk_score > 40:
            risk_level = "HIGH"
        elif risk_score > 20:
            risk_level = "MODERATE"
        else:
            risk_level = "LOW"

        return {
            "total_risk_score": risk_score,
            "risk_level": risk_level,
            "key_factors": reasons
        }
|
|
|
|
| |
| |
| |
|
|
# Module-level agent instances shared by every call to analyze_company().
fin_agent = FinancialAgent()
risk_agent = RiskAgent()
|
|
|
|
def analyze_company(cik_selection):
    """Run fraud detection analysis on selected company.

    Args:
        cik_selection: CIK chosen in the dropdown (int or numeric string).
            ``0`` is the demo entry; ``None`` or ``""`` means nothing was
            selected.

    Returns:
        Four Markdown strings: company header, financial analysis, text
        analysis and overall risk assessment.  When nothing valid is
        selected, the first string is a prompt and the others are empty.
    """
    nothing_selected = ("Please select a company", "", "", "")

    # ``0`` is the value of the demo entry, so a plain truthiness test would
    # reject it; only None / "" mean "no selection".
    if cik_selection is None or cik_selection == "":
        return nothing_selected

    try:
        cik = int(cik_selection)
    except (TypeError, ValueError):
        return nothing_selected

    if cik == 0:
        return demo_analysis()

    company_info = DataLoader.get_company_info(cik)

    m_score_val = DataLoader.get_precomputed_mscore(cik)
    if m_score_val is not None:
        manipulation_flag = m_score_val > MSCORE_MANIPULATION_THRESHOLD
        m_score_result = {
            "m_score": round(m_score_val, 4),
            "risk_flag": manipulation_flag,
            "details": "High probability of manipulation" if manipulation_flag else "Low probability of manipulation"
        }
    else:
        m_score_result = {"m_score": None, "risk_flag": False, "details": "Data not available"}

    zscore_inputs = DataLoader.get_zscore_inputs(cik)
    if zscore_inputs:
        z_score_result = fin_agent.calculate_altman_z_score(zscore_inputs)
    else:
        z_score_result = {"z_score": None, "status": "Unknown"}

    financial_results = {
        "beneish_m": m_score_result,
        "altman_z": z_score_result,
        # M-score signal only: the risk agent scores Z-score distress
        # separately, so folding it in here would count it twice and report
        # it as manipulation.
        "risk_flag": m_score_result.get("risk_flag", False)
    }

    # Placeholder until filings can be uploaded; 0.3 adds no risk points.
    text_results = {
        "obfuscation_score": 0.3,
        "note": "Text analysis requires 10-K filing upload"
    }

    final_report = risk_agent.calculate_final_risk(financial_results, text_results)

    company_header = f"""## {company_info['ticker']} - {company_info['title']}
**CIK:** {cik}
"""

    # The keys exist with value None when data is unavailable, so a .get()
    # default alone would print "None".
    m_val = m_score_result.get('m_score')
    if m_val is None:
        m_val = 'N/A'
    m_flag = "HIGH RISK" if m_score_result.get('risk_flag') else "Normal"
    z_val = z_score_result.get('z_score')
    if z_val is None:
        z_val = 'N/A'
    z_status = z_score_result.get('status', 'Unknown')

    financial_output = f"""### Beneish M-Score
**Score:** {m_val}
**Status:** {m_flag}
**Interpretation:** {m_score_result.get('details', 'N/A')}

> M-Score > {MSCORE_MANIPULATION_THRESHOLD} indicates high probability of earnings manipulation

### Altman Z-Score
**Score:** {z_val}
**Status:** {z_status}

> Safe Zone (>{ZSCORE_SAFE_THRESHOLD}) | Grey Zone ({ZSCORE_GREY_THRESHOLD}-{ZSCORE_SAFE_THRESHOLD}) | Distress Zone (<{ZSCORE_GREY_THRESHOLD})
"""

    text_output = """### MD&A Analysis
**Status:** Not available in demo

> Upload 10-K filings for full text analysis
"""

    risk_level = final_report['risk_level']
    risk_score = final_report['total_risk_score']

    risk_output = f"""## FRAUD RISK ASSESSMENT

### Risk Level: {risk_level}
### Total Score: {risk_score}/100

**Key Risk Factors:**
"""
    factors = final_report.get('key_factors', [])
    if factors:
        risk_output += "\n".join(f"- {factor}" for factor in factors)
    else:
        risk_output += "- No significant risk factors identified"

    return company_header, financial_output, text_output, risk_output
|
|
|
|
def demo_analysis():
    """Build the canned four-part report used when no real data is loaded.

    Returns:
        Four Markdown strings in the same order as ``analyze_company``:
        header, financial analysis, text analysis, risk assessment.
    """

    def as_markdown(*lines):
        # Each section ends with a newline, like a triple-quoted literal
        # whose closing quotes sit on their own line.
        return "\n".join(lines) + "\n"

    header_md = as_markdown(
        "## DEMO - Sample Analysis",
        "**Note:** This is a demonstration with sample data.",
        "Upload your data files to analyze real companies.",
    )

    financials_md = as_markdown(
        "### Beneish M-Score (Demo)",
        "**Score:** -2.45",
        "**Status:** Normal",
        "**Interpretation:** Low probability of manipulation",
        "",
        "### Altman Z-Score (Demo)",
        "**Score:** 3.25",
        "**Status:** Safe Zone",
    )

    mdna_md = as_markdown(
        "### MD&A Analysis (Demo)",
        "**Obfuscation Score:** 0.35",
        "**Flagged Phrases:** 3 found",
        '- "challenging market conditions"',
        '- "strategic realignment"',
        '- "factors beyond our control"',
    )

    risk_md = as_markdown(
        "## FRAUD RISK ASSESSMENT (Demo)",
        "",
        "### Risk Level: LOW",
        "### Total Score: 15/100",
        "",
        "**Key Risk Factors:**",
        "- No significant risk factors in this demo",
    )

    return header_md, financials_md, mdna_md, risk_md
|
|
|
|
| |
| |
| |
|
|
# UI definition. Components are laid out in the order they are created inside
# the nested context managers, so statement order here is the page layout.
with gr.Blocks(title="Fraud Detection Engine") as demo:
    gr.Markdown("""
# Fraud Detection Engine

Analyzes SEC 10-K filings to identify risks of financial statement manipulation using:
- **Beneish M-Score** - Earnings manipulation detection
- **Altman Z-Score** - Financial distress assessment
- **MD&A Text Analysis** - Managerial obfuscation detection
""")

    with gr.Row():
        # Choices are computed once, while this module-level block runs;
        # each choice is a (label, cik) tuple.
        company_dropdown = gr.Dropdown(
            choices=DataLoader.get_available_companies(),
            label="Select Company",
            info="Companies with available financial data"
        )
        analyze_btn = gr.Button("Analyze", variant="primary")

    # Output areas, filled by analyze_company() in the order listed in
    # ``outputs`` below.
    company_info = gr.Markdown()

    with gr.Row():
        with gr.Column():
            financial_output = gr.Markdown(label="Financial Analysis")
        with gr.Column():
            text_output = gr.Markdown(label="Text Analysis")

    risk_output = gr.Markdown()

    # analyze_company returns four strings, one per output component.
    analyze_btn.click(
        fn=analyze_company,
        inputs=[company_dropdown],
        outputs=[company_info, financial_output, text_output, risk_output]
    )

    gr.Markdown("""
---
**Methodology:** Based on Beneish (1999) M-Score and Altman (1968) Z-Score models.

**Data Requirements:** Place CSV files in `data/` directory:
- `Financial Data.csv`
- `Beneish M-score - Sheet1.csv`
- `Z-score data.csv`
- `company_tickers.json`
""")


# Start the server only when run as a script; listens on all interfaces,
# port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|