Spaces:
Running on T4
Running on T4
| """Post-processing functions and regex patterns for markdown cleanup.""" | |
| import re | |
| _STANDALONE_DATE = re.compile( | |
| r"^\s*(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),\s+" | |
| r"(?:January|February|March|April|May|June|July|August|September|" | |
| r"October|November|December)\s+\d{1,2},\s+\d{4}\s*$", | |
| re.MULTILINE, | |
| ) | |
| _STANDALONE_TIME = re.compile(r"^\s*\d{1,2}:\d{2}\s*(?:AM|PM)\s*$", re.MULTILINE) | |
| _PAGE_FOOTER = re.compile( | |
| r"^\s*\d{1,3}\s*\|?\s*\d{2,5}\s+\w.*(?:Rd|St|Ave|Blvd|Dr|Ln|Way|Ct)\b.*\d{5}.*$", | |
| re.MULTILINE, | |
| ) | |
| _STANDALONE_PAGE_NUM = re.compile(r"^\s*\d{1,3}\s*$", re.MULTILINE) | |
| _BRANDING_FOOTER = re.compile(r"^\s*[A-Za-z][^|]{5,}\|[^|]+\|?\s*\d{1,3}\s*$", re.MULTILINE) | |
| _SHORT_LOCATION_LINE = re.compile(r"^\s*[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*,\s*[A-Z]{2}\s*$", re.MULTILINE) | |
| _NUMBERED_SECTION = re.compile(r"^(\d{1,2})\.\s+([A-Z][A-Z\s\-/&,]+(?:\.\s*)?)") | |
| _EMPTY_TABLE_ROW = re.compile(r"^\|(?:\s*\|)+\s*$", re.MULTILINE) | |
| _TRAILING_EMPTY_CELLS = re.compile(r"(?:\s*\|\s*){2,}\s*$") | |
| _TABLE_SEP_ROW = re.compile(r"^\|[\s\-:]+(?:\|[\s\-:]+)+\|?\s*$") | |
| _LATEX_MATHRM = re.compile(r"\$\s*\\mathrm\{([^}]*)\}\s*\$") | |
| _LATEX_SUPERSCRIPT = re.compile(r"\$\s*\^?\{([^}]*)\}\s*\$") | |
| _LATEX_SUBSCRIPT = re.compile(r"\$\s*_\{([^}]*)\}\s*\$") | |
| _LATEX_PLUSMINUS = re.compile(r"\$\s*\\pm\s*([^$]*?)\s*\$") | |
| _LATEX_INLINE = re.compile(r"\$\s*([^$]{1,60}?[\\^_{}][^$]{0,60}?)\s*\$") | |
| _LATEX_ESCAPED_CHARS = re.compile(r"\\([%$&_#])") | |
| _TINY_IMAGE_DIV = re.compile(r'<div[^>]*>\s*<img[^>]*width="(\d+)%"[^>]*/>\s*</div>', re.IGNORECASE) | |
def _post_process_merged_markdown(content: str) -> str:
    """Run every cleanup pass over merged markdown and return the result.

    The passes are applied one after another, each receiving the output of
    the previous one. Afterwards any run of four or more newlines is reduced
    to three and surrounding whitespace is stripped.
    """
    cleanup_passes = (
        _strip_latex_artifacts,
        _remove_tiny_image_tags,
        _deduplicate_headings,
        _deduplicate_short_blocks,
        _remove_page_boundary_artifacts,
        _normalize_numbered_headings,
        _clean_table_artifacts,
        _merge_split_tables,
    )
    for cleanup in cleanup_passes:
        content = cleanup(content)

    # Removing lines leaves gaps behind; cap them at two blank lines.
    collapsed = re.sub(r"\n{4,}", "\n\n\n", content)
    return collapsed.strip()
def _strip_latex_artifacts(content: str) -> str:
    """Replace LaTeX fragments with their plain-text content.

    Each module-level LaTeX pattern is substituted in a fixed order; the
    generic inline ``$...$`` pattern runs last, after the specific forms and
    after backslash-escaped characters have been unescaped.
    """

    def inner(found: re.Match) -> str:
        # Keep only what the pattern captured.
        return found.group(1)

    def inner_mathrm(found: re.Match) -> str:
        # Tildes are dropped entirely, then the text is trimmed.
        return found.group(1).replace("~", "").strip()

    def inner_plus_minus(found: re.Match) -> str:
        return "±" + found.group(1)

    def inner_trimmed(found: re.Match) -> str:
        return found.group(1).strip()

    rewrites = (
        (_LATEX_MATHRM, inner_mathrm),
        (_LATEX_PLUSMINUS, inner_plus_minus),
        (_LATEX_SUPERSCRIPT, inner),
        (_LATEX_SUBSCRIPT, inner),
        (_LATEX_ESCAPED_CHARS, inner),
        (_LATEX_INLINE, inner_trimmed),
    )
    for pattern, replacement in rewrites:
        content = pattern.sub(replacement, content)
    return content
