Spaces:
Sleeping
Sleeping
import json
import os
from typing import Any, Dict, List, Tuple

import nbformat
| def _get_run_manager(): | |
| """Get run manager if available, otherwise return None.""" | |
| try: | |
| from .run_manager import get_run_manager | |
| return get_run_manager() | |
| except: | |
| return None | |
class ContentProcessor:
    """Processes content from .vtt, .srt, .ipynb, and .md files.

    Each processed file is returned as a single string wrapped in
    ``<source file='...'>`` tags so downstream consumers can attribute
    every piece of text to its originating file.
    """

    def __init__(self):
        """Initialize the ContentProcessor."""
        # Results of the most recent process_files() call.
        self.file_contents: List[str] = []
        # Optional logger; None when no run manager is available.
        self.run_manager = _get_run_manager()

    def _log(self, message: str, level: str = "DEBUG") -> None:
        """Log *message* through the run manager when one is available."""
        if self.run_manager:
            self.run_manager.log(message, level=level)

    @staticmethod
    def _tag(filename: str, body: str) -> str:
        """Wrap *body* in XML source tags naming the originating file."""
        return f"<source file='{filename}'>\n{body}\n</source>"

    def process_file(self, file_path: str) -> List[str]:
        """
        Process a file based on its extension and return the content.

        Args:
            file_path: Path to the file to process

        Returns:
            List containing the file content with source tags

        Raises:
            ValueError: If the file extension is not supported.
        """
        # Compute the lowered extension once instead of per branch.
        _, ext = os.path.splitext(file_path)
        ext_lower = ext.lower()
        if ext_lower in ('.vtt', '.srt'):
            return self._process_subtitle_file(file_path)
        if ext_lower == '.ipynb':
            return self._process_notebook_file(file_path)
        if ext_lower == '.md':
            return self._process_markdown_file(file_path)
        raise ValueError(f"Unsupported file type: {ext}")

    def _process_subtitle_file(self, file_path: str) -> List[str]:
        """Process a subtitle file (.vtt or .srt).

        Strips cue numbers, timestamp lines, and the WEBVTT header so only
        spoken-text lines remain. Returns [] on any read/processing error.
        """
        try:
            filename = os.path.basename(file_path)
            # Bug fix: the message previously contained a literal placeholder
            # instead of interpolating the computed filename.
            self._log(f"Found source file: {filename}")
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            text_content = []
            for line in content.split('\n'):
                stripped = line.strip()
                # Skip blanks, cue numbers (pure digits), timestamp lines,
                # and the WEBVTT file header.
                if (stripped
                        and not stripped.isdigit()
                        and '-->' not in line
                        and not stripped.startswith('WEBVTT')):
                    text_content.append(stripped)
            return [self._tag(filename, "\n".join(text_content))]
        except Exception as e:
            self._log(f"Error processing subtitle file {file_path}: {e}",
                      level="ERROR")
            return []

    def _process_markdown_file(self, file_path: str) -> List[str]:
        """Process a Markdown file (.md).

        The file body is passed through verbatim, wrapped in source tags.
        Returns [] on any read error.
        """
        try:
            filename = os.path.basename(file_path)
            self._log(f"Found source file: {filename}")
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return [self._tag(filename, content)]
        except Exception as e:
            self._log(f"Error processing markdown file {file_path}: {e}",
                      level="ERROR")
            return []

    def _process_notebook_file(self, file_path: str) -> List[str]:
        """Process a Jupyter notebook file (.ipynb).

        Extracts markdown and code cell sources. Files that are not valid
        JSON (or fail notebook parsing) are preserved as plain text inside
        a fenced block rather than discarded; only an unreadable file
        yields [].
        """
        try:
            filename = os.path.basename(file_path)
            self._log(f"Found source file: {filename}")
            # Validate JSON first: a broken notebook is kept as plain text
            # instead of raising out of nbformat.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    json.load(f)
            except json.JSONDecodeError as json_err:
                self._log(f"File {file_path} is not valid JSON: {json_err}")
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return [self._tag(filename, f"```\n{content}\n```")]
            # Valid JSON: parse as a version-4 notebook.
            with open(file_path, 'r', encoding='utf-8') as f:
                notebook = nbformat.read(f, as_version=4)
            content_parts = []
            for cell in notebook.cells:
                if cell.cell_type == 'markdown':
                    content_parts.append(f"[Markdown]\n{cell.source}")
                elif cell.cell_type == 'code':
                    content_parts.append(f"[Code]\n```python\n{cell.source}\n```")
            return [self._tag(filename, "\n\n".join(content_parts))]
        except Exception as e:
            self._log(f"Error processing notebook file {file_path}: {e}",
                      level="ERROR")
            # Last resort: keep the raw text so content is not silently lost.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return [self._tag(os.path.basename(file_path),
                                  f"```\n{content}\n```")]
            except Exception as read_err:
                self._log(f"Could not read file as text either: {read_err}",
                          level="ERROR")
                return []

    def process_files(self, file_paths: List[str]) -> List[str]:
        """
        Process multiple files and combine their content.

        Args:
            file_paths: List of paths to files to process

        Returns:
            List of file contents with source tags
        """
        all_file_contents: List[str] = []
        for file_path in file_paths:
            all_file_contents.extend(self.process_file(file_path))
        # Keep the processed contents for later inspection. Each entry is
        # one file's complete text wrapped in XML source tags, so the LLM
        # receives full per-file context.
        self.file_contents = all_file_contents
        return all_file_contents