# Hugging Face Space (status: Running)
import asyncio
import concurrent.futures
import os
import tempfile
from pathlib import Path

import google.generativeai as genai
import gradio as gr
from markitdown import MarkItDown
# Shared MarkItDown converter, reused for every file/URL conversion
md = MarkItDown()

# Gemini client setup: the API key is read from the GEMINI_KEY environment
# variable (None is passed through when the variable is unset)
GEMINI_MODEL_NAME = 'gemini-2.0-flash-lite-preview-02-05'
genai.configure(api_key=os.environ.get('GEMINI_KEY'))
model = genai.GenerativeModel(GEMINI_MODEL_NAME)
def process_with_markitdown(input_path, timeout=30):
    """Convert a local file or URL to text with MarkItDown.

    Failures are reported through the return value rather than raised: the
    caller recognises them by the leading "Error".

    Args:
        input_path: File path or URL handed to ``md.convert``.
        timeout: Seconds to wait for the conversion before giving up.

    Returns:
        The extracted text, or a message starting with "Error" when the
        conversion fails, yields no text, or exceeds ``timeout``.
    """
    print(f"[DEBUG] Starting MarkItDown processing for: {input_path}")

    def convert():
        print("[DEBUG] Attempting MarkItDown conversion")
        result = md.convert(input_path)
        print("[DEBUG] MarkItDown conversion successful")
        # Covers a missing result, a missing attribute and empty/None text.
        text = getattr(result, "text_content", None)
        if not text:
            print("[DEBUG] No text content in result")
            return "Error: No text content found in document"
        return text

    # The executor is deliberately NOT used as a context manager: leaving a
    # `with ThreadPoolExecutor()` block calls shutdown(wait=True), which
    # would block until the conversion finishes and defeat the timeout.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(convert)
        try:
            text = future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            print("[DEBUG] MarkItDown processing timed out")
            return f"Error: Processing timed out after {timeout} seconds"
        print("[DEBUG] Successfully got result from MarkItDown")
        return text
    except Exception as e:
        print(f"[DEBUG] Error in process_with_markitdown: {str(e)}")
        return f"Error processing input: {str(e)}"
    finally:
        # A running worker thread cannot be interrupted; it is left to
        # finish in the background while this call returns immediately.
        executor.shutdown(wait=False)
def save_uploaded_file(uploaded_file):
    """Copy an uploaded file to a uniquely named temporary file.

    Args:
        uploaded_file: The upload to copy: either an object whose ``name``
            attribute is the file's path on disk, or the path itself.
            ``None`` means nothing was uploaded.

    Returns:
        The path of the temporary copy on success; otherwise a message
        starting with "Error" (the caller detects failure by that prefix).
    """
    print("[DEBUG] Starting save_uploaded_file")
    if uploaded_file is None:
        print("[DEBUG] No file uploaded")
        return "Error: No file uploaded."

    try:
        print(f"[DEBUG] Uploaded file object type: {type(uploaded_file)}")
        # Accept both file-like upload objects (path in `.name`) and plain
        # string/PathLike paths.
        file_path = os.fspath(getattr(uploaded_file, "name", uploaded_file))
        print(f"[DEBUG] Original file path: {file_path}")

        try:
            with open(file_path, 'rb') as source_file:
                content = source_file.read()
        except OSError as e:
            print(f"[DEBUG] Error reading source file: {str(e)}")
            return f"Error reading file: {str(e)}"
        print(f"[DEBUG] Successfully read {len(content)} bytes from source file")

        # A unique name avoids clobbering another upload that has the same
        # basename; the extension is preserved on the copy.
        source = Path(file_path)
        with tempfile.NamedTemporaryFile(
            mode='wb',
            delete=False,
            prefix=f"{source.stem}_",
            suffix=source.suffix,
        ) as temp_file:
            temp_file.write(content)
            temp_filename = temp_file.name

        print(f"[DEBUG] File saved successfully at: {temp_filename}")
        return temp_filename
    except Exception as e:
        print(f"[DEBUG] Error in save_uploaded_file: {str(e)}")
        return f"Error saving file: {str(e)}"
async def summarize_text(text):
    """Summarize the input text using Gemini AI.

    Args:
        text: The text to summarize.

    Returns:
        The summary text from the model response, or a message starting
        with "Error" when ``text`` is empty or the request fails.
    """
    # Nothing to summarize: skip the API call entirely.
    if not text or not text.strip():
        return "Error: No text to summarize"

    prompt = f"""Please provide a concise summary of the following text. Focus on the main points and key takeaways:
{text}
Summary:"""
    try:
        # generate_content is the synchronous client call; running it in the
        # default executor keeps this coroutine from blocking the event loop
        # while the request is in flight.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, model.generate_content, prompt)
        return response.text
    except Exception as e:
        return f"Error generating summary: {str(e)}"
