Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| import difflib | |
| from typing import Any, Dict, List, Optional, Sequence, Tuple | |
# =========================
# Targets (ONLY these 3)
# =========================
# Statement keys this module produces candidates for.
TARGETS = ["balance_sheet", "profit_and_loss", "cash_flow"]
AUX = ["comprehensive_income", "equity", "notes"]  # only for delimiting (when available)
# =========================
# Title variants
# =========================
# Known heading spellings per statement key; matched (fuzzily) against the
# top lines of each page by detect_title_match(). A "Consolidated"/"Standalone"
# word inside the matched variant also determines the detected scope.
TITLE_VARIANTS: Dict[str, List[str]] = {
    "balance_sheet": [
        "Consolidated Balance Sheets",
        "Standalone Balance Sheets",
        "Balance Sheets",
        "Statement of Financial Position",
        "Standalone Statement of Financial Position",
    ],
    "profit_and_loss": [
        "Consolidated Statements of Earnings",
        "Standalone Statements of Earnings",
        "Consolidated Statements of Operations",
        "Standalone Statements of Operations",
        "Consolidated Statements of Income",
        "Standalone Statements of Income",
        "Income Statement",
        "Statement of Profit and Loss",
        "Statement of Profit & Loss",
    ],
    "cash_flow": [
        "Consolidated Statements of Cash Flows",
        "Standalone Statements of Cash Flows",
        "Statement of Cash Flows",
        "Cash Flow Statement",
    ],
    # aux: not extracted (TARGETS only has the 3 above); used to delimit blocks
    "comprehensive_income": [
        "Consolidated Statements of Comprehensive Income",
        "Standalone Statements of Comprehensive Income",
        "Statement of Comprehensive Income",
    ],
    "equity": [
        "Consolidated Statements of Equity",
        "Standalone Statements of Equity",
        "Statement of Stockholders' Equity",
        "Statement of Shareholders' Equity",
    ],
    "notes": [
        "Notes to Consolidated Financial Statements",
        "Notes to Standalone Financial Statements",
        "Notes to Financial Statements",
    ],
}
# Lower-cased footer phrase found on genuine statement pages
# ("The accompanying notes are an integral part of ...").
INTEGRAL_FOOTER = "the accompanying notes are an integral part"
# Signature line items per statement; each term found on a page (case-folded)
# adds to its score in score_statement_page() / _statement_signal_no_title().
SIG_TERMS: Dict[str, List[str]] = {
    "balance_sheet": [
        "total assets",
        "total liabilities",
        "total equity",
        "stockholders' equity",
        "shareholders' equity",
        "liabilities and equity",
        "current assets",
        "current liabilities",
        "non-current assets",
        "non-current liabilities",
    ],
    "profit_and_loss": [
        "net revenues",
        "net sales",
        "revenue",
        "cost of sales",
        "cost of products sold",
        "gross profit",
        "operating income",
        "operating profit",
        "profit before tax",
        "net income",
        "net earnings",
        "earnings per share",
        "basic",
        "diluted",
    ],
    "cash_flow": [
        "cash flows from operating activities",
        "cash flows from investing activities",
        "cash flows from financing activities",
        "net cash provided by operating activities",
        "net cash used in investing activities",
        "net cash used in financing activities",
        "cash and cash equivalents, end of year",
        "net change in cash",
    ],
}
# "Note 12" style heading near the top of a page -> it's a notes page.
NOTE_HEADING_RE = re.compile(r"^\s*note\s+\d+\b", re.IGNORECASE)
# Runs of 5+ dots -- typical table-of-contents leaders.
DOT_LEADER_RE = re.compile(r"\.{5,}")
# US 10-K "Item 8" heading, used to locate the financial-statements index.
ITEM8_RE = re.compile(
    r"\bITEM\s+8\.\s+FINANCIAL\s+STATEMENTS\s+AND\s+SUPPLEMENTARY\s+DATA\b", re.IGNORECASE
)
# "(continued)" marker near the top of continuation pages.
CONTINUED_RE = re.compile(r"\bcontinued\b", re.IGNORECASE)
| # ========================= | |
| # Utilities | |
| # ========================= | |
| def _combined_text(page_obj: Any) -> str: | |
| if page_obj is None: | |
| return "" | |
| if isinstance(page_obj, str): | |
| return page_obj | |
| if isinstance(page_obj, dict): | |
| a = page_obj.get("extracted_text") or page_obj.get("text") or "" | |
| b = page_obj.get("ocr_text") or "" | |
| return (a + "\n" + b).strip() | |
| a = getattr(page_obj, "extracted_text", None) or getattr(page_obj, "text", None) or "" | |
| b = getattr(page_obj, "ocr_text", None) or "" | |
| return (a + "\n" + b).strip() | |
| def _norm(s: str) -> str: | |
| return re.sub(r"\s+", " ", (s or "")).strip().lower() | |
| def _fuzzy_line_contains_title(top_lines: List[str], title: str, threshold: float = 0.86) -> bool: | |
| title_n = _norm(title) | |
| for ln in top_lines: | |
| ln_n = _norm(ln) | |
| if not ln_n: | |
| continue | |
| if title_n in ln_n: | |
| return True | |
| r = difflib.SequenceMatcher(None, ln_n, title_n).ratio() | |
| if r >= threshold: | |
| return True | |
| return False | |
def detect_title_match(text: str, stmt: str) -> Tuple[bool, Optional[str], str]:
    """Detect whether the top of *text* carries a title for *stmt*.

    Returns (matched, matched_variant, scope) with scope in
    {"consolidated", "standalone", "unknown"}. After the variant pass, a
    loose keyword fallback on the joined top lines tolerates OCR garbling.
    """
    top_lines = [ln.strip() for ln in (text or "").splitlines()[:16] if ln.strip()]

    def _scope_of(blob: str) -> str:
        # "separate" is treated as a standalone synonym.
        if "consolidated" in blob:
            return "consolidated"
        if "standalone" in blob or "separate" in blob:
            return "standalone"
        return "unknown"

    # Exact/fuzzy variant match against individual top lines.
    for variant in TITLE_VARIANTS.get(stmt, []):
        if _fuzzy_line_contains_title(top_lines, variant):
            return True, variant, _scope_of(variant.lower())

    # fallback for OCR garble: loose keywords over the joined header text
    joined = " ".join(top_lines).lower()
    fallback_keywords = {
        "balance_sheet": ("balance sheet", "financial position"),
        "cash_flow": ("cash flow", "cash flows"),
        "profit_and_loss": (
            "statement of profit",
            "profit and loss",
            "income statement",
            "statements of income",
            "statements of operations",
            "statements of earnings",
        ),
    }
    for keyword in fallback_keywords.get(stmt, ()):
        if keyword in joined:
            return True, None, _scope_of(joined)
    return False, None, "unknown"
