Spaces:
Running
Running
| """ | |
| Commission rules engine. | |
| Reads all configuration from the database and computes the summary table output. | |
| Refactored: per-year tables, department-only bell ringer, P1-only deduction | |
| for Closer/Referral, ABE status, fixed-code agreements. | |
| """ | |
| from typing import Optional | |
| from sqlalchemy.orm import Session | |
| from app.models import ( | |
| CommissionAgreement, CommissionTier, FixedConstant, | |
| BellRingerThreshold, Producer, | |
| ) | |
# --- Bell Ringer: department-only thresholds ---
# Maps a department code to the revenue amount at or above which
# check_bell_ringer() reports "triggered". A department code that is not
# listed here is reported as "N/A for this Department".
BELL_THRESHOLDS = {
    'CL': 75000,
    'CON': 7500,
    'EB': 75000,
    'PL': 4500,
    'VAN': 10000,
}
# --- Fixed-code agreements ---
# Keyed by agreement name. For these agreements compute_summary() takes the
# Producer 1 name, code and suffix from this table instead of building them
# from the producer prefix submitted on the form.
FIXED_CODE_AGREEMENTS = {
    "Gebhardt Agreement": {"name": "Steve Gebhardt", "code": "GEBST2", "suffix": 2},
    "Beacham Agreement": {"name": "Wade Beacham", "code": "BEAWA2", "suffix": 2},
}
# --- Agreements where layers deduct ONLY from P1 ---
# For agreement names in this set compute_summary() subtracts the whole
# layer deduction from Producer 1 and nothing from the other producers,
# instead of splitting it evenly.
P1_ONLY_DEDUCTION_AGREEMENTS = {
    "Closer Agreement",
    "Referral / Origination Fee Agreement",
}
def get_constant(db: Session, key: str) -> float:
    """Return the value of the FixedConstant row stored under ``key``.

    Args:
        db: SQLAlchemy session to query.
        key: Constant key to look up.

    Returns:
        The row's ``value``, or ``0`` when no row has that key.
    """
    row = db.query(FixedConstant).filter(FixedConstant.key == key).first()
    if not row:
        return 0
    return row.value
def get_all_constants(db: Session) -> dict:
    """Load every FixedConstant row into a ``{key: value}`` dict.

    Args:
        db: SQLAlchemy session to query.

    Returns:
        A dict with one entry per FixedConstant row; empty when the table
        has no rows.
    """
    constants = {}
    for row in db.query(FixedConstant).all():
        constants[row.key] = row.value
    return constants
def check_bell_ringer(db: Session, dept_code: str, revenue: float) -> dict:
    """Report whether ``revenue`` reaches the department's bell-ringer threshold.

    The threshold comes from the module-level BELL_THRESHOLDS table; ``db``
    is accepted but not used by this function.

    Args:
        db: SQLAlchemy session (unused here).
        dept_code: Department code to look up in BELL_THRESHOLDS.
        revenue: Revenue amount compared against the threshold.

    Returns:
        ``{"status": "na", "message": ...}`` when the department has no
        threshold; otherwise a dict with ``status`` ("triggered" or
        "not_triggered"), ``threshold`` and a display ``message``.
    """
    threshold = BELL_THRESHOLDS.get(dept_code)
    if threshold is None:
        return {"status": "na", "message": "N/A for this Department"}

    # Reaching the threshold exactly counts as triggered.
    if revenue >= threshold:
        status = "triggered"
        message = f"TRIGGERED (threshold ${threshold:,.0f})"
    else:
        status = "not_triggered"
        message = f"Not triggered (threshold ${threshold:,.0f})"
    return {"status": status, "threshold": threshold, "message": message}