def _remove_tiny_image_tags(content: str) -> str:
    """Delete ``<div><img .../></div>`` wrappers whose width is 10% or less.

    Wrappers with a larger percentage width are left exactly as they were.
    """

    def keep_or_drop(found: re.Match) -> str:
        width_percent = int(found.group(1))
        if width_percent > 10:
            return found.group(0)
        return ""

    return _TINY_IMAGE_DIV.sub(keep_or_drop, content)
| def _deduplicate_headings(content: str) -> str: | |
| lines = content.split("\n") | |
| seen_headings: set[str] = set() | |
| result: list[str] = [] | |
| for line in lines: | |
| stripped = line.strip() | |
| if stripped.startswith("#"): | |
| key = stripped.lstrip("#").strip().lower() | |
| if key and key in seen_headings: | |
| continue | |
| if key: | |
| seen_headings.add(key) | |
| result.append(line) | |
| return "\n".join(result) | |
| def _deduplicate_short_blocks(content: str) -> str: | |
| blocks = content.split("\n\n") | |
| seen: set[str] = set() | |
| result: list[str] = [] | |
| for block in blocks: | |
| stripped = block.strip() | |
| if not stripped: | |
| result.append(block) | |
| continue | |
| is_table = stripped.startswith("|") and "|" in stripped[1:] | |
| is_heading = stripped.startswith("#") | |
| if is_table or is_heading or len(stripped) > 120: | |
| result.append(block) | |
| continue | |
| key = stripped.lower() | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| result.append(block) | |
| return "\n\n".join(result) | |
def _remove_page_boundary_artifacts(content: str) -> str:
    """Strip lines that look like page headers or footers.

    Dates, times, address-style footers and bare page numbers are blanked
    wherever they occur. Branding footers and short location lines are only
    removed when the same line shows up at least three times.
    """
    always_removed = (
        _STANDALONE_DATE,
        _STANDALONE_TIME,
        _PAGE_FOOTER,
        _STANDALONE_PAGE_NUM,
    )
    for artifact in always_removed:
        content = artifact.sub("", content)

    removed_when_repeated = (_BRANDING_FOOTER, _SHORT_LOCATION_LINE)
    for artifact in removed_when_repeated:
        content = _remove_repeated_lines(content, artifact, min_repeats=3)

    return content
| def _remove_repeated_lines(content: str, pattern: re.Pattern, min_repeats: int = 3) -> str: | |
| counts: dict[str, int] = {} | |
| for match in pattern.finditer(content): | |
| key = match.group(0).strip().lower() | |
| counts[key] = counts.get(key, 0) + 1 | |
| repeated = {k for k, v in counts.items() if v >= min_repeats} | |
| if not repeated: | |
| return content | |
| result: list[str] = [] | |
| for line in content.split("\n"): | |
| if line.strip().lower() in repeated: | |
| continue | |
| result.append(line) | |
| return "\n".join(result) | |
def _normalize_numbered_headings(content: str) -> str:
    """Promote plain ``N. TITLE`` lines to ``## N. TITLE`` headings.

    The rewrite only happens when the document mixes both styles: at least
    one numbered section already written as a ``#``/``##``/``###`` heading
    and at least one line matching ``_NUMBERED_SECTION`` without a heading
    marker. Otherwise ``content`` is returned untouched.

    Text that follows the title on the same line is moved to its own line
    directly below the new heading. Trailing whitespace and a trailing
    period are removed from the title.

    Args:
        content: Markdown text.

    Returns:
        The text with plain numbered section titles turned into level-2
        headings, or ``content`` itself when no rewrite applies.
    """
    lines = content.split("\n")

    has_markdown_section = False
    has_plain_section = False
    for line in lines:
        stripped = line.strip()
        if re.match(r"^#{1,3}\s+(\d{1,2})\.\s+[A-Z]", stripped):
            has_markdown_section = True
        elif _NUMBERED_SECTION.match(stripped):
            has_plain_section = True

    if not (has_markdown_section and has_plain_section):
        return content

    result: list[str] = []
    for line in lines:
        stripped = line.strip()
        plain_match = _NUMBERED_SECTION.match(stripped)
        if plain_match is None:
            result.append(line)
            continue

        title_end = plain_match.end()

        # Guard against a match that stops in the middle of a word: the last
        # letter it took then belongs to the body text, so hand that partial
        # word back ("... WORK T" + "he contractor" -> "... WORK " + "The ...").
        if (
            title_end < len(stripped)
            and stripped[title_end - 1].isalnum()
            and stripped[title_end].isalnum()
        ):
            while title_end > 0 and stripped[title_end - 1].isalnum():
                title_end -= 1
            if not _NUMBERED_SECTION.match(stripped[:title_end]):
                # Nothing title-like is left once the word is returned.
                result.append(line)
                continue

        # Strip whitespace first; otherwise a "TITLE. " match keeps its period.
        title = stripped[:title_end].rstrip().rstrip(".").rstrip()
        body = stripped[title_end:].strip()

        result.append(f"## {title}")
        if body:
            result.append(body)

    return "\n".join(result)