def detect_title(text: str, stmt: str) -> bool:
    """Boolean convenience wrapper around detect_title_match()."""
    return detect_title_match(text, stmt)[0]
# =========================
# (Optional) 10-K TOC mapping helpers (kept, but now scope-safe)
# =========================
# "... | 123" at the end of a line: pipe-separated footer page number.
FOOTER_PIPE_RE = re.compile(r"\|\s*(\d{1,4})\s*$", re.MULTILINE)
# "form 10-K | 123" footer variant (case-insensitive).
FOOTER_FORM_RE = re.compile(r"form\s+10-?k\s*\|\s*(\d{1,4})\s*$", re.IGNORECASE | re.MULTILINE)
def extract_footer_internal_page(text: str) -> Optional[int]:
    """Pull the printed (internal) page number from a page's footer, if any.

    Tries pipe-style footers ("... | 23"), then "form 10-K | 23" footers,
    then a bare number on one of the last six non-empty lines. Returns None
    when nothing matches.
    """
    t = text or ""
    for pattern, flags in (
        (r"\|\s*(\d{1,4})\s*$", re.MULTILINE),
        (r"form\s+10-?k\s*\|\s*(\d{1,4})\s*$", re.IGNORECASE | re.MULTILINE),
    ):
        found = re.findall(pattern, t, flags)
        if found:
            # Last occurrence wins (footers sit at the bottom of the page).
            return int(found[-1])
    tail = [ln.strip() for ln in t.splitlines() if ln.strip()][-6:]
    for candidate in reversed(tail):
        if re.fullmatch(r"\d{1,4}", candidate):
            return int(candidate)
    return None
