| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from __future__ import annotations |
| |
|
| | import json |
| | import os |
| | from typing import Any, Dict, Optional |
| |
|
| | from engine.parser_rules import parse_text_rules |
| | from engine.parser_ext import parse_text_extended |
| |
|
# Optional LLM-backed parser. Treated as strictly best-effort: if the module
# (or anything it transitively imports) fails to load for any reason, fusion
# runs without it rather than crashing at import time.
try:
    from engine.parser_llm import parse_llm as parse_text_llm
    HAS_LLM = True
except Exception:
    parse_text_llm = None
    HAS_LLM = False


# Location of the learned per-field parser weights JSON (produced by Stage 12A,
# per the _load_field_weights docstring below).
FIELD_WEIGHTS_PATH = os.path.join("data", "field_weights.json")

# Sentinel for "no prediction"; also the fused output when no parser votes.
UNKNOWN = "Unknown"
# Fixed parser priority, used for iteration and for tie-breaking in fusion.
PARSER_ORDER = ["rules", "extended", "llm"]
|
def _load_field_weights(path: str = FIELD_WEIGHTS_PATH) -> Dict[str, Any]:
    """
    Load the JSON weights file produced by Stage 12A.

    Expected structure::

        {
            "global": { "rules": 0.7, "extended": 0.2, "llm": 0.1 },
            "fields": {
                "DNase": {
                    "rules": 0.95,
                    "extended": 0.03,
                    "llm": 0.02,
                    "support": 123
                },
                ...
            },
            "meta": { ... }
        }

    A missing, unreadable, or non-dict file yields an empty dict, which
    downstream code interprets as "use equal weights".
    """
    if not os.path.exists(path):
        return {}

    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        return {}

    return loaded if isinstance(loaded, dict) else {}
| |
|
| |
|
# Loaded once at import time. An empty dict means "no learned weights", which
# makes _get_weights_for_field fall back to equal weighting.
FIELD_WEIGHTS_RAW: Dict[str, Any] = _load_field_weights()
HAS_WEIGHTS_FILE: bool = bool(FIELD_WEIGHTS_RAW)
| |
|
| |
|
| | def _normalise_scores(scores: Dict[str, float]) -> Dict[str, float]: |
| | """ |
| | Normalise parser -> score into weights summing to 1. |
| | If all scores are zero or dict is empty, return equal weights. |
| | """ |
| | cleaned = {k: max(0.0, float(v)) for k, v in scores.items()} |
| | total = sum(cleaned.values()) |
| |
|
| | if total <= 0: |
| | n = len(cleaned) or 1 |
| | return {k: 1.0 / n for k in cleaned} |
| |
|
| | return {k: v / total for k, v in cleaned.items()} |
| |
|
| |
|
| | def _get_base_weights_for_parsers(include_llm: bool) -> Dict[str, float]: |
| | """ |
| | Equal-weight distribution across available parsers. |
| | Used when no learned weights are available. |
| | """ |
| | parsers = ["rules", "extended"] |
| | if include_llm: |
| | parsers.append("llm") |
| |
|
| | n = len(parsers) or 1 |
| | return {p: 1.0 / n for p in parsers} |
| |
|
| |
|
def _get_weights_for_field(field_name: str, include_llm: bool) -> Dict[str, float]:
    """
    Resolve the fusion weights for a single field.

    Priority:
        1) FIELD_WEIGHTS_RAW["fields"][field_name]
        2) FIELD_WEIGHTS_RAW["global"]
        3) Equal weights across available parsers

    Always:
        - Drop 'llm' if include_llm == False
        - Normalise so the returned weights sum to 1

    Robustness fix: weight values that cannot be converted to float are
    skipped instead of raising, matching the defensive handling of the
    weights file in _load_field_weights (which already swallows any
    read/parse failure and falls back to equal weighting).
    """
    if not FIELD_WEIGHTS_RAW:
        return _normalise_scores(_get_base_weights_for_parsers(include_llm))

    fields_block = FIELD_WEIGHTS_RAW.get("fields", {}) or {}
    global_block = FIELD_WEIGHTS_RAW.get("global", {}) or {}

    raw: Dict[str, float] = {}

    def _collect(entry: Any) -> None:
        # Pull only the known parser keys; ignore malformed values so one
        # bad entry in the weights file cannot break fusion.
        if not isinstance(entry, dict):
            return
        for key, value in entry.items():
            if key in ("rules", "extended", "llm"):
                try:
                    raw[key] = float(value)
                except (TypeError, ValueError):
                    continue

    _collect(fields_block.get(field_name))

    if not raw:
        _collect(global_block)

    if not raw:
        raw = _get_base_weights_for_parsers(include_llm)

    if not include_llm:
        raw.pop("llm", None)

    # The learned entry may have contained only an 'llm' weight; fall back
    # to equal weights over the deterministic parsers.
    if not raw:
        raw = _get_base_weights_for_parsers(include_llm=False)

    return _normalise_scores(raw)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _clean_pred_value(val: Optional[str]) -> Optional[str]:
    """
    Normalise a raw parser prediction.

    None, empty/whitespace-only strings, and the explicit "Unknown"
    sentinel (case-insensitive) all count as "no prediction" -> None.
    Anything else is returned stripped, coerced to str.
    """
    if val is None:
        return None

    text = str(val).strip()
    if not text or text.lower() == UNKNOWN.lower():
        return None

    return text