def get_agreement_with_tiers(db: Session, agreement_id: int) -> Optional[dict]:
    """Load one commission agreement and its tiers as plain dicts.

    Args:
        db: SQLAlchemy session to query.
        agreement_id: Primary key of the CommissionAgreement to load.

    Returns:
        ``None`` when no agreement has that id. Otherwise a dict with the
        agreement's ``id``, ``name``, ``p1_suffix``, ``p2_suffix`` and
        ``ce_as_producer``, plus ``P1`` and ``P2``: the tiers for that
        producer role converted with ``_tier_to_dict``, or ``None`` when
        the role has no tiers.
    """
    query = db.query(CommissionAgreement).filter(
        CommissionAgreement.id == agreement_id
    )
    agreement = query.first()
    if not agreement:
        return None

    def tiers_for(role):
        # An empty list collapses to None so callers can test "is None".
        converted = [
            _tier_to_dict(tier)
            for tier in agreement.tiers
            if tier.producer_role == role
        ]
        return converted or None

    return {
        "id": agreement.id,
        "name": agreement.name,
        "p1_suffix": agreement.p1_suffix,
        "p2_suffix": agreement.p2_suffix,
        "ce_as_producer": agreement.ce_as_producer,
        "P1": tiers_for("P1"),
        "P2": tiers_for("P2"),
    }
def _tier_to_dict(t: CommissionTier) -> dict:
    """Convert a CommissionTier row into the plain dict used by the engine.

    Args:
        t: Tier row to convert.

    Returns:
        A dict with keys ``years``, ``commission``, ``credit``,
        ``agreement``, ``note`` and ``flag`` copied from the tier's
        ``years_label``, ``commission``, ``credit``, ``agreement_label``,
        ``note`` and ``flag`` attributes.
    """
    # (output key, attribute on the tier) in output order.
    key_to_attr = (
        ("years", "years_label"),
        ("commission", "commission"),
        ("credit", "credit"),
        ("agreement", "agreement_label"),
        ("note", "note"),
        ("flag", "flag"),
    )
    return {key: getattr(t, attr) for key, attr in key_to_attr}
def _abe_status(suffix) -> str:
    """Translate a producer-code suffix into its ABE status label.

    Args:
        suffix: Code suffix; ``1`` and ``2`` are the recognised values.

    Returns:
        ``"ABE"`` for 1, ``"Non-ABE"`` for 2, and a one-character
        placeholder for anything else.
    """
    for known_suffix, label in ((1, "ABE"), (2, "Non-ABE")):
        if suffix == known_suffix:
            return label
    # NOTE(review): this placeholder looks like a mis-encoded dash; it is
    # kept verbatim because it is returned to callers - confirm the
    # intended character.
    return "β"
def _get_tier_at_index(tiers, index):
    """Return ``tiers[index]``, or ``None`` when it is not available.

    Args:
        tiers: Sequence of tiers; may be ``None`` or empty.
        index: Position to read.

    Returns:
        The element at ``index``, or ``None`` when ``tiers`` is falsy or
        ``index`` is at or past the end of the sequence.
    """
    available = bool(tiers) and index < len(tiers)
    return tiers[index] if available else None
