"""Utilities for extracting text from PDFs, chunking and caching it, and
summarizing it with the OpenAI Chat Completions API."""

import asyncio
import hashlib
import json
import os
import shutil
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from uuid import uuid4

import openai
import pdfplumber
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_KEY is None:
    raise RuntimeError("Set OPENAI_API_KEY environment variable before running.")

openai.api_key = OPENAI_KEY


def uuid4_hex() -> str:
    """Return a random UUID4 as a hex string."""
    return uuid4().hex
async def call_openai_chat(model: str, messages: list, temperature: float = 0.2, max_tokens: int = 800) -> str:
    """Async wrapper for the OpenAI (>=1.0.0) Chat Completions API.

    The blocking SDK call runs in a worker thread via asyncio.to_thread so it
    does not block the event loop.
    """

    def _call() -> str:
        resp = openai.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return resp.choices[0].message.content.strip()

    return await asyncio.to_thread(_call)
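# Example call, as a sketch (the model name is a placeholder and a valid
# OPENAI_API_KEY is assumed):
#     reply = asyncio.run(call_openai_chat(
#         "gpt-4o-mini",
#         [{"role": "user", "content": "Say hello."}],
#     ))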
def load_pdf_text(path: str) -> str:
    """Extract text, tables, image placements, and page metadata from a PDF using pdfplumber."""
    content = []
    with pdfplumber.open(path) as pdf:
        for page_num, page in enumerate(pdf.pages, 1):
            page_content = []

            # Plain text layer.
            text = page.extract_text()
            if text:
                page_content.append(f"=== PAGE {page_num} TEXT ===")
                page_content.append(text)

            # Tables, rendered as pipe-separated rows.
            tables = page.extract_tables()
            if tables:
                page_content.append(f"\n=== PAGE {page_num} TABLES ===")
                for table_num, table in enumerate(tables, 1):
                    page_content.append(f"\n--- TABLE {table_num} ---")
                    for row in table:
                        if row:
                            clean_row = [cell.strip() if cell else "" for cell in row]
                            page_content.append(" | ".join(clean_row))

            # Image placements. Dimensions are in PDF points, not pixels.
            images = page.images
            if images:
                page_content.append(f"\n=== PAGE {page_num} IMAGES ===")
                for img_num, img in enumerate(images, 1):
                    page_content.append(
                        f"Image {img_num}: {img.get('width', 'unknown')}x{img.get('height', 'unknown')} points"
                    )

            # Page-level metadata.
            page_content.append(f"\n=== PAGE {page_num} METADATA ===")
            page_content.append(f"Page size: {page.width}x{page.height}")
            page_content.append(f"Rotation: {page.rotation}")

            if page_content:
                content.append("\n".join(page_content))

    return "\n\n".join(content)
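# The returned string interleaves labeled per-page sections, for example
# (values illustrative):
#     === PAGE 1 TEXT ===
#     ...extracted text...
#
#     === PAGE 1 METADATA ===
#     Page size: 612x792
#     Rotation: 0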
def save_text_as_file(text: str, suffix: str = ".txt") -> str:
    """Save text to a uniquely named temporary file and return its path."""
    fp = Path(tempfile.gettempdir()) / f"analysis_{uuid4().hex}{suffix}"
    fp.write_text(text, encoding="utf-8")
    return str(fp)


def save_uploaded_file(uploaded) -> str:
    """Save an uploaded file object to a temporary .pdf path and return the path."""
    dst = Path(tempfile.gettempdir()) / f"upload_{uuid4().hex}.pdf"
    with open(dst, "wb") as f:
        shutil.copyfileobj(uploaded, f)
    return str(dst)
def chunk_text(text: str, chunk_size: int = 15000, overlap: int = 1000) -> List[str]:
    """Split text into overlapping chunks for processing large documents."""
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size

        # Prefer to break at a sentence boundary near the end of the chunk.
        if end < len(text):
            search_start = max(start, end - 200)
            sentence_end = text.rfind('.', search_start, end)
            if sentence_end > search_start:
                end = sentence_end + 1

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        # Stop once the chunk reached the end of the text; otherwise the
        # overlap step would re-emit the tail as a duplicate chunk.
        if end >= len(text):
            break

        # Step back by `overlap` characters for shared context, but always
        # advance so the loop terminates even for pathological parameters.
        start = max(end - overlap, start + 1)

    return chunks
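# Example (illustrative sizes): ~40,000 characters with the defaults
# (chunk_size=15000, overlap=1000) yields three chunks, each ending near a
# sentence boundary and sharing roughly 1,000 characters with its neighbor:
#     parts = chunk_text(long_text)
#     assert all(len(p) <= 15000 for p in parts)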
def get_file_hash(file_path: str) -> str:
    """Generate an MD5 hash of the file contents, used as a cache key."""
    with open(file_path, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()


def estimate_tokens(text: str) -> int:
    """Rough estimate of token count (1 token ≈ 4 characters for English)."""
    return len(text) // 4


def is_within_token_limit(text: str, max_tokens: int = 6000) -> bool:
    """Check whether text fits within a token limit for API calls."""
    return estimate_tokens(text) <= max_tokens


def truncate_to_token_limit(text: str, max_tokens: int = 6000) -> str:
    """Truncate text to fit within a token limit, appending a truncation notice."""
    if is_within_token_limit(text, max_tokens):
        return text

    # Invert the 4-characters-per-token heuristic to get a character budget.
    char_limit = max_tokens * 4
    return text[:char_limit] + "\n\n[Content truncated due to length...]"
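# Example arithmetic: a 30,000-character string estimates to ~7,500 tokens,
# so truncate_to_token_limit(text) cuts it to 24,000 characters
# (6,000 tokens * 4 characters) plus the truncation notice.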
async def create_hierarchical_summary(chunk_results: List[str], prompt: str, model: str, max_tokens: int = 6000) -> str:
    """Create a summary hierarchically (map-reduce style) to avoid token limits."""
    # First pass: summarize the chunk analyses in small groups.
    intermediate_summaries = []
    group_size = 3

    for i in range(0, len(chunk_results), group_size):
        group = chunk_results[i:i + group_size]
        group_text = "\n\n".join(group)

        if not is_within_token_limit(group_text, max_tokens):
            group_text = truncate_to_token_limit(group_text, max_tokens)

        group_prompt = f"Summarize the following chunk analyses, focusing on key insights and findings:\n\n{group_text}"

        try:
            summary = await call_openai_chat(
                model=model,
                messages=[
                    {"role": "system", "content": "You are an expert analyst creating sophisticated summaries. Focus on:\n- Identifying strategic opportunities and competitive advantages\n- Extracting specific, actionable insights with real-world applications\n- Highlighting unique value propositions and market implications\n- Connecting insights to broader business themes and opportunities\n- Providing concrete examples and implementation considerations"},
                    {"role": "user", "content": group_prompt},
                ],
                temperature=0.2,
                max_tokens=800,
            )
            intermediate_summaries.append(f"Group {i // group_size + 1} Summary:\n{summary}")
        except Exception as e:
            intermediate_summaries.append(f"Group {i // group_size + 1} Summary:\nError: {str(e)}")

    # A single group needs no second pass.
    if len(intermediate_summaries) == 1:
        return intermediate_summaries[0]

    # Second pass: combine the intermediate summaries into one final summary.
    final_text = "\n\n".join(intermediate_summaries)

    if not is_within_token_limit(final_text, max_tokens):
        final_text = truncate_to_token_limit(final_text, max_tokens)

    final_prompt = f"Create a comprehensive final summary based on the following intermediate summaries. Original prompt: {prompt}\n\n{final_text}"

    try:
        final_summary = await call_openai_chat(
            model=model,
            messages=[
                {"role": "system", "content": "You are a strategic business analyst creating comprehensive, actionable insights. Your final summary should:\n- Synthesize insights into a coherent strategic narrative\n- Prioritize opportunities by potential impact and feasibility\n- Provide specific, actionable recommendations with clear next steps\n- Include quantifiable insights where possible (market size, ROI, timelines)\n- Address implementation challenges and mitigation strategies\n- Connect all insights to create a unified strategic vision\n- Focus on what matters most for business success"},
                {"role": "user", "content": final_prompt},
            ],
            temperature=0.2,
            max_tokens=1000,
        )
        return final_summary
    except Exception as e:
        return f"Error creating final summary: {str(e)}\n\nIntermediate summaries:\n{final_text}"
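# Example usage, as a sketch (placeholder model name and inputs):
#     summary = asyncio.run(create_hierarchical_summary(
#         chunk_results=["analysis of chunk 1...", "analysis of chunk 2..."],
#         prompt="Summarize the key findings.",
#         model="gpt-4o-mini",
#     ))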
# On-disk cache for extracted text, document content, and analysis results.
CACHE_DIR = Path(tempfile.gettempdir()) / "pdf_analysis_cache"
CACHE_DIR.mkdir(exist_ok=True)


def get_cached_analysis(file_path: str, prompt: str) -> Optional[Dict[str, Any]]:
    """Retrieve a cached analysis keyed on the exact (file, prompt) pair, if any."""
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_file = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    if cache_file.exists():
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            # Entries expire after 24 hours (86400 seconds).
            if (cache_data.get('file_hash') == file_hash and
                    cache_data.get('prompt_hash') == prompt_hash and
                    time.time() - cache_data.get('cached_at', 0) < 86400):
                return cache_data.get('analysis')
        except Exception:
            pass
    return None


def get_cached_document_content(file_path: str) -> Optional[str]:
    """Retrieve cached document content keyed on the file alone (any prompt)."""
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_content.json"

    if cache_file.exists():
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            if (cache_data.get('file_hash') == file_hash and
                    time.time() - cache_data.get('cached_at', 0) < 86400):
                return cache_data.get('content')
        except Exception:
            pass
    return None


def cache_analysis(file_path: str, prompt: str, analysis: Dict[str, Any]) -> None:
    """Cache analysis results for future use; failures are silently ignored."""
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_file = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    try:
        cache_data = {
            'file_hash': file_hash,
            'prompt_hash': prompt_hash,
            'analysis': analysis,
            'cached_at': time.time(),
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, ensure_ascii=False)
    except Exception:
        pass


def cache_document_content(file_path: str, content: str) -> None:
    """Cache document content for reuse with any prompt."""
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_content.json"

    try:
        cache_data = {
            'file_hash': file_hash,
            'content': content,
            'cached_at': time.time(),
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, ensure_ascii=False)
    except Exception:
        pass


def get_cached_text(file_path: str) -> Optional[str]:
    """Retrieve cached PDF text if available.

    No expiry check here: the extracted text depends only on the file bytes,
    which the hash already captures.
    """
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_text.json"

    if cache_file.exists():
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            if cache_data.get('file_hash') == file_hash:
                return cache_data.get('text')
        except Exception:
            pass
    return None


def cache_text(file_path: str, text: str) -> None:
    """Cache extracted PDF text for future use."""
    file_hash = get_file_hash(file_path)
    cache_file = CACHE_DIR / f"{file_hash}_text.json"

    try:
        cache_data = {
            'file_hash': file_hash,
            'text': text,
            'cached_at': time.time(),
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, ensure_ascii=False)
    except Exception:
        pass
def load_pdf_text_cached(path: str) -> str:
    """Load PDF text, reusing the on-disk cache when possible."""
    # Reuse a previous extraction if one exists (an empty string is still a
    # valid cached result, hence the explicit None check).
    cached_text = get_cached_text(path)
    if cached_text is not None:
        return cached_text

    # Otherwise extract from scratch and cache the result for next time.
    text = load_pdf_text(path)
    cache_text(path, text)
    return text


def load_pdf_text_chunked(path: str, chunk_size: int = 15000) -> List[str]:
    """Load PDF text and return it as overlapping chunks for large documents."""
    text = load_pdf_text_cached(path)
    return chunk_text(text, chunk_size)


def get_document_metadata(path: str) -> Dict[str, Any]:
    """Extract basic metadata from a PDF, falling back to zeros on failure."""
    try:
        with pdfplumber.open(path) as pdf:
            return {
                'page_count': len(pdf.pages),
                'file_size': Path(path).stat().st_size,
                'extracted_at': time.time(),
            }
    except Exception:
        return {'page_count': 0, 'file_size': 0, 'extracted_at': time.time()}
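

if __name__ == "__main__":
    # Minimal end-to-end sketch, not a full pipeline: it assumes a local
    # "sample.pdf" exists, OPENAI_API_KEY is set, and the model name is a
    # placeholder. Raw chunk text stands in here for per-chunk analyses.
    async def _demo() -> None:
        pdf_path = "sample.pdf"
        print(get_document_metadata(pdf_path))
        chunks = load_pdf_text_chunked(pdf_path)
        summary = await create_hierarchical_summary(
            chunks, prompt="Summarize the key findings.", model="gpt-4o-mini"
        )
        print(summary)

    asyncio.run(_demo())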