import os
import tempfile
import zipfile
import io
import shutil

import streamlit as st

from utils.pdf_processing import (
    split_pdf_pages, merge_pdfs, extract_page_range, remove_first_last_pages,
    extract_text_from_pdf, keyword_highlight_pdf, extract_images, extract_tables,
    ocr_pdf, reorder_pages, rotate_pages, add_watermark, extract_metadata
)
from utils.pdf_analysis import rag_qa, summarize_text
from utils.pdf_export import export_to_word, export_to_text, export_text_to_markdown
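# NOTE: the utils.* modules are project-local, not PyPI packages. Their return
# conventions below are inferred from how the results are used in this file
# (not verified here): merge/split/extract-range/remove return BytesIO;
# highlight/reorder/rotate/watermark and the export_* helpers return the output
# path they were given; extract_text_from_pdf/ocr_pdf return text;
# extract_metadata returns a dict; extract_tables a list; extract_images a folder.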

st.set_page_config(page_title="PDF Toolbox", layout="wide")
st.title("📄 PDF Toolbox")

with st.sidebar:
    st.header("Tools")

    # Define tool categories
    tool_categories = {
        "PDF Processing": [
            "Split PDF Pages",
            "Merge PDFs",
            "Extract Page Range",
            "Remove First/Last Pages",
        ],
        "Advanced Processing": [
            "Keyword Search & Highlight",
            "Extract Images",
            "Extract Tables",
            "OCR Scanned PDF",
            "Reorder Pages",
            "Rotate Pages",
            "Add Watermark",
            "Extract Metadata",
        ],
        "Analysis": [
            "Summarize PDF",
            "Ask Questions on PDF (RAG)",
        ],
        "Export": [
            "Export to Word (.docx)",
            "Export to Text (.txt)",
            "Export to Markdown (.md)",
        ],
    }

    # Step 1: User selects category
    selected_category = st.selectbox(
        "Choose a Category",
        list(tool_categories.keys())
    )

    # Step 2: Show tools under that category
    tool = st.selectbox(
        "Choose a Tool",
        tool_categories[selected_category]
    )

    # st.caption("Note: For OCR, ensure Tesseract is installed on system path.")

# ------------- Helpers for downloads -------------
def download_bytes(label: str, data: bytes, file_name: str, mime: str):
    st.download_button(label, data, file_name=file_name, mime=mime)

def zip_folder_to_bytes(folder_path: str) -> bytes:
    mem = io.BytesIO()
    with zipfile.ZipFile(mem, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(folder_path):
            for f in files:
                full = os.path.join(root, f)
                arc = os.path.relpath(full, start=folder_path)
                zf.write(full, arcname=arc)
    mem.seek(0)
    return mem.read()
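# The ZIP above is assembled entirely in memory (BytesIO), so no temporary
# archive touches disk, and arcnames are made relative to folder_path so
# absolute host paths never end up inside the archive. Example usage
# (hypothetical folder name):
#   data = zip_folder_to_bytes("images_out")
#   st.download_button("Download images", data, "images.zip", "application/zip")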

OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def out_file(name: str) -> str:
    return os.path.join(OUTPUT_DIR, name)

# ------------- UI Logic -------------
# Most tools need a PDF file, except "Merge PDFs"
if tool == "Merge PDFs":
    uploaded_files = st.file_uploader("Upload PDFs to merge", type=["pdf"], accept_multiple_files=True)
    if uploaded_files:
        if st.button("Merge"):
            merged = merge_pdfs(uploaded_files)  # returns BytesIO
            download_bytes("📥 Download Merged PDF", merged.getvalue(), "merged.pdf", "application/pdf")
else:
    uploaded = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(uploaded.read())
            pdf_path = tmp.name
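        # delete=False keeps the temp file on disk after the `with` block exits,
        # so pdf_path stays readable for the tool branches below (the file is
        # not cleaned up automatically when the session ends).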
if tool == "Split PDF Pages": | |
# Read total pages for UI | |
from PyPDF2 import PdfReader as _Reader | |
total_pages = len(_Reader(pdf_path).pages) | |
option = st.radio("Select Option", ["All Pages", "Page Range", "Single Page"], horizontal=True) | |
if option == "All Pages": | |
start_page, end_page = 1, total_pages | |
elif option == "Page Range": | |
start_page = st.number_input("Start Page", 1, total_pages, 1) | |
end_page = st.number_input("End Page", start_page, total_pages, total_pages) | |
else: | |
start_page = st.number_input("Page Number", 1, total_pages, 1) | |
end_page = start_page | |
if st.button("Split & Download ZIP"): | |
zip_bytesio = split_pdf_pages(pdf_path, int(start_page), int(end_page)) | |
download_bytes("π₯ Download ZIP", zip_bytesio.getvalue(), "split_pages.zip", "application/zip") | |
elif tool == "Extract Page Range": | |
from PyPDF2 import PdfReader as _Reader | |
total_pages = len(_Reader(pdf_path).pages) | |
start_page = st.number_input("Start Page", 1, total_pages, 1) | |
end_page = st.number_input("End Page", start_page, total_pages, total_pages) | |
if st.button("Extract Range"): | |
out = extract_page_range(pdf_path, int(start_page), int(end_page)) | |
download_bytes("π₯ Download Extracted PDF", out.getvalue(), "extracted_range.pdf", "application/pdf") | |
elif tool == "Remove First/Last Pages": | |
remove_first = st.checkbox("Remove First Page", value=True) | |
remove_last = st.checkbox("Remove Last Page", value=False) | |
if st.button("Remove & Download"): | |
out = remove_first_last_pages(pdf_path, remove_first, remove_last) | |
download_bytes("π₯ Download Modified PDF", out.getvalue(), "modified.pdf", "application/pdf") | |
elif tool == "Keyword Search & Highlight": | |
keyword = st.text_input("Keyword to highlight", "") | |
if st.button("Search & Highlight") and keyword.strip(): | |
out_path = keyword_highlight_pdf(pdf_path, keyword.strip(), out_file("highlighted.pdf")) | |
with open(out_path, "rb") as f: | |
download_bytes("π₯ Download Highlighted PDF", f.read(), "highlighted.pdf", "application/pdf") | |
elif tool == "Extract Images": | |
folder = extract_images(pdf_path, output_folder="images_out") | |
# st.success(f"Extracted images β {folder}") | |
if isinstance(folder, list): | |
folder = folder[0] | |
if os.path.isdir(folder) and len(os.listdir(folder)) > 0: | |
zbytes = zip_folder_to_bytes(folder) | |
if st.download_button("π₯ Download Images (ZIP)", zbytes, "images.zip", "application/zip"): | |
shutil.rmtree(folder) | |
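            # st.download_button returns True on the rerun triggered by the click;
            # that's when the extracted-images folder is deleted, and the isdir
            # check below keeps the preview from failing on that rerun.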
            if os.path.isdir(folder):
                image_files = [os.path.join(folder, f) for f in sorted(os.listdir(folder))]
                # st.write("### Extracted Images Preview")
                cols = st.columns(3)  # grid with 3 columns
                for i, img in enumerate(image_files):
                    with cols[i % 3]:
                        st.image(img, caption=os.path.basename(img), use_container_width=True)

        elif tool == "Extract Tables":
            tables = extract_tables(pdf_path)
            if not tables:
                st.info("No tables detected.")
            else:
                try:
                    import pandas as pd
                except ImportError:
                    st.warning("Install pandas to view tables nicely.")
                    st.write(tables)
                else:
                    for i, t in enumerate(tables):
                        if hasattr(t, "to_csv"):  # Camelot DataFrame
                            df = t
                        else:  # pdfplumber list-of-rows
                            df = pd.DataFrame(t)
                        st.subheader(f"Table {i+1}")
                        st.dataframe(df)

        elif tool == "OCR Scanned PDF":
            # st.info("Requires Tesseract installed on your system.")
            lang = st.selectbox(
                "Select OCR language",
                ["eng", "hin", "fra", "deu", "jpn", "kor"],  # Add more as needed
                index=0
            )
            if st.button("Run OCR"):
                text = ocr_pdf(pdf_path, lang)
                st.text_area("OCR Output", text, height=300)
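            # OCR here depends on the Tesseract binary being on PATH plus the
            # traineddata for the selected language (see the commented-out hints
            # above); ocr_pdf is assumed to pass `lang` straight through.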
elif tool == "Reorder Pages": | |
st.caption("Enter comma-separated 0-indexed page order. Example for 3 pages: 2,0,1") | |
order_str = st.text_input("New order", "") | |
if st.button("Reorder") and order_str.strip(): | |
new_order = [int(x.strip()) for x in order_str.split(",") if x.strip().isdigit()] | |
out_path = reorder_pages(pdf_path, new_order, out_file("reordered.pdf")) | |
with open(out_path, "rb") as f: | |
download_bytes("π₯ Download Reordered PDF", f.read(), "reordered.pdf", "application/pdf") | |
elif tool == "Rotate Pages": | |
st.caption("Enter 0-indexed pages, comma-separated. Angle typically 90/180/270.") | |
pages_str = st.text_input("Pages to rotate", "") | |
angle = st.number_input("Angle", min_value=0, max_value=360, value=90, step=90) | |
if st.button("Rotate") and pages_str.strip(): | |
pages = [int(x.strip()) for x in pages_str.split(",") if x.strip().isdigit()] | |
out_path = rotate_pages(pdf_path, pages, int(angle), out_file("rotated.pdf")) | |
with open(out_path, "rb") as f: | |
download_bytes("π₯ Download Rotated PDF", f.read(), "rotated.pdf", "application/pdf") | |
elif tool == "Add Watermark": | |
wm = st.text_input("Watermark text", "CONFIDENTIAL") | |
if st.button("Apply Watermark"): | |
out_path = add_watermark(pdf_path, wm, out_file("watermarked.pdf")) | |
with open(out_path, "rb") as f: | |
download_bytes("π₯ Download Watermarked PDF", f.read(), "watermarked.pdf", "application/pdf") | |
elif tool == "Extract Metadata": | |
meta = extract_metadata(pdf_path) | |
st.json(meta) | |
elif tool == "Summarize PDF": | |
text = extract_text_from_pdf(pdf_path) | |
# st.info("Using Together.ai LLaMA for summarization. Set TOGETHER_API_KEY in your environment.") | |
if st.button("Summarize"): | |
with st.spinner("Summarizing... Please wait β³"): | |
summary = summarize_text(text) | |
st.write(summary) | |
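            # summarize_text is assumed to call a hosted LLM (the commented-out
            # note above mentions Together.ai and TOGETHER_API_KEY), so this
            # branch needs that API key set in the environment.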
elif tool == "Ask Questions on PDF (RAG)": | |
# st.info("Uses FAISS + MiniLM embeddings + Together.ai LLaMA. Set TOGETHER_API_KEY in your environment.") | |
question = st.text_input("Your question") | |
if st.button("Ask") and question.strip(): | |
text = extract_text_from_pdf(pdf_path) | |
with st.spinner("Analyzing... Please wait β³"): | |
answer, sources = rag_qa(text, question) | |
st.subheader("Answer") | |
st.write(answer) | |
if sources: | |
st.subheader("Top source chunks") | |
for i, s in enumerate(sources, start=1): | |
st.markdown(f"**Source {i}:**\n\n{getattr(s, 'page_content', '')[:800]}") | |
elif tool == "Export to Word (.docx)": | |
out = export_to_word(pdf_path, out_file("export.docx")) | |
with open(out, "rb") as f: | |
download_bytes("π₯ Download DOCX", f.read(), "export.docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document") | |
elif tool == "Export to Text (.txt)": | |
out = export_to_text(pdf_path, out_file("export.txt")) | |
with open(out, "rb") as f: | |
download_bytes("π₯ Download TXT", f.read(), "export.txt", "text/plain") | |
elif tool == "Export to Markdown (.md)": | |
text = extract_text_from_pdf(pdf_path) | |
out = export_text_to_markdown(text, out_file("export.md")) | |
with open(out, "rb") as f: | |
download_bytes("π₯ Download MD", f.read(), "export.md", "text/markdown") |