def find_item8_toc_page(all_texts: Sequence[str]) -> Optional[int]:
    """Return the index of the first page that looks like the Item 8 index.

    A page qualifies when it contains the "ITEM 8. FINANCIAL STATEMENTS AND
    SUPPLEMENTARY DATA" heading, mentions "page", and uses dot leaders
    ("....."), i.e. it reads like a table of contents.

    Returns None when no page qualifies.

    Fix: the original collected every qualifying page into a list and then
    returned only element 0 — we now return on the first hit and skip the
    rest of the scan (same result, less work).
    """
    for i, txt in enumerate(all_texts):
        t = txt or ""
        if not ITEM8_RE.search(t):
            continue
        # TOC-ish: mentions "page" and has dot leaders.
        if "page" in _norm(t) and DOT_LEADER_RE.search(t) is not None:
            return i
    return None
def parse_statement_index_numbers(toc_text: str) -> Dict[str, int]:
    """
    Return internal page numbers from the index.
    IMPORTANT: keeps consolidated + standalone separately:
    key = f"{stmt}__{scope}"
    """
    lines = [ln.strip() for ln in (toc_text or "").splitlines()]
    out: Dict[str, int] = {}
    # NOTE(review): in balance_sheet/equity the alternatives after "|" sit
    # OUTSIDE the (consolidated|standalone) group, so for lines like
    # "Statement of Financial Position" group(1) is None and the scope
    # falls back to "unknown" below — confirm this precedence is intended.
    pats = {
        "profit_and_loss": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+(earnings|operations|income)", re.I),
        "comprehensive_income": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+comprehensive\s+income", re.I),
        "balance_sheet": re.compile(r"(consolidated|standalone)\s+balance\s+sheets?|statement\s+of\s+financial\s+position", re.I),
        "equity": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+equity|stockholders[’']\s+equity|shareholders[’']\s+equity", re.I),
        "cash_flow": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+cash\s+flows?", re.I),
        "notes": re.compile(r"notes\s+to\s+(consolidated|standalone)\s+financial\s+statements", re.I),
    }
    for i, ln in enumerate(lines):
        if not ln:
            continue
        for stmt, pat in pats.items():
            mscope = pat.search(ln)
            if not mscope:
                continue
            scope = (mscope.group(1) or "").strip().lower()
            if scope not in {"consolidated", "standalone"}:
                scope = "unknown"
            out_key = f"{stmt}__{scope}"
            # number at end of line (first match per key wins via setdefault)
            m = re.findall(r"(\d{1,4})\s*$", ln)
            if m and ln.endswith(m[-1]):
                out.setdefault(out_key, int(m[-1]))
                continue
            # number on next line (skip blank lines in between)
            j = i + 1
            while j < len(lines) and not lines[j]:
                j += 1
            if j < len(lines) and re.fullmatch(r"\d{1,4}", lines[j]):
                out.setdefault(out_key, int(lines[j]))
    return out
def build_internal_to_pdf_map(all_texts: Sequence[str]) -> Dict[int, int]:
    """Map printed (internal) footer page numbers to 0-based PDF indices.

    When the same internal number appears on several PDF pages, the first
    occurrence wins.
    """
    mapping: Dict[int, int] = {}
    for pdf_index, page_text in enumerate(all_texts):
        internal = extract_footer_internal_page(page_text or "")
        if internal is not None and internal not in mapping:
            mapping[internal] = pdf_index
    return mapping