async def process_input(input_text, uploaded_file=None):
    """Summarize an uploaded file, a URL, or pasted text.

    An uploaded file takes precedence over ``input_text``. Files and URLs
    are first converted to text with ``process_with_markitdown``; anything
    else in ``input_text`` is summarized as-is.

    Args:
        input_text: A URL (``http://`` or ``https://``) or raw text. May be
            empty or ``None`` when a file is uploaded.
        uploaded_file: Optional upload passed to ``save_uploaded_file``.

    Returns:
        The summary, or a message starting with "Error".
    """
    print("[DEBUG] Starting process_input")
    try:
        if uploaded_file is not None:
            # Handle file upload
            temp_path = save_uploaded_file(uploaded_file)
            # save_uploaded_file reports failures as a message instead of
            # raising; anything that is not an existing file is a message.
            if not os.path.isfile(temp_path):
                return temp_path
            try:
                text = process_with_markitdown(temp_path)
            finally:
                # Best-effort cleanup: a leftover temp file is not fatal.
                try:
                    os.remove(temp_path)
                except OSError as e:
                    print(f"[DEBUG] Could not remove temporary file {temp_path}: {str(e)}")
            if text.startswith('Error'):
                return text
        else:
            cleaned = (input_text or "").strip()
            if not cleaned:
                return "Error: Please enter a URL, paste text, or upload a file."
            if cleaned.startswith(('http://', 'https://')):
                # Handle URL
                text = process_with_markitdown(cleaned)
                if text.startswith('Error'):
                    return text
            else:
                # Direct text is never screened for an "Error" prefix:
                # a user's own text may legitimately start with that word.
                text = cleaned

        # Generate summary using Gemini AI
        return await summarize_text(text)
    except Exception as e:
        return f"Error processing input: {str(e)}"
def clear_inputs():
    """Return the reset values for the three components wired to Clear.

    The list is ordered to match the Clear button's outputs: the URL/text
    box, the file upload, and the summary box.
    """
    blank_text, no_file, blank_summary = "", None, ""
    return [blank_text, no_file, blank_summary]
# Create Gradio interface with drag-and-drop
# NOTE(review): the nesting below was reconstructed from flattened source;
# confirm that the summary box belongs outside the button row.
_INTRO_MARKDOWN = """
# Summarizeit
> Summarize any document! Using Gemini 2.0 Flash model.
Enter a URL, paste text, or drag & drop a file to get a summary.
"""

# Extensions the upload widget accepts
_ACCEPTED_FILE_TYPES = [
    ".pdf", ".docx", ".xlsx", ".csv", ".txt",
    ".html", ".htm", ".xml", ".json",
]

# Sample URLs offered beneath the form
_EXAMPLE_INPUTS = [
    ["https://h3manth.com"],
    ["https://www.youtube.com/watch?v=bSHp7WVpPgc"],
    ["https://en.wikipedia.org/wiki/Three-body_problem"],
]

with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown(_INTRO_MARKDOWN)

    # Inputs: free-form URL/text box, then the file drop zone
    with gr.Row():
        input_text = gr.Textbox(
            label="Enter URL or text",
            placeholder="Enter a URL or paste text here...",
            scale=2,
        )

    with gr.Row():
        file_upload = gr.File(
            label="Drop files here or click to upload",
            file_types=_ACCEPTED_FILE_TYPES,
            file_count="single",
            scale=2,
        )

    # Actions
    with gr.Row():
        submit_btn = gr.Button("Summarize", variant="primary")
        clear_btn = gr.Button("Clear")

    # Output
    output_text = gr.Textbox(label="Summary", lines=10, show_copy_button=True)

    # Set up event handlers
    submit_btn.click(
        fn=process_input,
        inputs=[input_text, file_upload],
        outputs=output_text,
        api_name="process",
    )
    clear_btn.click(
        fn=clear_inputs,
        outputs=[input_text, file_upload, output_text],
    )

    # Add examples
    gr.Examples(examples=_EXAMPLE_INPUTS, inputs=input_text)
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    # The lone positional argument is spelled out by name; `inline` is the
    # first parameter of Blocks.launch.
    iface.launch(inline=True)