Spaces:
Sleeping
Sleeping
| """ | |
| Advanced Analysis Coordinator Module | |
| Provides high-level facade functions for advanced PDF accessibility features, | |
| with error handling and graceful degradation. | |
| """ | |
| from typing import Dict, List, Any, Optional, Callable | |
| from functools import wraps | |
| import pikepdf | |
| import traceback | |
| # Import feature modules | |
| from content_stream_parser import ( | |
| extract_content_stream_for_block, | |
| format_operators_markdown, | |
| format_raw_stream | |
| ) | |
| from screen_reader_sim import ( | |
| simulate_screen_reader, | |
| format_transcript | |
| ) | |
| from structure_tree import ( | |
| extract_structure_tree, | |
| format_tree_text, | |
| get_tree_statistics, | |
| format_statistics_markdown, | |
| map_blocks_to_tags, | |
| detect_visual_paragraphs, | |
| detect_semantic_paragraphs, | |
| compare_paragraphs | |
| ) | |
def require_structure_tree(func: Callable) -> Callable:
    """
    Decorator that checks for a structure tree before running ``func``.

    The wrapper opens the PDF with pikepdf and looks for a
    ``/StructTreeRoot`` entry in the document catalog. ``func`` is only
    called when that entry is present.

    Args:
        func: Function to guard. It is called as
            ``func(pdf_path, *args, **kwargs)``.

    Returns:
        A wrapper that returns ``{'error': True, 'message': <markdown>}``
        when the PDF cannot be opened or has no ``/StructTreeRoot``, and
        otherwise returns the result of ``func``.
    """
    # wraps() keeps func's __name__/__doc__ and sets __wrapped__, so decorated
    # functions remain identifiable in logs, tracebacks and introspection.
    @wraps(func)
    def wrapper(pdf_path: str, *args, **kwargs):
        try:
            with pikepdf.open(pdf_path) as pdf:
                if '/StructTreeRoot' not in pdf.Root:
                    return {
                        'error': True,
                        'message': '## No Structure Tree Found\n\n'
                                   'This PDF does not have a tagged structure tree. '
                                   'This feature requires a tagged PDF.\n\n'
                                   '**What this means**: The PDF was not created with '
                                   'accessibility tagging, so semantic structure information '
                                   '(headings, paragraphs, alt text) is not available.\n\n'
                                   '**Recommendation**: Use authoring tools that support '
                                   'PDF/UA tagging (Adobe Acrobat, MS Word with "Save as Tagged PDF").'
                    }
        except Exception as e:
            # Any failure while probing the file is reported, not raised.
            return {
                'error': True,
                'message': f'## Error\n\nCould not open PDF: {str(e)}'
            }
        # The probe handle is closed at this point; func opens the file itself.
        return func(pdf_path, *args, **kwargs)
    return wrapper
def safe_execute(func: Callable) -> Callable:
    """
    Decorator that converts any exception from ``func`` into an error dict.

    Args:
        func: Function to protect.

    Returns:
        A wrapper that returns the result of ``func`` on success. If ``func``
        raises an ``Exception``, it returns
        ``{'error': True, 'message': <markdown>}``, where the message holds
        the exception text followed by the formatted traceback in a fenced
        code block.
    """
    # wraps() keeps func's __name__/__doc__ and sets __wrapped__, so decorated
    # functions remain identifiable in logs, tracebacks and introspection.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # format_exc() must run inside the handler to see the active exception.
            error_trace = traceback.format_exc()
            return {
                'error': True,
                'message': f'## Error\n\n{str(e)}\n\n**Details**:\n```\n{error_trace}\n```'
            }
    return wrapper
| # Feature 1: Content Stream Inspector | |
def analyze_content_stream(
    pdf_path: str,
    page_index: int,
    block_index: int,
    blocks: List[Any]
) -> Dict[str, Any]:
    """
    Inspect the content stream operators belonging to one block.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        block_index: Index of block to analyze
        blocks: List of BlockInfo objects

    Returns:
        On success, a dictionary with ``error`` set to False plus the
        ``formatted`` operators, the ``raw`` stream and the ``matched`` flag.
        When the extraction result carries an ``error`` key, a dictionary
        with ``error`` set to True and a markdown ``message``.
    """
    stream_info = extract_content_stream_for_block(
        pdf_path, page_index, block_index, blocks
    )

    # Success path first: the extractor signals failure via an 'error' key.
    if 'error' not in stream_info:
        operators_md = format_operators_markdown(stream_info)
        raw_text = format_raw_stream(stream_info.get('raw_stream', ''))
        was_matched = stream_info.get('matched', False)
        return {
            'error': False,
            'formatted': operators_md,
            'raw': raw_text,
            'matched': was_matched
        }

    failure_text = f"## Error\n\n{stream_info['error']}"
    return {'error': True, 'message': failure_text}