def map_internal_to_pdf(internal: int, internal_to_pdf: Dict[int, int]) -> Optional[int]:
    """Translate an internal (printed) page number to a 0-based PDF index.

    Exact hits come straight from the map; otherwise the nearest known
    internal number is used and the offset extrapolated (ties resolve to the
    smaller key, as before). Returns None when the map is empty.

    Fix: extrapolating below the first mapped page could return a NEGATIVE
    index, which silently wraps around when later used to index page lists;
    the result is now clamped to >= 0.
    """
    if internal in internal_to_pdf:
        return internal_to_pdf[internal]
    keys = sorted(internal_to_pdf.keys())
    if not keys:
        return None
    best_k = min(keys, key=lambda k: abs(k - internal))
    return max(0, internal_to_pdf[best_k] + (internal - best_k))
| # ========================= | |
| # Scoring | |
| # ========================= | |
| def _page_stats(text: str) -> Dict[str, float]: | |
| t = text or "" | |
| low = t.lower() | |
| year_count = len(re.findall(r"\b20\d{2}\b", t)) | |
| currency_count = len(re.findall(r"[$€£]|usd|inr|eur|gbp", low)) | |
| paren_neg = len(re.findall(r"\(\s*\d", t)) | |
| integral = 1.0 if INTEGRAL_FOOTER in low else 0.0 | |
| tokens = re.findall(r"[A-Za-z]+|\d+(?:,\d{3})*(?:\.\d+)?", t) | |
| if not tokens: | |
| return dict(num_ratio=0.0, year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral) | |
| nums = sum(1 for tok in tokens if re.fullmatch(r"\d+(?:,\d{3})*(?:\.\d+)?", tok)) | |
| alphas = sum(1 for tok in tokens if re.fullmatch(r"[A-Za-z]+", tok)) | |
| num_ratio = nums / max(1.0, nums + alphas) | |
| return dict(num_ratio=float(num_ratio), year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral) | |
def score_statement_page(text: str, stmt: str) -> Tuple[float, Dict[str, Any]]:
    """Score how likely *text* is the FIRST page of statement *stmt*.

    Combines a title match on the top of the page, signature line-item hits,
    and table-ness statistics; penalizes notes pages, TOC pages, title-only
    pages, and weak signature coverage. Returns (score, reasons) where
    reasons explains every contribution for debugging.
    """
    body = text or ""
    low = body.lower()
    stats = _page_stats(body)
    reasons: Dict[str, Any] = {
        "title": False,
        "scope": "unknown",
        "sig_hits": [],
        "integral": False,
        "penalties": [],
        "stats": stats,
    }

    titled, _variant, scope = detect_title_match(body[:1200], stmt)
    score = 60.0 if titled else -20.0
    if titled:
        reasons["title"] = True
        reasons["scope"] = scope
    else:
        reasons["penalties"].append("no_title(-20)")

    if stats["integral"] > 0:
        score += 12.0
        reasons["integral"] = True

    matched_terms = [term for term in SIG_TERMS.get(stmt, []) if term in low]
    reasons["sig_hits"] = matched_terms
    hits = len(matched_terms)
    score += min(hits, 10) * 5.0

    # Table-ness boosts (capped so one dense page can't dominate).
    score += stats["num_ratio"] * 24.0
    score += min(stats["year_count"], 10.0) * 1.2
    score += min(stats["currency"], 10.0) * 1.8
    score += min(stats["paren"], 10.0) * 1.0

    # Penalties: notes page, TOC page, title with no table, weak signature.
    if NOTE_HEADING_RE.search(body[:220]):
        score -= 45.0
        reasons["penalties"].append("note_heading(-45)")
    if DOT_LEADER_RE.search(body):
        score -= 25.0
        reasons["penalties"].append("toc_dotleaders(-25)")
    if reasons["title"] and stats["num_ratio"] < 0.08 and stats["year_count"] < 1:
        score -= 30.0
        reasons["penalties"].append("title_without_table(-30)")
    if hits < 2:
        score -= 12.0
        reasons["penalties"].append("low_sig_hits(<2)(-12)")
    return score, reasons
