import os import re import tempfile import requests import gradio as gr from PyPDF2 import PdfReader import logging import webbrowser from huggingface_hub import InferenceClient from typing import Dict, List, Optional, Tuple import time # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Constants CONTEXT_SIZES = { "4K": 4000, "8K": 8000, "32K": 32000, "128K": 128000, "200K": 200000 } MODEL_CONTEXT_SIZES = { "OpenAI ChatGPT": 4096, "HuggingFace Inference": 4096, "Groq API": { "llama-3.1-70b-versatile": 32768, "mixtral-8x7b-32768": 32768, "llama-3.1-8b-instant": 8192 } } class ModelRegistry: def __init__(self): self.hf_models = { "Phi-3 Mini 128k": "microsoft/Phi-3-mini-128k-instruct", "Custom Model": "" } self.groq_models = self._fetch_groq_models() def _fetch_groq_models(self) -> Dict[str, str]: """Fetch available Groq models with proper error handling""" try: groq_api_key = os.getenv('GROQ_API_KEY') if not groq_api_key: logging.warning("No GROQ_API_KEY found in environment") return self._get_default_groq_models() headers = { "Authorization": f"Bearer {groq_api_key}", "Content-Type": "application/json" } response = requests.get("https://api.groq.com/openai/v1/models", headers=headers) if response.status_code == 200: models = response.json().get("data", []) return {model["id"]: model["id"] for model in models} else: logging.error(f"Failed to fetch Groq models: {response.status_code}") return self._get_default_groq_models() except Exception as e: logging.error(f"Error fetching Groq models: {e}") return self._get_default_groq_models() def _get_default_groq_models(self) -> Dict[str, str]: """Return default Groq models when API is unavailable""" return { "llama-3.1-70b-versatile": "llama-3.1-70b-versatile", "mixtral-8x7b-32768": "mixtral-8x7b-32768", "llama-3.1-8b-instant": "llama-3.1-8b-instant" } def refresh_groq_models(self) -> Dict[str, str]: """Refresh the list of available Groq models""" self.groq_models = self._fetch_groq_models() return self.groq_models # Initialize model registry model_registry = ModelRegistry() def extract_text_from_pdf(pdf_path: str) -> str: """Extract text content from PDF file.""" try: reader = PdfReader(pdf_path) text = "" for page_num, page in enumerate(reader.pages, start=1): page_text = page.extract_text() if page_text: text += page_text + "\n" else: logging.warning(f"No text found on page {page_num}.") if not text.strip(): return "Error: No extractable text found in the PDF." return text except Exception as e: logging.error(f"Error reading PDF file: {e}") return f"Error reading PDF file: {e}" def format_content(text: str, format_type: str) -> str: """Format extracted text according to specified format.""" if format_type == 'txt': return text elif format_type == 'md': paragraphs = text.split('\n\n') return '\n\n'.join(paragraphs) elif format_type == 'html': paragraphs = text.split('\n\n') return ''.join([f'
{para.strip()}
' for para in paragraphs if para.strip()]) else: logging.error(f"Unsupported format: {format_type}") return f"Unsupported format: {format_type}" def split_into_snippets(text: str, context_size: int) -> List[str]: """Split text into manageable snippets based on context size.""" sentences = re.split(r'(?<=[.!?]) +', text) snippets = [] current_snippet = "" for sentence in sentences: if len(current_snippet) + len(sentence) + 1 > context_size: if current_snippet: snippets.append(current_snippet.strip()) current_snippet = sentence + " " else: snippets.append(sentence.strip()) current_snippet = "" else: current_snippet += sentence + " " if current_snippet.strip(): snippets.append(current_snippet.strip()) return snippets def build_prompts(snippets: List[str], prompt_instruction: str, custom_prompt: Optional[str], snippet_num: Optional[int] = None) -> str: """Build formatted prompts from text snippets.""" if snippet_num is not None: if 1 <= snippet_num <= len(snippets): selected_snippets = [snippets[snippet_num - 1]] else: return f"Error: Invalid snippet number. Please choose between 1 and {len(snippets)}." else: selected_snippets = snippets prompts = [] base_prompt = custom_prompt if custom_prompt else prompt_instruction for idx, snippet in enumerate(selected_snippets, start=1): if len(selected_snippets) > 1: prompt_header = f"{base_prompt} Part {idx} of {len(selected_snippets)}: ---\n" else: prompt_header = f"{base_prompt} ---\n" framed_prompt = f"{prompt_header}{snippet}\n---" prompts.append(framed_prompt) return "\n\n".join(prompts) def send_to_model(*args, **kwargs): try: with gr.Progress() as progress: progress(0, "Preparing to send to model...") result = send_to_model_impl(*args, **kwargs) progress(1, "Complete!") return result except Exception as e: return f"Error: {str(e)}", None def send_to_model_impl(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key, groq_model_choice, groq_api_key, openai_api_key): """Implementation of send to model functionality""" if model_selection == "HuggingFace Inference": if not hf_api_key: return "HuggingFace API key required.", [] model_id = hf_custom_model if hf_model_choice == "Custom Model" else model_registry.hf_models[hf_model_choice] summary = send_to_hf_inference(prompt, model_id, hf_api_key) elif model_selection == "Groq API": if not groq_api_key: return "Groq API key required.", [] summary = send_to_groq(prompt, groq_model_choice, groq_api_key) elif model_selection == "OpenAI ChatGPT": if not openai_api_key: return "OpenAI API key required.", [] summary = send_to_openai(prompt, openai_api_key) else: return "Invalid model selection.", [] if summary.startswith("Error"): return summary, [] # Save summary for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(summary) return summary, [f.name] def send_to_hf_inference(prompt: str, model_name: str, api_key: str) -> str: """Send prompt to HuggingFace using Inference API""" try: client = InferenceClient(token=api_key) response = client.text_generation( prompt, model=model_name, max_new_tokens=500, temperature=0.7, details=True, # Get full response details stream=False # Don't stream output ) return response.generated_text # Return just the generated text except Exception as e: logging.error(f"Error with HF inference: {e}") return f"Error with HF inference: {e}" def send_to_groq(prompt: str, model_name: str, api_key: str) -> str: """Send prompt to Groq API""" try: headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } data = { "model": model_name, "messages": [{"role": "user", "content": prompt}], "temperature": 0.7, "max_tokens": 500 } response = requests.post( "https://api.groq.com/openai/v1/chat/completions", headers=headers, json=data ) if response.status_code != 200: return f"Error: Groq API returned status {response.status_code}" response_json = response.json() if "choices" not in response_json or not response_json["choices"]: return "Error: No response from Groq API" return response_json["choices"][0]["message"]["content"] except Exception as e: logging.error(f"Error with Groq API: {e}") return f"Error with Groq API: {e}" def send_to_openai(prompt: str, api_key: str) -> str: """Send prompt to OpenAI API""" try: import openai openai.api_key = api_key response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}], temperature=0.7, max_tokens=500 ) return response.choices[0].message.content except Exception as e: logging.error(f"Error with OpenAI API: {e}") return f"Error with OpenAI API: {e}" def copy_text_js(element_id: str) -> str: return f""" () => {{ try {{ const elem = document.querySelector('#{element_id} textarea'); if (!elem) throw new Error('Element not found'); const text = elem.value; if (!text) throw new Error('No text to copy'); navigator.clipboard.writeText(text); return "Copied to clipboard!"; }} catch (e) {{ console.error(e); return "Failed to copy: " + e.message; }} }} """ def open_chatgpt() -> str: """Open ChatGPT in new browser tab""" return """window.open('https://chat.openai.com/', '_blank');""" def process_pdf(pdf, fmt, ctx_size): """Process PDF and return text and snippets""" try: if not pdf: return "Please upload a PDF file.", "", [], None # Extract text text = extract_text_from_pdf(pdf.name) if text.startswith("Error"): return text, "", [], None # Format content formatted_text = format_content(text, fmt) # Split into snippets snippets = split_into_snippets(formatted_text, ctx_size) # Save full text for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as text_file: text_file.write(formatted_text) snippet_choices = [f"Snippet {i+1} of {len(snippets)}" for i in range(len(snippets))] return ( "PDF processed successfully!", formatted_text, snippets, snippet_choices, [text_file.name] ) except Exception as e: logging.error(f"Error processing PDF: {e}") return f"Error processing PDF: {str(e)}", "", [], None def generate_prompt(text, template, snippet_idx=None): """Generate prompt from text or selected snippet""" try: if not text: return "No text available.", "", None default_prompt = "Summarize the following text:" prompt_template = template if template else default_prompt if isinstance(text, list): # If text is list of snippets if snippet_idx is not None: if 0 <= snippet_idx < len(text): content = text[snippet_idx] else: return "Invalid snippet index.", "", None else: content = "\n\n".join(text) else: content = text prompt = f"{prompt_template}\n---\n{content}\n---" # Save prompt for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as prompt_file: prompt_file.write(prompt) return "Prompt generated!", prompt, [prompt_file.name] except Exception as e: logging.error(f"Error generating prompt: {e}") return f"Error generating prompt: {str(e)}", "", None def download_file(content: str, prefix: str = "file") -> List[str]: """Create a downloadable file with content and better error handling""" if not content: return [] try: with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', prefix=prefix) as f: f.write(content) return [f.name] except Exception as e: logging.error(f"Error creating download file: {e}") return [] # Main Interface with gr.Blocks(theme=gr.themes.Default()) as demo: # State variables pdf_content = gr.State("") snippets = gr.State([]) # Header gr.Markdown("# 📄 Smart PDF Summarizer") gr.Markdown("Upload a PDF document and get AI-powered summaries using various AI models.") with gr.Tabs() as tabs: # Tab 1: PDF Processing with gr.Tab("1️⃣ PDF Processing"): with gr.Row(): with gr.Column(scale=1): pdf_input = gr.File( label="📁 Upload PDF", file_types=[".pdf"] ) format_type = gr.Radio( choices=["txt", "md", "html"], value="txt", label="📝 Output Format" ) context_size = gr.Slider( minimum=1000, maximum=200000, step=1000, value=4096, label="Context Size" ) with gr.Row(): for size_name, size_value in CONTEXT_SIZES.items(): gr.Button( size_name, size="sm", scale=1 ).click( lambda v=size_value: v, # Simplified None, context_size ) process_button = gr.Button("🔍 Process PDF", variant="primary") with gr.Column(scale=1): progress_status = gr.Textbox( label="Status", interactive=False, show_label=True, visible=True # Ensure error messages are always visible ) processed_text = gr.Textbox( label="Processed Text", lines=10, max_lines=50, show_copy_button=True ) download_full_text = gr.Button("📥 Download Full Text") # Tab 2: Snippet Selection with gr.Tab("2️⃣ Snippet Selection"): with gr.Row(): with gr.Column(scale=1): snippet_selector = gr.Dropdown( label="Select Snippet", choices=[], interactive=True ) custom_prompt = gr.Textbox( label="✍️ Custom Prompt Template", placeholder="Enter your custom prompt here...", lines=2 ) generate_prompt_btn = gr.Button("Generate Prompt", variant="primary") with gr.Column(scale=1): generated_prompt = gr.Textbox( label="📋 Generated Prompt", lines=10, max_lines=50, show_copy_button=True, elem_id="generated_prompt" # Add this ) with gr.Row(): copy_prompt_button = gr.Button("📋 Copy Prompt") download_prompt = gr.Button("📥 Download Prompt") download_snippet = gr.Button("📥 Download Selected Snippet") # Tab 3: Model Processing with gr.Tab("3️⃣ Model Processing"): with gr.Row(): with gr.Column(scale=1): model_choice = gr.Radio( choices=["OpenAI ChatGPT", "HuggingFace Inference", "Groq API"], value="OpenAI ChatGPT", label="🤖 Model Selection" ) with gr.Column(visible=False) as openai_options: openai_api_key = gr.Textbox( label="🔑 OpenAI API Key", type="password" ) with gr.Column(visible=False) as hf_options: hf_model = gr.Dropdown( choices=list(model_registry.hf_models.keys()), label="🔧 HuggingFace Model", value="Phi-3 Mini 128k" ) hf_custom_model = gr.Textbox( label="Custom Model ID", visible=False ) hf_api_key = gr.Textbox( label="🔑 HuggingFace API Key", type="password" ) with gr.Column(visible=False) as groq_options: groq_model = gr.Dropdown( choices=list(model_registry.groq_models.keys()), label="🔧 Groq Model" ) groq_refresh_btn = gr.Button("🔄 Refresh Models") groq_api_key = gr.Textbox( label="🔑 Groq API Key", type="password" ) send_to_model_btn = gr.Button("🚀 Send to Model", variant="primary") open_chatgpt_button = gr.Button("🌐 Open ChatGPT") with gr.Column(scale=1): summary_output = gr.Textbox( label="📝 Summary", lines=15, max_lines=50, show_copy_button=True, elem_id="summary_output" # Add this ) with gr.Row(): copy_summary_button = gr.Button("📋 Copy Summary") download_summary = gr.Button("📥 Download Summary") # Hidden components for file handling download_files = gr.Files(label="📥 Downloads", visible=False) # Event Handlers def update_context_size(size: int) -> None: """Update context size slider with validation""" if not isinstance(size, (int, float)): size = 4096 # Default size return gr.update(value=int(size)) def get_model_context_size(choice: str, groq_model: str = None) -> int: """Get context size for model with better defaults""" if choice == "Groq API" and groq_model: return MODEL_CONTEXT_SIZES["Groq API"].get(groq_model, 4096) elif choice == "OpenAI ChatGPT": return 4096 elif choice == "HuggingFace Inference": return 4096 return 32000 # Safe default def update_snippet_choices(snippets_list: List[str]) -> List[str]: """Create formatted snippet choices""" return [f"Snippet {i+1} of {len(snippets_list)}" for i in range(len(snippets_list))] def get_snippet_index(choice: str) -> int: """Extract snippet index from choice string""" if not choice: return 0 try: return int(choice.split()[1]) - 1 except: return 0 def toggle_model_options(choice): return ( gr.update(visible=choice == "HuggingFace Inference"), gr.update(visible=choice == "Groq API"), gr.update(visible=choice == "OpenAI ChatGPT") ) def refresh_groq_models_list(): updated_models = model_registry.refresh_groq_models() return gr.update(choices=list(updated_models.keys())) def toggle_custom_model(model_name): return gr.update(visible=model_name == "Custom Model") def handle_model_change(choice): """Handle model selection change""" return ( gr.update(visible=choice == "HuggingFace Inference"), gr.update(visible=choice == "Groq API"), gr.update(visible=choice == "OpenAI ChatGPT"), update_context_size(choice) ) def handle_groq_model_change(model_name): """Handle Groq model selection change""" return update_context_size("Groq API", model_name) def handle_model_selection(choice): """Handle model selection and update UI""" ctx_size = get_model_context_size(choice) return ( gr.update(visible=choice == "HuggingFace Inference"), # hf_options gr.update(visible=choice == "Groq API"), # groq_options gr.update(visible=choice == "OpenAI ChatGPT"), # openai_options gr.update(value=ctx_size) # context_size ) # PDF Processing Handlers def handle_pdf_process(pdf, fmt, ctx_size): """Process PDF and update UI state""" if not pdf: return ( "Please upload a PDF file.", # progress_status "", # processed_text "", # pdf_content [], # snippets gr.update(choices=[], value=None), # snippet_selector None # download_files ) try: # Extract and format text text = extract_text_from_pdf(pdf.name) if text.startswith("Error"): return ( text, "", "", [], gr.update(choices=[], value=None), None ) formatted_text = format_content(text, fmt) snippets_list = split_into_snippets(formatted_text, ctx_size) # Create downloadable full text with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(formatted_text) download_file = f.name return ( f"PDF processed successfully! Generated {len(snippets_list)} snippets.", formatted_text, formatted_text, snippets_list, gr.update(choices=update_snippet_choices(snippets_list), value="Snippet 1 of " + str(len(snippets_list))), [download_file] ) except Exception as e: error_msg = f"Error processing PDF: {str(e)}" logging.error(error_msg) return ( error_msg, "", "", [], gr.update(choices=[], value=None), None ) def handle_snippet_selection(choice, snippets_list): """Handle snippet selection and update prompt""" if not snippets_list: return ( "No snippets available.", # progress_status "", # generated_prompt None # download_files ) try: idx = get_snippet_index(choice) selected_snippet = snippets_list[idx] # Create downloadable snippet with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(selected_snippet) return ( f"Selected snippet {idx + 1}", selected_snippet, [f.name] ) except Exception as e: error_msg = f"Error selecting snippet: {str(e)}" logging.error(error_msg) return ( error_msg, "", None ) # Copy button handlers def handle_prompt_generation(snippet_text, template, snippet_choice, snippets_list): """Generate prompt from selected snippet""" if not snippet_text or not snippets_list: return "No text available for prompt generation.", "", None try: idx = get_snippet_index(snippet_choice) prompt = generate_prompt(snippets_list[idx], template or "Summarize the following text:") # Create downloadable prompt with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(prompt) return "Prompt generated successfully!", prompt, [f.name] except Exception as e: error_msg = f"Error generating prompt: {str(e)}" logging.error(error_msg) return error_msg, "", None def handle_copy_action(text): """Handle copy to clipboard action""" return { progress_status: gr.update(value="Text copied to clipboard!", visible=True) } # Connect all event handlers # Core event handlers process_button.click( handle_pdf_process, inputs=[pdf_input, format_type, context_size], outputs=[ # List of outputs, not dict progress_status, processed_text, pdf_content, snippets, snippet_selector, download_files ] ) generate_prompt_btn.click( handle_prompt_generation, inputs=[generated_prompt, custom_prompt, snippet_selector, snippets], outputs=[ progress_status, generated_prompt, download_files ] ) # Snippet handling snippet_selector.change( handle_snippet_selection, inputs=[snippet_selector, snippets], outputs=[ progress_status, generated_prompt, download_files ] ) # Model selection model_choice.change( handle_model_selection, inputs=[model_choice], outputs=[ hf_options, groq_options, openai_options, context_size ] ) hf_model.change( toggle_custom_model, inputs=[hf_model], outputs=[hf_custom_model] ) groq_model.change( handle_groq_model_change, inputs=[groq_model], outputs=[context_size] ) # Context size buttons for size_name, size_value in CONTEXT_SIZES.items(): gr.Button( size_name, size="sm", scale=1 ).click( lambda v=size_value: gr.update(value=v), None, context_size ) # Download handlers (simplified) for btn, content in [ (download_full_text, pdf_content), (download_snippet, generated_prompt), (download_prompt, generated_prompt), (download_summary, summary_output) ]: btn.click( lambda x: [x] if x else None, inputs=[content], outputs=[download_files] ) # Copy button handlers for btn, elem_id in [ (copy_prompt_button, "generated_prompt"), (copy_summary_button, "summary_output") ]: btn.click( fn=lambda _: f"const text = document.querySelector('#{elem_id} textarea').value; navigator.clipboard.writeText(text); return 'Copied to clipboard!';", inputs=None, outputs=progress_status, js=True ) # ChatGPT handler open_chatgpt_button.click( fn=lambda: "window.open('https://chat.openai.com/', '_blank'); return 'Opened ChatGPT in new tab';", inputs=None, outputs=progress_status, js=True ) # Model processing send_to_model_btn.click( send_to_model, inputs=[ generated_prompt, model_choice, hf_model, hf_custom_model, hf_api_key, groq_model, groq_api_key, openai_api_key ], outputs=[ summary_output, download_files ] ) groq_refresh_btn.click( refresh_groq_models_list, outputs=[groq_model] ) # Instructions gr.Markdown(""" ### 📌 Instructions: 1. Upload a PDF document 2. Choose output format and context window size 3. Select snippet number (default: 1) or enter custom prompt 4. Select your preferred model in case you want to proceed directly (or continue with 5): - OpenAI ChatGPT: Manual copy/paste workflow - HuggingFace Inference: Direct API integration - Groq API: High-performance inference 5. Click 'Process PDF' to generate summary 6. Use 'Copy Prompt' and, optionally, 'Open ChatGPT' for manual processing 7. Download generated files as needed """) # Launch the interface if __name__ == "__main__": demo.launch(share=False, debug=True)