| # Feature 2: Screen Reader Simulator | |
def analyze_screen_reader(
    pdf_path: str,
    page_index: int,
    blocks: List[Any],
    reader_type: str = "NVDA",
    detail_level: str = "default",
    order_mode: str = "tblr"
) -> Dict[str, Any]:
    """
    Produce a simulated screen reader transcript for one page.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        blocks: List of BlockInfo objects
        reader_type: "NVDA" or "JAWS"
        detail_level: "minimal", "default", or "verbose"
        order_mode: Reading order for untagged fallback

    Returns:
        Dictionary with ``error`` set to False, the formatted ``transcript``,
        and the ``analysis`` and ``mode`` entries of the simulation result.
    """
    # Arguments are forwarded positionally, in the simulator's expected order.
    simulator_args = (
        pdf_path, page_index, blocks, reader_type, detail_level, order_mode
    )
    simulation = simulate_screen_reader(*simulator_args)

    # Format the transcript before reading the other entries.
    transcript_text = format_transcript(simulation)

    response: Dict[str, Any] = {'error': False}
    response['transcript'] = transcript_text
    response['analysis'] = simulation['analysis']
    response['mode'] = simulation['mode']
    return response
| # Feature 3: Paragraph Detection | |
def analyze_paragraphs(
    pdf_path: str,
    page_index: int,
    blocks: List[Any],
    vertical_gap_threshold: float = 15.0
) -> Dict[str, Any]:
    """
    Compare visual and semantic paragraph detection.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        blocks: List of BlockInfo objects
        vertical_gap_threshold: Spacing threshold for visual paragraphs

    Returns:
        Dictionary with ``error`` set to False, the ``visual_count``,
        ``semantic_count`` and ``match_score`` taken from the comparison,
        a markdown report under ``mismatches``, and the detected
        ``visual_paragraphs`` and ``semantic_paragraphs``.
    """
    # Detect visual paragraphs
    visual_paragraphs = detect_visual_paragraphs(blocks, vertical_gap_threshold)

    # Detect semantic paragraphs
    semantic_paragraphs = detect_semantic_paragraphs(pdf_path, page_index)

    # Compare
    comparison = compare_paragraphs(visual_paragraphs, semantic_paragraphs)
    visual_count = comparison['visual_count']
    semantic_count = comparison['semantic_count']

    # Format mismatches
    mismatch_lines = [
        "## Paragraph Comparison",
        "",
        f"**Visual Paragraphs Detected**: {visual_count}",
        f"**Semantic <P> Tags Found**: {semantic_count}",
        f"**Match Quality Score**: {comparison['match_score']:.2%}",
        ""
    ]

    if comparison['count_mismatch'] == 0:
        mismatch_lines.append("✓ Count matches between visual and semantic paragraphs")
    else:
        mismatch_lines.append(f"⚠️ Count mismatch: {comparison['count_mismatch']} difference")
        if visual_count > semantic_count:
            mismatch_lines.extend([
                "",
                "**Issue**: More visual paragraphs than semantic tags",
                "- Some paragraphs may be missing <P> tags",
                "- Screen readers may not announce paragraph boundaries properly"
            ])
        elif semantic_count > visual_count:
            mismatch_lines.extend([
                "",
                "**Issue**: More semantic tags than visual paragraphs",
                "- Tags may not correspond to actual visual layout",
                "- May cause confusion for users comparing visual and audio presentation"
            ])

    # Truthiness check, not `== 0`: the detector result is used as a
    # collection (presumably a list; confirm against structure_tree), and a
    # collection never compares equal to 0, which made this warning
    # unreachable. `not` also covers a 0 or None result.
    if not semantic_paragraphs and visual_paragraphs:
        mismatch_lines.extend([
            "",
            "❌ **No semantic tagging found**",
            "This page has no <P> tags. Screen readers will not announce paragraphs."
        ])

    return {
        'error': False,
        'visual_count': visual_count,
        'semantic_count': semantic_count,
        'match_score': comparison['match_score'],
        'mismatches': '\n'.join(mismatch_lines),
        'visual_paragraphs': visual_paragraphs,
        'semantic_paragraphs': semantic_paragraphs
    }
| # Feature 4: Structure Tree Visualizer | |
def analyze_structure_tree(pdf_path: str) -> Dict[str, Any]:
    """
    Build the text view, statistics and sunburst plot for a PDF's structure tree.

    Args:
        pdf_path: Path to PDF file

    Returns:
        On success, a dictionary with ``error`` set to False plus
        ``text_view``, ``statistics`` (markdown), ``plot_data`` and the raw
        ``stats``. If no tree could be extracted, a dictionary with ``error``
        set to True and a markdown ``message``.
    """
    tree_root = extract_structure_tree(pdf_path)

    if tree_root:
        # Text rendering is capped at 500 nodes.
        rendered_tree = format_tree_text(tree_root, max_nodes=500)

        tree_stats = get_tree_statistics(tree_root)
        stats_md = format_statistics_markdown(tree_stats)

        sunburst = _create_tree_plot(tree_root)

        return {
            'error': False,
            'text_view': rendered_tree,
            'statistics': stats_md,
            'plot_data': sunburst,
            'stats': tree_stats
        }

    return {
        'error': True,
        'message': '## Error\n\nCould not extract structure tree'
    }