| |
|
| |
|
def parse_text_fused(text: str, use_llm: Optional[bool] = None) -> Dict[str, Any]:
    """
    Main tri-parser fusion entrypoint.

    Runs the rules and extended parsers (and optionally the LLM parser),
    then fuses their per-field predictions by weighted voting using the
    weights resolved per field by _get_weights_for_field.

    Parameters
    ----------
    text : str
    use_llm : bool or None
        True -> include LLM
        False -> exclude LLM
        None -> include if available

    Returns
    -------
    Dict[str, Any]
        Full fusion output including votes and per-parser breakdowns:
        "fused_fields", "by_parser", "votes", "weights_meta".
    """
    original = text or ""
    include_llm = HAS_LLM if use_llm is None else bool(use_llm)

    # Deterministic parsers always run; a falsy result is treated as empty.
    rules_out = parse_text_rules(original) or {}
    ext_out = parse_text_extended(original) or {}

    # Both are expected to carry their fields under "parsed_fields".
    rules_fields = dict(rules_out.get("parsed_fields", {}))
    ext_fields = dict(ext_out.get("parsed_fields", {}))

    llm_fields: Dict[str, Any] = {}
    if include_llm and parse_text_llm is not None:
        try:
            # Hand the deterministic results to the LLM parser so it can
            # build on them (extended overrides rules on key collisions).
            merged_existing = {}
            merged_existing.update(rules_fields)
            merged_existing.update(ext_fields)

            llm_out = parse_text_llm(original, existing_fields=merged_existing)

            # Accept either the {"parsed_fields": {...}} envelope or a
            # bare field -> value mapping.
            if isinstance(llm_out, dict):
                if "parsed_fields" in llm_out:
                    llm_fields = dict(llm_out.get("parsed_fields", {}))
                else:
                    llm_fields = {str(k): v for k, v in llm_out.items()}
        except Exception:
            # Best effort: an LLM failure must never break fusion.
            llm_fields = {}
    else:
        include_llm = False

    by_parser: Dict[str, Dict[str, Any]] = {
        "rules": rules_fields,
        "extended": ext_fields,
        "llm": llm_fields if include_llm else {},
    }

    # Union of every field name any parser produced.
    candidate_fields = (
        set(rules_fields.keys())
        | set(ext_fields.keys())
        | set(llm_fields.keys())
    )

    fused_fields: Dict[str, Any] = {}
    votes_debug: Dict[str, Any] = {}

    for field in sorted(candidate_fields):
        weights = _get_weights_for_field(field, include_llm)

        # Clean each prediction: None / "" / "Unknown" -> None (abstain).
        parser_preds: Dict[str, Optional[str]] = {
            "rules": _clean_pred_value(rules_fields.get(field)),
            "extended": _clean_pred_value(ext_fields.get(field)),
            "llm": _clean_pred_value(llm_fields.get(field)) if include_llm else None,
        }

        per_parser_info: Dict[str, Any] = {}
        value_scores: Dict[str, float] = {}

        for parser_name in PARSER_ORDER:
            if parser_name == "llm" and not include_llm:
                continue

            pred = parser_preds.get(parser_name)
            w = float(weights.get(parser_name, 0.0))

            per_parser_info[parser_name] = {
                "value": pred if pred is not None else UNKNOWN,
                "weight": w,
            }

            # Abstaining parsers cast no vote; parsers agreeing on the
            # same value accumulate their weights on it.
            if pred is not None:
                value_scores[pred] = value_scores.get(pred, 0.0) + w

        if not value_scores:
            # Nobody voted for this field.
            fused_value = UNKNOWN
        else:
            max_score = max(value_scores.values())
            best_values = [v for v, s in value_scores.items() if s == max_score]

            if len(best_values) == 1:
                fused_value = best_values[0]
            else:
                # Tie-break: take the prediction of the earliest parser in
                # PARSER_ORDER whose value is among the tied winners.
                fused_value = best_values[0]
                for parser_name in PARSER_ORDER:
                    if parser_name == "llm" and not include_llm:
                        continue
                    if parser_preds.get(parser_name) in best_values:
                        fused_value = parser_preds[parser_name]
                        break

        fused_fields[field] = fused_value
        votes_debug[field] = {
            "per_parser": per_parser_info,
            "summed": value_scores,
            "chosen": fused_value,
        }

    weights_meta = {
        "has_weights_file": HAS_WEIGHTS_FILE,
        "weights_path": FIELD_WEIGHTS_PATH,
        "meta": FIELD_WEIGHTS_RAW.get("meta", {}) if HAS_WEIGHTS_FILE else {},
    }

    return {
        "fused_fields": fused_fields,
        "by_parser": by_parser,
        "votes": votes_debug,
        "weights_meta": weights_meta,
    }