| import shutil
|
| import tempfile
|
| import time
|
| import concurrent.futures
|
| from typing import List, Dict, Any, Tuple
|
| from concurrent.futures import ThreadPoolExecutor, as_completed
|
| import logging
|
| import os
|
|
|
| from app.config.settings import DOCS_FOLDER
|
| from app.document_processing.extractors import DocumentProcessorAdapter
|
| from app.retrieval.vector_store import Retriever
|
| from app.summarization.summarizer import DocumentSummarizer
|
| from app.summarization.output import SummaryOutputManager
|
| from app.utils.performance import timeit
|
| from app.utils.logger import setup_logger
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| TEMP_UPLOAD_DIR = os.path.join("/tmp", "temp_uploads")
|
| os.makedirs(TEMP_UPLOAD_DIR, exist_ok=True)
|
|
|
| def clear_upload_directory():
|
| """Clears all files from the persistent temporary upload directory."""
|
| if os.path.exists(TEMP_UPLOAD_DIR):
|
| logger.info(f"Clearing temporary upload directory: {TEMP_UPLOAD_DIR}")
|
|
|
| for item in os.listdir(TEMP_UPLOAD_DIR):
|
| item_path = os.path.join(TEMP_UPLOAD_DIR, item)
|
| try:
|
|
|
| if os.path.isfile(item_path) or os.path.islink(item_path):
|
| os.unlink(item_path)
|
| logger.debug(f"Deleted file/link: {item_path}")
|
|
|
| elif os.path.isdir(item_path):
|
| shutil.rmtree(item_path)
|
| logger.debug(f"Deleted directory: {item_path}")
|
| except Exception as e:
|
| logger.error(f"Error deleting {item_path}: {e}", exc_info=True)
|
| else:
|
| logger.info(f"Temporary upload directory does not exist, no need to clear: {TEMP_UPLOAD_DIR}")
|
|
|
|
|
|
|
| def process_uploaded_files(uploaded_files) -> List[Dict[str, Any]]:
|
| start_time = time.time()
|
| logger.info(f"Starting processing for {len(uploaded_files)} uploaded files.")
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| for uploaded_file in uploaded_files:
|
| file_path = os.path.join(tmpdir, uploaded_file.name)
|
| with open(file_path, "wb") as f:
|
| f.write(uploaded_file.getvalue())
|
|
|
| processor = DocumentProcessorAdapter()
|
| extraction_results = processor.process_folder(tmpdir)
|
|
|
| end_time = time.time()
|
| logger.info(f"Finished processing uploaded files in {end_time - start_time:.2f} seconds.")
|
| return extraction_results
|
|
|
|
|
| @timeit
|
| def process_documents(directory=None) -> List[Dict[str, Any]]:
|
| """Extract and preprocess documents from the configured folder."""
|
| processor = DocumentProcessorAdapter()
|
| if directory:
|
| return processor.process_folder(directory)
|
| return processor.process_folder(DOCS_FOLDER)
|
|
|
|
|
| def setup_retrieval_system(doc_data: Dict[str, Any]) -> Tuple[Dict[str, Any], Retriever]:
|
| """Initialize the retriever with document data."""
|
| retriever = Retriever()
|
| updated_doc_data = retriever.create_from_documents(doc_data)
|
| return updated_doc_data, retriever
|
|
|
|
|
| def process_single_document(doc_data: Dict[str, Any], max_workers: int = 4) -> List[Dict[str, Any]]:
|
| """Process a single document and generate its summary components."""
|
| try:
|
| doc_data, retriever = setup_retrieval_system(doc_data)
|
|
|
| summarizer = DocumentSummarizer(retriever, max_workers=max_workers)
|
|
|
| components = summarizer.generate_summarizer_components(
|
| filename=doc_data.get("filename"),
|
| language=doc_data.get("language", "en"),
|
| chunk_size=doc_data.get("chunk_size", 1000),
|
| document_text=doc_data.get("text", '')[:1000]
|
| )
|
| return components
|
| except Exception as e:
|
| logger.error(f"Failed to summarize {doc_data.get('filename')}: {str(e)}")
|
| return []
|
|
|
|
|
| @timeit
|
| def batch_summarize_documents(extraction_results: List[Dict[str, Any]],
|
| max_workers: int = None) -> List[Dict[str, Any]]:
|
| """
|
| Generate summary components with stream generators for all documents in parallel.
|
|
|
| Args:
|
| extraction_results: List of document data dictionaries
|
| max_workers: Maximum number of worker threads (defaults to CPU count)
|
|
|
| Returns:
|
| List of summary component dictionaries with stream generators
|
| """
|
|
|
| if max_workers is None:
|
| max_workers = min(os.cpu_count() or 4, 8)
|
|
|
|
|
| doc_count = len(extraction_results)
|
| doc_workers = max(1, min(4, max_workers // max(1, doc_count)))
|
|
|
| logger.info(f"Processing {doc_count} documents with {max_workers} total workers "
|
| f"({doc_workers} workers per document)")
|
|
|
| summary_component_streams = []
|
|
|
|
|
| with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
|
| futures = {
|
| executor.submit(process_single_document, doc_data, doc_workers): doc_data.get('filename', 'unknown')
|
| for doc_data in extraction_results
|
| }
|
|
|
|
|
| for future in as_completed(futures):
|
| doc_name = futures[future]
|
| try:
|
| components = future.result()
|
| if components:
|
| summary_component_streams.extend(components)
|
| logger.info(f"Successfully processed components for '{doc_name}'")
|
| else:
|
| logger.warning(f"No components generated for '{doc_name}'")
|
| except Exception as e:
|
| logger.error(f"Error processing '{doc_name}': {str(e)}")
|
|
|
| logger.info(f"Generated {len(summary_component_streams)} total summary components")
|
| return summary_component_streams
|
|
|
|
|
| def consume_stream(stream_data: Dict[str, Any]) -> Tuple[str, str]:
|
| """Consume a single streaming generator and return the result."""
|
| file_id = f"{stream_data['filename']}-{stream_data['comp_name']}"
|
| component_type = stream_data['comp_name']
|
|
|
| try:
|
| stream_generator = stream_data[component_type]
|
| content_buffer = []
|
|
|
| logger.info(f"Processing stream for {file_id}")
|
| print(f"\n{'=' * 50}\nProcessing: {file_id}\n")
|
|
|
|
|
| if component_type == 'resource_link':
|
| for event in stream_generator:
|
| content_buffer.append(str(event))
|
| else:
|
|
|
| for event in stream_generator:
|
| if event.type == "content-delta":
|
| delta_text = event.delta.message.content.text
|
| content_buffer.append(delta_text)
|
| print(delta_text, end="", flush=True)
|
|
|
| print(f"\n{'=' * 50}")
|
| return file_id, "success"
|
| except Exception as e:
|
| logger.error(f"Error processing stream {file_id}: {str(e)}")
|
| return file_id, f"Error: {str(e)}"
|
|
|
|
|
| @timeit
|
| def process_stream_components(stream_components: List[Dict[str, Any]], max_workers: int = 4) -> Dict[str, str]:
|
| """Process all streaming components in parallel with controlled concurrency."""
|
| results = {}
|
|
|
| logger.info(f"Processing {len(stream_components)} summary components with {max_workers} workers")
|
|
|
|
|
| with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
| futures = {
|
| executor.submit(consume_stream, component): component
|
| for component in stream_components
|
| }
|
|
|
|
|
| for future in as_completed(futures):
|
| component_id, status = future.result()
|
| results[component_id] = status
|
|
|
| return results
|
|
|
|
|
| def display_summary_results(results: Dict[str, str]) -> None:
|
| """Display results summary in a clean format."""
|
| successful = [k for k, v in results.items() if v == 'success']
|
| failed = [(k, v) for k, v in results.items() if v != 'success']
|
|
|
| print("\n" + "=" * 60)
|
| print(f"SUMMARY: {len(successful)}/{len(results)} components successfully processed")
|
|
|
| if successful:
|
| print("\nSuccessful components:")
|
| for comp in successful:
|
| print(f" ✓ {comp}")
|
|
|
| if failed:
|
| print("\nFailed components:")
|
| for comp, error in failed:
|
| print(f" ✗ {comp}: {error}")
|
|
|
| print("=" * 60)
|
|
|
|
|
| def main():
|
| """Main execution flow for document processing and summarization."""
|
| try:
|
|
|
| setup_logger()
|
| logger.info(f"Starting document processing from: {DOCS_FOLDER}")
|
|
|
|
|
| extraction_results = process_documents()
|
| logger.info(f"Processed {len(extraction_results)} documents")
|
|
|
|
|
| cpu_count = os.cpu_count() or 4
|
| doc_count = len(extraction_results)
|
|
|
|
|
|
|
| summary_workers = min(max(2, cpu_count), 8)
|
|
|
|
|
| logger.info(f"Starting parallel streaming summarization with {summary_workers} workers")
|
| stream_components = batch_summarize_documents(
|
| extraction_results,
|
| max_workers=summary_workers
|
| )
|
|
|
|
|
|
|
| stream_workers = min(max(2, cpu_count // 2), 4)
|
| logger.info(f"Processing streams with {stream_workers} workers")
|
| results = process_stream_components(stream_components, max_workers=stream_workers)
|
|
|
|
|
| display_summary_results(results)
|
|
|
| except Exception as e:
|
| logger.critical(f"Critical error in main execution: {str(e)}")
|
| raise
|
|
|
|
|
| if __name__ == "__main__":
|
| main() |