def _statement_signal_no_title(text: str, stmt: str) -> float:
    """
    Continuation-page score (no title required). Used to extend blocks forward.
    """
    if not text:
        return 0.0
    # Notes pages and TOC pages are never continuations.
    if NOTE_HEADING_RE.search(text[:220]) or DOT_LEADER_RE.search(text):
        return 0.0
    low = text.lower()
    stats = _page_stats(text)
    hits = sum(1 for term in SIG_TERMS.get(stmt, []) if term in low)
    score = (
        min(hits, 10) * 4.5
        + stats["num_ratio"] * 26.0
        + min(stats["year_count"], 10.0) * 1.1
        + min(stats["currency"], 10.0) * 1.5
        + min(stats["paren"], 10.0) * 0.7
    )
    if CONTINUED_RE.search(text[:240]):
        score += 8.0
    # special: if a page has strong signature terms + years, it's often a continuation
    if hits >= 2 and stats["year_count"] >= 1:
        score += 6.0
    return score
def _any_other_statement_title(text: str, stmt: str) -> bool:
    """True when the top of *text* carries a title for a DIFFERENT target statement."""
    head = text[:1200]
    return any(detect_title(head, other) for other in TARGETS if other != stmt)
def _expand_block(all_texts: Sequence[str], stmt: str, start: int, max_forward: int = 6) -> int:
    """
    Expand forward to include continuation pages.
    Stops if another statement begins (unless this stmt title repeats).
    Returns the (inclusive) index of the last page in the block.
    """
    end = start
    total = len(all_texts)
    for idx in range(start + 1, min(total, start + 1 + max_forward)):
        page = all_texts[idx] or ""
        # Another statement's title ends the block — unless this page also
        # repeats our own title.
        if _any_other_statement_title(page, stmt) and not detect_title(page[:1200], stmt):
            break
        signal = _statement_signal_no_title(page, stmt)
        # Strong continuation signal, or an explicit "(continued)" marker
        # with a moderate signal, keeps extending the block.
        if signal >= 13.5 or (CONTINUED_RE.search(page[:240]) and signal >= 8.0):
            end = idx
            continue
        break
    return end
| def _blocks_overlap(a: Tuple[int, int], b: Tuple[int, int]) -> bool: | |
| return not (a[1] < b[0] or b[1] < a[0]) | |
| def _dedup_blocks(blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """ | |
| Deduplicate overlapping blocks, keeping higher 'score'. | |
| """ | |
| blocks = sorted(blocks, key=lambda x: (int(x.get("start", 10**9)), -(float(x.get("score") or 0.0)))) | |
| kept: List[Dict[str, Any]] = [] | |
| for b in blocks: | |
| r = (int(b.get("start")), int(b.get("end"))) | |
| merged = False | |
| for k in kept: | |
| kr = (int(k.get("start")), int(k.get("end"))) | |
| if _blocks_overlap(r, kr): | |
| if float(b.get("score") or 0.0) > float(k.get("score") or 0.0): | |
| k.update(b) | |
| merged = True | |
| break | |
| if not merged: | |
| kept.append(b) | |
| return kept | |
def build_blocks_from_titles(all_texts: Sequence[str], continuation_max_forward: int = 6) -> Dict[str, List[Dict[str, Any]]]:
    """
    Finds MULTIPLE blocks per statement (consolidated + standalone).
    Strategy:
    - find title pages for stmt
    - cluster nearby title hits of same scope
    - expand each start forward with continuation scoring
    """
    out: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS}
    for stmt in TARGETS:
        # Each hit: (page_index, page_score, scope, matched_title_variant)
        title_hits: List[Tuple[int, float, str, Optional[str]]] = []
        for i, txt in enumerate(all_texts):
            ok, variant, scope = detect_title_match((txt or "")[:1200], stmt)
            if not ok:
                continue
            sc, _why = score_statement_page(txt or "", stmt)
            if sc < 30.0:
                # Title present but the page doesn't look like a real table.
                continue
            title_hits.append((i, float(sc), scope, variant))
        if not title_hits:
            continue
        title_hits.sort(key=lambda x: x[0])
        clusters: List[List[Tuple[int, float, str, Optional[str]]]] = []
        for hit in title_hits:
            if not clusters:
                clusters.append([hit])
                continue
            last = clusters[-1][-1]
            # group if same scope and close (within 3 pages of the last hit)
            if hit[2] == last[2] and hit[0] <= last[0] + 3:
                clusters[-1].append(hit)
            else:
                clusters.append([hit])
        blocks: List[Dict[str, Any]] = []
        for cl in clusters:
            start = min(h[0] for h in cl)
            # Highest-scoring page in the cluster supplies score/scope/title.
            best = max(cl, key=lambda x: x[1])
            best_score = best[1]
            scope = best[2]
            title = best[3]
            end = _expand_block(all_texts, stmt, start, max_forward=continuation_max_forward)
            blocks.append(
                {
                    "start": int(start),
                    "end": int(end),
                    "scope": scope,
                    "title": title,
                    "score": float(best_score),
                }
            )
        out[stmt] = _dedup_blocks(blocks)
    return out
