import gradio as gr from markitdown import MarkItDown import google.generativeai as genai import tempfile import os from pathlib import Path # Initialize MarkItDown md = MarkItDown() # Configure Gemini AI genai.configure(api_key=os.getenv('GEMINI_KEY')) model = genai.GenerativeModel('gemini-2.0-flash-lite-preview-02-05') def process_with_markitdown(input_path): """Process file or URL with MarkItDown and return text content""" print(f"[DEBUG] Starting MarkItDown processing for: {input_path}") try: import concurrent.futures from concurrent.futures import ThreadPoolExecutor def convert_with_timeout(): print("[DEBUG] Attempting MarkItDown conversion") result = md.convert(input_path) print("[DEBUG] MarkItDown conversion successful") if not result or not hasattr(result, 'text_content'): print("[DEBUG] No text content in result") return "Error: No text content found in document" return result.text_content # Use ThreadPoolExecutor with timeout with ThreadPoolExecutor() as executor: future = executor.submit(convert_with_timeout) try: result = future.result(timeout=30) # 30 second timeout print("[DEBUG] Successfully got result from MarkItDown") return result except concurrent.futures.TimeoutError: print("[DEBUG] MarkItDown processing timed out") return "Error: Processing timed out after 30 seconds" except Exception as e: print(f"[DEBUG] Error in process_with_markitdown: {str(e)}") return f"Error processing input: {str(e)}" def save_uploaded_file(uploaded_file): """Saves an uploaded file to a temporary location.""" print("[DEBUG] Starting save_uploaded_file") if uploaded_file is None: print("[DEBUG] No file uploaded") return "No file uploaded." try: print(f"[DEBUG] Uploaded file object type: {type(uploaded_file)}") print(f"[DEBUG] Uploaded file name: {uploaded_file.name}") # Get the actual file path from the uploaded file file_path = uploaded_file.name print(f"[DEBUG] Original file path: {file_path}") # Read the content directly from the original file try: with open(file_path, 'rb') as source_file: content = source_file.read() print(f"[DEBUG] Successfully read {len(content)} bytes from source file") except Exception as e: print(f"[DEBUG] Error reading source file: {str(e)}") return f"Error reading file: {str(e)}" # Save to temp file temp_dir = tempfile.gettempdir() temp_filename = os.path.join(temp_dir, os.path.basename(file_path)) with open(temp_filename, 'wb') as f: f.write(content) print(f"[DEBUG] File saved successfully at: {temp_filename}") return temp_filename except Exception as e: print(f"[DEBUG] Error in save_uploaded_file: {str(e)}") return f"An error occurred: {str(e)}" async def summarize_text(text): """Summarize the input text using Gemini AI""" try: prompt = f"""Please provide a concise summary of the following text. Focus on the main points and key takeaways: {text} Summary:""" # Use the synchronous version since async version isn't working as expected response = model.generate_content(prompt) return response.text except Exception as e: return f"Error generating summary: {str(e)}" async def process_input(input_text, uploaded_file=None): """Main function to process either URL or uploaded file""" print("[DEBUG] Starting process_input") try: if uploaded_file is not None: # Handle file upload temp_path = save_uploaded_file(uploaded_file) if temp_path.startswith('Error'): return temp_path text = process_with_markitdown(temp_path) # Clean up temporary file try: os.remove(temp_path) except: pass elif input_text.startswith(('http://', 'https://')): # Handle URL text = process_with_markitdown(input_text) else: # Handle direct text input text = input_text if text.startswith('Error'): return text # Generate summary using Gemini AI summary = await summarize_text(text) return summary except Exception as e: return f"Error processing input: {str(e)}" def clear_inputs(): return ["", None, ""] # Create Gradio interface with drag-and-drop with gr.Blocks(theme=gr.themes.Soft()) as iface: gr.Markdown( """ # Summarizeit > Summarize any document! Using Gemini 2.0 Flash model. Enter a URL, paste text, or drag & drop a file to get a summary. """ ) with gr.Row(): input_text = gr.Textbox( label="Enter URL or text", placeholder="Enter a URL or paste text here...", scale=2 ) with gr.Row(): file_upload = gr.File( label="Drop files here or click to upload", file_types=[ ".pdf", ".docx", ".xlsx", ".csv", ".txt", ".html", ".htm", ".xml", ".json" ], file_count="single", scale=2 ) with gr.Row(): submit_btn = gr.Button("Summarize", variant="primary") clear_btn = gr.Button("Clear") output_text = gr.Textbox( label="Summary", lines=10, show_copy_button=True ) # Set up event handlers submit_btn.click( fn=process_input, inputs=[input_text, file_upload], outputs=output_text, api_name="process" ) clear_btn.click( fn=clear_inputs, outputs=[input_text, file_upload, output_text] ) # Add examples gr.Examples( examples=[ ["https://h3manth.com"], ["https://www.youtube.com/watch?v=bSHp7WVpPgc"], ["https://en.wikipedia.org/wiki/Three-body_problem"] ], inputs=input_text ) if __name__ == "__main__": iface.launch(True)