def compute_summary(db: Session, form_data: dict) -> dict:
    """Build the commission summary for one submitted form.

    Loads the selected agreement and its tiers, derives the producer,
    client-executive (CE), originator and CCP codes, and emits one table of
    commission rows per tier year label, together with the participants
    list, validation warnings and the bell-ringer result.

    Args:
        db: SQLAlchemy session used for the agreement, constant and
            producer lookups.
        form_data: Submitted form values. Keys read here: ``agreement_id``,
            ``revenue``, ``dept_code``, ``producer1_prefix``,
            ``producer2_prefix``, ``include_ce``, ``ce_prefix``,
            ``include_ccp``, ``ccp_selection``, ``employee_as_originator``,
            ``orig_producer_prefix`` and ``orig_employee_name``.

    Returns:
        On success, a dict with ``agreement_name``, ``year_tables`` (a list
        of ``{"year_label", "rows"}`` dicts), ``participants``,
        ``warnings`` and ``bell_ringer``. When no agreement is selected or
        the agreement cannot be found, a dict with an ``error`` message and
        empty ``year_tables``, ``participants`` and ``warnings`` lists.
    """
    # NOTE(review): this one-character placeholder (and the same character
    # inside the warning texts and the "Ceded Accounts" label below) looks
    # like a mis-encoded dash. It is kept verbatim because it is part of
    # the output - confirm the intended character.
    placeholder = "β"

    agr_id = form_data.get("agreement_id")
    if not agr_id:
        return {"error": "No agreement selected", "year_tables": [],
                "participants": [], "warnings": []}

    agreement = get_agreement_with_tiers(db, agr_id)
    if not agreement:
        return {"error": "Agreement not found", "year_tables": [],
                "participants": [], "warnings": []}

    consts = get_all_constants(db)
    agreement_name = agreement["name"]

    # --- Form inputs ---
    rev = form_data.get("revenue", 0) or 0
    p1pfx = form_data.get("producer1_prefix")
    p2pfx = form_data.get("producer2_prefix")
    include_ce = form_data.get("include_ce", False)
    cepfx = form_data.get("ce_prefix") if include_ce else None
    include_ccp = form_data.get("include_ccp", False)
    ccp_selection = form_data.get("ccp_selection")
    has_cc = bool(include_ccp and ccp_selection)

    # --- Agreement traits ---
    is_ce = agreement["ce_as_producer"]
    is_referral_orig = agreement_name == "Referral / Origination Fee Agreement"
    is_house_standard = agreement_name == "Emerging Markets - House Standard"
    is_fixed_code = agreement_name in FIXED_CODE_AGREEMENTS
    is_p1_only_deduction = agreement_name in P1_ONLY_DEDUCTION_AGREEMENTS
    is_bond = "bond" in agreement_name.lower()
    # CCP is not eligible for bond agreements: a warning is raised below and
    # the layer is ignored everywhere (deduction, row and participants) so
    # producers do not lose commission to a layer that is never paid.
    ccp_active = has_cc and not is_bond

    # The referral agreement keeps the originator's tiers under role "P2",
    # so it has no real second producer.
    if is_referral_orig:
        has_p2 = False
        orig_tiers = agreement["P2"]
    else:
        has_p2 = agreement["P2"] is not None
        orig_tiers = None

    warnings = []
    participants = []

    def _lookup_name(prefix):
        """Return the name of the Producer with this prefix, or ""."""
        if not prefix:
            return ""
        producer = db.query(Producer).filter(Producer.prefix == prefix).first()
        return producer.name if producer else ""

    p1name = _lookup_name(p1pfx)
    p2name = _lookup_name(p2pfx)
    cename = _lookup_name(cepfx)

    # --- Codes: prefix + suffix, or the placeholder when either is missing ---
    p1suf = agreement["p1_suffix"]
    p2suf = agreement["p2_suffix"]
    if is_ce:
        ce_suf = int(consts.get("CE_PROD_SUFFIX", 2))
    else:
        ce_suf = int(consts.get("CE_SUFFIX", 1))

    if is_fixed_code:
        fixed = FIXED_CODE_AGREEMENTS[agreement_name]
        p1code = fixed["code"]
        p1name = fixed["name"]
        p1suf = fixed["suffix"]
    else:
        p1code = f"{p1pfx}{p1suf}" if (p1pfx and p1suf) else placeholder
    p2code = f"{p2pfx}{p2suf}" if (has_p2 and p2pfx and p2suf) else placeholder
    cecode = f"{cepfx}{ce_suf}" if cepfx else placeholder

    # --- Originator (referral agreement only): an employee takes
    # precedence over an originating producer ---
    has_orig_employee = form_data.get("employee_as_originator", False) and is_referral_orig
    has_orig_producer = (not has_orig_employee and
                         form_data.get("orig_producer_prefix") and is_referral_orig)
    has_orig = has_orig_producer or has_orig_employee
    if has_orig_producer:
        opfx = form_data["orig_producer_prefix"]
        ocode = f"{opfx}{p2suf}" if p2suf else f"{opfx}2"
        oname = _lookup_name(opfx)
    elif has_orig_employee:
        ocode = "EMPOR2"
        oname = form_data.get("orig_employee_name", "")
    else:
        ocode = placeholder
        oname = ""

    # --- Layer deductions taken out of the producers' commission ---
    ce_comm = consts.get("CE_COMM", 2)
    cc_comm = consts.get("CC_COMM", 5)
    total_deduction = 0
    if cepfx and not is_ce:
        total_deduction += ce_comm
    if ccp_active:
        total_deduction += cc_comm

    # --- Validation ---
    if (not p1pfx and not is_ce and not is_fixed_code
            and agreement_name != "Ceded Accounts - CE as Producer"):
        warnings.append("Producer 1 (Lead) is required but not selected.")
    if has_p2 and not p2pfx:
        warnings.append(f"{agreement_name} uses two producers β select Producer 2.")
    if is_ce and not cepfx:
        warnings.append("This agreement needs a Client Executive as producer β select one above.")
    if is_referral_orig and not has_orig:
        warnings.append("Referral / Origination Fee Agreement requires an Originating Producer β select one.")
    if has_cc and is_bond:
        warnings.append("Complex Claims Practice (CCP) is not eligible for Bond Agreement.")

    # --- Year labels: P1 tiers first, then originator or P2 tiers,
    # de-duplicated in first-seen order ---
    if is_referral_orig and orig_tiers:
        secondary_tiers = orig_tiers
    elif has_p2 and agreement["P2"]:
        secondary_tiers = agreement["P2"]
    else:
        secondary_tiers = None

    year_labels = []
    for tier_list in (agreement["P1"], secondary_tiers):
        for tier in tier_list or []:
            if tier["years"] not in year_labels:
                year_labels.append(tier["years"])
    if not year_labels:
        year_labels = ["All Years"]

    # --- Build per-year tables ---
    year_tables = []
    for yi, year_label in enumerate(year_labels):
        year_rows = []

        # Tiers are matched to the year label by position, not by label text.
        p1_tier = _get_tier_at_index(agreement["P1"], yi) if agreement["P1"] else None
        p2_tier = None
        orig_tier = None
        if is_referral_orig and orig_tiers:
            orig_tier = _get_tier_at_index(orig_tiers, yi)
        elif has_p2 and agreement["P2"]:
            p2_tier = _get_tier_at_index(agreement["P2"], yi)

        # Producers that carry a commission this year share the deduction.
        active_producers = 0
        if p1_tier and p1_tier["commission"] is not None and not is_ce:
            active_producers += 1
        if p2_tier and p2_tier["commission"] is not None:
            active_producers += 1
        if orig_tier and orig_tier["commission"] is not None:
            active_producers += 1

        if is_p1_only_deduction:
            p1_ded, p2_ded, orig_ded = total_deduction, 0, 0
        else:
            per_producer = total_deduction / active_producers if active_producers > 0 else 0
            p1_ded = p2_ded = orig_ded = per_producer

        if is_house_standard:
            year_rows.append({"role": "House Account", "agreement_type": "House Standard Agreement",
                              "commission_pct": 100.0, "production_pct": 100.0,
                              "abe_status": "ABE", "code": "ACCHO1", "name": "House Account"})
            if p1pfx:
                year_rows.append({"role": "Producer 1 (Approval Processor)",
                                  "agreement_type": "House Standard Agreement",
                                  "commission_pct": 0.0, "production_pct": 0.0,
                                  "abe_status": _abe_status(p1suf), "code": p1code, "name": p1name})
        else:
            # Commission after deduction never goes below 0 and is rounded
            # to one decimal place.
            if not is_ce and p1_tier and p1_tier["commission"] is not None:
                year_rows.append({"role": "Producer 1", "agreement_type": p1_tier["agreement"],
                                  "commission_pct": round(max(p1_tier["commission"] - p1_ded, 0), 1),
                                  "production_pct": p1_tier["credit"], "abe_status": _abe_status(p1suf),
                                  "code": p1code, "name": p1name})
            if not is_referral_orig and p2_tier and p2_tier["commission"] is not None and p2pfx:
                year_rows.append({"role": "Producer 2", "agreement_type": p2_tier["agreement"],
                                  "commission_pct": round(max(p2_tier["commission"] - p2_ded, 0), 1),
                                  "production_pct": p2_tier["credit"], "abe_status": _abe_status(p2suf),
                                  "code": p2code, "name": p2name})
            if is_referral_orig and has_orig and orig_tier and orig_tier["commission"] is not None:
                year_rows.append({"role": "Originating Producer", "agreement_type": orig_tier["agreement"],
                                  "commission_pct": round(max(orig_tier["commission"] - orig_ded, 0), 1),
                                  "production_pct": orig_tier["credit"],
                                  "abe_status": "Non-ABE" if has_orig_employee else _abe_status(p2suf),
                                  "code": ocode, "name": oname})

        if cepfx and not is_ce:
            year_rows.append({"role": "CE Layer", "agreement_type": "Client Executive Agreement",
                              "commission_pct": ce_comm, "production_pct": consts.get("CE_CREDIT", 0),
                              "abe_status": _abe_status(ce_suf), "code": cecode, "name": cename})
        elif is_ce and cepfx:
            # ce_suf already holds CE_PROD_SUFFIX on this branch.
            year_rows.append({"role": "CE as Producer", "agreement_type": "Ceded Accounts β CE as Producer",
                              "commission_pct": consts.get("CE_PROD_COMM", 10),
                              "production_pct": consts.get("CE_PROD_CREDIT", 100),
                              "abe_status": _abe_status(ce_suf),
                              "code": cecode, "name": cename})

        if ccp_active:
            ccp_name = "Mike Parsa" if ccp_selection == "mike_parsa" else "House Account"
            ccp_code = "PARMI2" if ccp_selection == "mike_parsa" else "ACCHO1"
            year_rows.append({"role": "CCP Layer", "agreement_type": "Complex Claims Practice",
                              "commission_pct": cc_comm, "production_pct": consts.get("CC_CREDIT", 0),
                              "abe_status": _abe_status(2), "code": ccp_code, "name": ccp_name})

        if year_rows:
            year_tables.append({"year_label": year_label, "rows": year_rows})

    # --- Participants for approval: each code is listed once ---
    seen_codes = set()
    if is_house_standard and p1pfx:
        participants.append({"role": "Producer 1 (Approval Processor)", "code": p1code, "name": p1name})
        seen_codes.add(p1code)

    if is_fixed_code:
        fixed = FIXED_CODE_AGREEMENTS[agreement_name]
        if fixed["code"] not in seen_codes:
            participants.append({"role": "Producer 1", "code": fixed["code"], "name": fixed["name"]})
            seen_codes.add(fixed["code"])
    elif (p1pfx and not is_ce and not is_house_standard
            and p1code not in seen_codes and p1code != placeholder):
        participants.append({"role": "Producer 1", "code": p1code, "name": p1name})
        seen_codes.add(p1code)

    if has_p2 and p2pfx and p2code not in seen_codes and p2code != placeholder:
        participants.append({"role": "Producer 2", "code": p2code, "name": p2name})
        seen_codes.add(p2code)

    # Covers employee originators too (code "EMPOR2"), so no separate
    # employee entry is needed after this one.
    if has_orig and ocode not in seen_codes and ocode != placeholder:
        participants.append({"role": "Originating Producer", "code": ocode, "name": oname})
        seen_codes.add(ocode)

    if cepfx and cecode not in seen_codes and cecode != placeholder:
        participants.append({"role": "CE as Producer" if is_ce else "Client Executive",
                             "code": cecode, "name": cename})
        seen_codes.add(cecode)

    # NOTE(review): the CCP participant is always Mike Parsa (PARMI2), even
    # when ccp_selection puts the house account on the CCP row - confirm
    # this is intended.
    if ccp_active and "PARMI2" not in seen_codes:
        participants.append({"role": "CCP (Mike Parsa)", "code": "PARMI2", "name": "Mike Parsa"})
        seen_codes.add("PARMI2")

    bell = check_bell_ringer(db, form_data.get("dept_code", ""), rev)

    return {
        "agreement_name": agreement_name,
        "year_tables": year_tables,
        "participants": participants,
        "warnings": warnings,
        "bell_ringer": bell,
    }