| # ========================= | |
| # Main builder | |
| # ========================= | |
def build_candidate_lists(
    pages: Sequence[Any],
    page_count: int,
    topk_per_statement: int = 3,
    continuation_max_forward: int = 6,
    debug: bool = True,
) -> Tuple[Dict[str, List[Tuple[int, float]]], Dict[str, Any]]:
    """
    Returns:
    candidates: {stmt: [(page_idx, score), ...]}
    debug_info: includes heuristic_blocks_0_based per stmt (list of blocks)

    NOTE(review): page_count is not read in this body (kept for interface
    stability; bounds are enforced in select_pages_for_llm) — confirm.
    """
    all_texts = [_combined_text(p) for p in pages]
    debug_info: Dict[str, Any] = {
        "item8_toc_page": None,
        "toc_internal": {},
        "internal_to_pdf_map_size": 0,
        "heuristic_blocks_0_based": {k: [] for k in TARGETS},
        "top_scoring": {k: [] for k in TARGETS},
    }
    # 1) Title-based multi-blocks (works for many non-10K PDFs too)
    title_blocks = build_blocks_from_titles(all_texts, continuation_max_forward=continuation_max_forward)
    # 2) Try 10-K Item8 TOC mapping (optional; mostly US 10-Ks)
    toc_blocks: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS}
    toc_i = find_item8_toc_page(all_texts)
    if toc_i is not None:
        debug_info["item8_toc_page"] = toc_i
        toc_text = all_texts[toc_i] or ""
        toc_internal = parse_statement_index_numbers(toc_text)
        debug_info["toc_internal"] = toc_internal
        internal_to_pdf = build_internal_to_pdf_map(all_texts)
        debug_info["internal_to_pdf_map_size"] = len(internal_to_pdf)
        # convert internal -> pdf; keys look like "balance_sheet__consolidated"
        for key_scoped, internal_page in toc_internal.items():
            if "__" not in key_scoped:
                continue
            stmt, scope = key_scoped.split("__", 1)
            if stmt not in TARGETS:
                continue
            start_pdf = map_internal_to_pdf(internal_page, internal_to_pdf)
            if start_pdf is None:
                continue
            # expand a block from TOC-derived start
            end_pdf = _expand_block(all_texts, stmt, start_pdf, max_forward=continuation_max_forward)
            toc_blocks[stmt].append(
                {
                    "start": int(start_pdf),
                    "end": int(end_pdf),
                    "scope": scope if scope in {"consolidated", "standalone"} else "unknown",
                    "title": None,
                    "score": 55.0,  # heuristic: below a strong title hit (60+)
                }
            )
        for stmt in TARGETS:
            toc_blocks[stmt] = _dedup_blocks(toc_blocks[stmt])
    # merge blocks from both sources
    merged_blocks: Dict[str, List[Dict[str, Any]]] = {}
    for stmt in TARGETS:
        merged_blocks[stmt] = _dedup_blocks((title_blocks.get(stmt) or []) + (toc_blocks.get(stmt) or []))
        # keep only top N blocks by score, but keep distinct scope if possible
        bl = sorted(merged_blocks[stmt], key=lambda b: float(b.get("score") or 0.0), reverse=True)
        chosen: List[Dict[str, Any]] = []
        seen_scope = set()
        for b in bl:
            scope = (b.get("scope") or "unknown")
            if scope in seen_scope and len(bl) > 1:
                continue
            chosen.append(b)
            seen_scope.add(scope)
            if len(chosen) >= 4:  # internal cap, actual final cap comes from settings in main
                break
        merged_blocks[stmt] = sorted(chosen, key=lambda b: (int(b["start"]), int(b["end"])))
    debug_info["heuristic_blocks_0_based"] = merged_blocks
    # 3) Strong per-page scoring candidates (fallback / also helpful for LLM page picking)
    candidates: Dict[str, List[Tuple[int, float]]] = {k: [] for k in TARGETS}
    reasons_store: Dict[str, Dict[int, Any]] = {k: {} for k in TARGETS}
    for i, txt in enumerate(all_texts):
        for stmt in TARGETS:
            sc, why = score_statement_page(txt or "", stmt)
            if sc > 0:
                candidates[stmt].append((i, float(sc)))
            # Record reasons for titled or very high-scoring pages (debug only).
            if debug and (why.get("title") or sc > 80):
                reasons_store[stmt][i] = why
    for stmt in TARGETS:
        candidates[stmt].sort(key=lambda x: x[1], reverse=True)
        debug_info["top_scoring"][stmt] = candidates[stmt][: min(len(candidates[stmt]), 10)]
        candidates[stmt] = candidates[stmt][:topk_per_statement]
        debug_info[f"reasons_{stmt}"] = reasons_store[stmt]
    return candidates, debug_info