def _clean_table_artifacts(content: str) -> str:
    """Tidy table rows: drop empty rows and trim trailing empty cells.

    Rows made only of pipes and whitespace are removed. Any other row that
    starts with ``|``, contains a second ``|`` and is not a separator row is
    replaced by its stripped text with trailing empty cells collapsed into a
    single closing `` |``. Every other line is passed through unchanged.

    NOTE(review): a header row directly above a separator row is trimmed as
    well, while the separator row itself is left alone - confirm downstream
    consumers tolerate the resulting difference in cell counts.
    """
    cleaned: list[str] = []
    for raw in content.split("\n"):
        row = raw.strip()
        if _EMPTY_TABLE_ROW.match(row):
            continue
        is_content_row = (
            row.startswith("|")
            and "|" in row[1:]
            and not _TABLE_SEP_ROW.match(row)
        )
        cleaned.append(_TRAILING_EMPTY_CELLS.sub(" |", row) if is_content_row else raw)
    return "\n".join(cleaned)
| def _is_table_line(line: str) -> bool: | |
| s = line.strip() | |
| return bool(s.startswith("|") and s.endswith("|") and s.count("|") >= 3) | |
| def _count_columns(line: str) -> int: | |
| s = line.strip() | |
| if not s.startswith("|"): | |
| return 0 | |
| return max(0, len(s.split("|")) - 2) | |
def _merge_split_tables(content: str) -> str:
    """Join table fragments that are separated only by blank lines.

    After every table row with at least two columns, the blank lines that
    follow are skipped. If the next non-blank line is also a table row with
    at least two columns and the smaller column count is at least 70% of the
    larger, the two fragments are joined by dropping the blank lines between
    them. When a separator row is found within the two lines after that next
    row, everything up to and including the first separator row is dropped
    too, so the repeated header does not reappear inside the merged table.

    NOTE(review): when the separator sits two lines after the next row, both
    preceding lines are discarded - confirm the first of them is never a data
    row that should be kept.
    """
    lines = content.split("\n")
    total = len(lines)

    def is_separator(index: int) -> bool:
        return bool(_TABLE_SEP_ROW.match(lines[index].strip()))

    merged: list[str] = []
    cursor = 0
    while cursor < total:
        current = lines[cursor]
        merged.append(current)
        cursor += 1

        if not _is_table_line(current):
            continue
        cols_before = _count_columns(current)
        if cols_before < 2:
            continue

        # Look past any blank lines for the next candidate row.
        candidate = cursor
        while candidate < total and not lines[candidate].strip():
            candidate += 1
        if candidate >= total or not _is_table_line(lines[candidate]):
            continue
        cols_after = _count_columns(lines[candidate])
        if cols_after < 2:
            continue

        narrow, wide = sorted((cols_before, cols_after))
        if narrow / wide < 0.7:
            continue

        lookahead = range(candidate + 1, min(candidate + 3, total))
        if not any(is_separator(k) for k in lookahead):
            # Plain continuation: resume at the next row, dropping the blanks.
            cursor = candidate
            continue

        # Repeated header: resume right after the first separator row found
        # from the candidate row onwards.
        first_separator = next(k for k in range(candidate, total) if is_separator(k))
        cursor = first_separator + 1

    return "\n".join(merged)