def _create_tree_plot(root):
    """
    Create a Plotly sunburst diagram from a structure tree.

    Every node contributes one sector. The root is labelled with its tag
    type; every other node is labelled ``<tag_type>_<running index>`` so that
    labels are unique and can be referenced from ``parents``.

    Args:
        root: Root StructureNode. The nodes are read through their
            ``depth``, ``tag_type`` and ``children`` attributes.

    Returns:
        Plotly figure
    """
    import plotly.graph_objects as go

    labels = []
    parents = []
    values = []
    colors = []

    # Color map for common tag types; anything else falls back to light grey.
    color_map = {
        'Document': '#1f77b4',
        'Part': '#ff7f0e',
        'Sect': '#2ca02c',
        'H1': '#d62728',
        'H2': '#9467bd',
        'H3': '#8c564b',
        'H4': '#e377c2',
        'H5': '#7f7f7f',
        'H6': '#bcbd22',
        'P': '#17becf',
        'Figure': '#ff9896',
        'Table': '#c5b0d5',
        'L': '#c49c94',
        'LI': '#f7b6d2',
        'Link': '#c7c7c7',
    }

    def _traverse(node, parent_label=None):
        # Create unique label (pre-order position is used as the suffix)
        if node.depth == 0:
            label = node.tag_type
        else:
            label = f"{node.tag_type}_{len(labels)}"

        labels.append(label)
        parents.append(parent_label if parent_label else "")
        # Each node carries its own weight of 1; see branchvalues below.
        values.append(1)

        # Assign color
        base_tag = node.tag_type.split('_')[0]
        colors.append(color_map.get(base_tag, '#d3d3d3'))

        # Process children
        for child in node.children:
            _traverse(child, label)

    _traverse(root)

    fig = go.Figure(go.Sunburst(
        labels=labels,
        parents=parents,
        values=values,
        marker=dict(colors=colors),
        # "remainder" treats each value as the node's own share on top of its
        # descendants. With "total", a parent valued 1 that has two or more
        # children valued 1 is smaller than the sum of its children, and
        # Plotly does not draw the trace.
        branchvalues="remainder"
    ))

    fig.update_layout(
        title="PDF Structure Tree Hierarchy",
        height=600,
        margin=dict(t=50, l=0, r=0, b=0)
    )

    return fig
| # Feature 5: Block-to-Tag Mapping | |
def analyze_block_tag_mapping(
    pdf_path: str,
    page_index: int,
    blocks: List[Any]
) -> Dict[str, Any]:
    """
    Map visual blocks to structure tree tags.

    Args:
        pdf_path: Path to PDF file
        page_index: 0-based page index
        blocks: List of BlockInfo objects

    Returns:
        Dictionary with ``error`` set to False, ``mappings`` (a list of
        ``[block_index, tag_type, mcid, alt_text]`` rows, with alt text cut
        to 50 characters), ``count`` and a markdown ``message``. When nothing
        could be mapped, ``mappings`` is empty and ``count`` is 0.
    """
    mappings = map_blocks_to_tags(pdf_path, page_index, blocks)

    if not mappings:
        return {
            'error': False,
            'mappings': [],
            # Same keys as the success result, so callers can read 'count'
            # without checking which branch produced the dictionary.
            'count': 0,
            'message': '## No Mappings Found\n\n'
                       'Could not find block-to-tag correlations for this page. '
                       'This may occur if:\n'
                       '- The page has no marked content IDs (MCIDs)\n'
                       '- The structure tree is not properly linked to content\n'
                       '- The page uses a non-standard tagging approach'
        }

    # Format as table data: one row of display strings per mapping.
    table_data = [
        [
            str(m['block_index']),
            m['tag_type'],
            str(m['mcid']),
            m['alt_text'][:50] if m['alt_text'] else ""
        ]
        for m in mappings
    ]

    return {
        'error': False,
        'mappings': table_data,
        'count': len(mappings),
        'message': f'## Block-to-Tag Mapping\n\nFound {len(mappings)} correlations'
    }
| # Utility function for creating block dropdown choices | |
def create_block_choices(blocks: List[Any]) -> List[tuple]:
    """
    Create dropdown choices from blocks for UI.

    Each label shows the block index and the first 50 characters of the
    block's text, with newlines replaced by spaces and surrounding
    whitespace stripped. "..." is appended when the text is longer than 50
    characters. Blocks without a preview are labelled "[Image]".

    Args:
        blocks: List of BlockInfo objects; only their ``text`` attribute is read

    Returns:
        List of (label, value) tuples, where value is the block's index
    """
    choices = []
    for index, block in enumerate(blocks):
        # A block whose text is None is treated like one with empty text
        # so that slicing does not raise.
        text = block.text or ""
        preview = text[:50].replace('\n', ' ').strip()
        if len(text) > 50:
            preview += "..."
        label = f"Block {index}: {preview}" if preview else f"Block {index} [Image]"
        choices.append((label, index))
    return choices