def select_pages_for_llm(
    candidates: Dict[str, List[Tuple[int, float]]],
    debug_info: Dict[str, Any],
    page_count: int,
    max_images: int,
    max_blocks_per_statement: int = 2,
) -> List[int]:
    """
    Prefer multi-block heuristic pages (include BOTH consolidated + standalone if found).
    Else fallback to top candidates + neighbors.

    Returns a sorted list of 0-based page indices, at most max_images long.
    """
    picked: List[int] = []
    seen = set()
    def add(p: int) -> None:
        # Silently skip out-of-range pages, duplicates, and overflow
        # beyond max_images.
        if 0 <= p < page_count and p not in seen and len(picked) < max_images:
            seen.add(p)
            picked.append(p)
    blocks_by_stmt = debug_info.get("heuristic_blocks_0_based") or {}
    if isinstance(blocks_by_stmt, dict) and any(blocks_by_stmt.get(k) for k in TARGETS):
        for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]:
            bl = blocks_by_stmt.get(stmt) or []
            if not isinstance(bl, list) or not bl:
                continue
            # pick top blocks, prefer distinct scopes
            bl_sorted = sorted(bl, key=lambda b: float(b.get("score") or 0.0), reverse=True)
            chosen: List[Dict[str, Any]] = []
            seen_scope = set()
            for b in bl_sorted:
                scope = (b.get("scope") or "unknown")
                if scope in seen_scope and len(bl_sorted) > 1:
                    continue
                chosen.append(b)
                seen_scope.add(scope)
                if len(chosen) >= max_blocks_per_statement:
                    break
            for b in chosen:
                s, e = int(b.get("start")), int(b.get("end"))
                for p in range(s, e + 1):
                    add(p)
                # One-page margin on each side of the block.
                add(s - 1)
                add(e + 1)
        return sorted(picked)
    # fallback: use top candidates (top 2 per statement, plus neighbors)
    for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]:
        for (p, _sc) in candidates.get(stmt, [])[:2]:
            add(p)
            add(p - 1)
            add(p + 1)
    return sorted(picked)