# Hosting-page residue preserved as a comment (not part of the program):
# adelevett's picture | "Upload app.py" | commit fbc3470 (verified)
import streamlit as st
import pandas as pd
import fitz # PyMuPDF
import os
import subprocess
import tempfile
import sys
import toml
import shutil
import zipfile
import io
# Ensure we can import from utils if needed
sys.path.append(os.path.dirname(__file__))
from utils import toc_processor
from pdfxmeta import pdfxmeta
# --- Page chrome and upload widget ---
# Configure the page first, then render the title, the intro blurb and the
# PDF uploader whose result drives everything that follows.
st.set_page_config(layout="wide", page_title="PDF Bookmark Splitter")
st.title("PDF Bookmarker & Splitter")

intro_markdown = """
**Upload a PDF**, analyze its fonts to find top-level headings, and generate Bookmarks for splitting by chapter.
"""
st.markdown(intro_markdown)

uploaded_file = st.file_uploader(label="Choose a PDF file", type="pdf")

# Short usage guidance rendered directly beneath the uploader.
usage_hint = "Recommended use: After uploading your PDF, search for the text of a known chapter heading. Once the correct entry is identified in the search results, select the corresponding entry from the drop down, and optionally repeat the step to ensure back matter is split off from the last chapter before running the pipeline."
st.caption(usage_hint)
# =============================================================================
# Main workflow: runs only while a PDF is uploaded.
#
# NOTE(review): the reviewed copy of this script had lost its indentation, so
# the nesting below was reconstructed from the control flow (which names are
# used where, and which `else` pairs with which `if`). Confirm against the
# deployed app before merging.
# =============================================================================

MODE_GENERATE = "Scan & Generate (Create New)"
MODE_EXISTING = "Use Existing Bookmarks (Modify)"

SCAN_PAGE_LIMIT = 50      # font scan only samples this many leading pages
MAX_SEARCH_RESULTS = 50   # cap on rows collected from a text search

# Session keys that describe ONE uploaded file; all are dropped on a new upload.
PER_FILE_STATE_KEYS = (
    'final_pdf_bytes', 'final_zip_bytes', 'final_zip_name',
    'search_matches', 'font_name', 'font_size',
    # Previously left behind, so results from the old PDF leaked into the new one:
    'scan_results', 'bm_matches', 'back_matter_page',
)


def _forget(*keys):
    """Delete each of *keys* from ``st.session_state`` if it is present."""
    for key in keys:
        if key in st.session_state:
            del st.session_state[key]


def _parse_font_label(label):
    """Split a ``"<font> (<size>pt)..."`` option label into ``(font, size)``.

    The split is made on the LAST ``" ("`` so a font name that itself contains
    parentheses does not break the size parsing.
    """
    font_name, _, tail = label.rpartition(" (")
    return font_name, float(tail.split("pt)")[0])


def _apply_font_label(label):
    """Copy the font described by *label* into the recipe inputs (no-op if empty)."""
    if label:
        font_name, font_size = _parse_font_label(label)
        st.session_state['font_name'] = font_name
        st.session_state['font_size'] = font_size


def update_from_scan():
    """Selectbox callback: adopt the font picked from the scan summary."""
    _apply_font_label(st.session_state.scan_selector)


def update_from_search():
    """Selectbox callback: adopt the font picked from the text-search matches."""
    _apply_font_label(st.session_state.search_selector)


def update_bm_page():
    """Selectbox callback: store the back-matter start page from the chosen label."""
    val = st.session_state.bm_selector
    if val:
        # Label format: "Page X - Text..."
        st.session_state['back_matter_page'] = int(val.split(" -")[0].replace("Page ", ""))


def update_manual_bm():
    """Number-input callback: store the manually entered back-matter start page."""
    st.session_state['back_matter_page'] = st.session_state.back_matter_page_manual


def _collect_font_spans(pdf_path):
    """Return one row (Text/Font/Size/Page) per text span longer than 3 chars.

    Only the first ``SCAN_PAGE_LIMIT`` pages are read. Pages are loaded by
    index rather than by slicing the document, and the document is closed even
    if extraction raises.
    """
    rows = []
    with fitz.open(pdf_path) as doc:
        for page_no in range(min(doc.page_count, SCAN_PAGE_LIMIT)):
            page_dict = doc.load_page(page_no).get_text("dict")
            for block in page_dict["blocks"]:
                for line in block.get("lines", []):
                    for span in line["spans"]:
                        text = span["text"].strip()
                        if len(text) > 3:
                            rows.append({
                                "Text": text[:50],
                                "Font": span["font"],
                                "Size": round(span["size"], 2),
                                "Page": page_no + 1,
                            })
    return rows


def _search_spans(pdf_path, query, case_sensitive):
    """Return at most ``MAX_SEARCH_RESULTS`` pdfxmeta spans matching *query* literally.

    The query is regex-escaped because ``pdfxmeta.extract_meta`` is given a
    pattern. Results are consumed while the document is still open.
    """
    import re  # function-scope import, as in the original search code

    hits = []
    with fitz.open(pdf_path) as doc:
        for span in pdfxmeta.extract_meta(doc, re.escape(query), ign_case=(not case_sensitive)):
            hits.append(span)
            if len(hits) >= MAX_SEARCH_RESULTS:
                break
    return hits


def _run_cli(module_args, tool_name):
    """Run ``python -m <module_args>`` and return its stdout.

    On a non-zero exit the tool's stderr is shown and the script run is halted
    via ``st.stop()``. UTF-8 is used for every call so all pipeline steps
    decode output the same way.
    """
    proc = subprocess.run(
        [sys.executable, "-m", *module_args],
        capture_output=True, text=True, encoding='utf-8',
    )
    if proc.returncode != 0:
        st.error(f"{tool_name} failed: {proc.stderr}")
        st.stop()
    return proc.stdout


if uploaded_file is not None:
    # --- State Management & Reset ---
    # Name + size is used as a cheap identity for the upload.
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"
    is_new_file = st.session_state.get('current_file_id') != file_id
    if is_new_file:
        _forget(*PER_FILE_STATE_KEYS)
        st.session_state['current_file_id'] = file_id

    # The CLI tools need the PDF on disk. Streamlit reruns this script on every
    # interaction, so the temp copy is written once per upload and its path is
    # kept in session state instead of leaking a new file on every rerun.
    input_pdf_path = st.session_state.get('input_pdf_path')
    if is_new_file or not input_pdf_path or not os.path.exists(input_pdf_path):
        if input_pdf_path and os.path.exists(input_pdf_path):
            try:
                os.unlink(input_pdf_path)  # drop the previous upload's copy
            except OSError:
                pass  # best effort; a stale temp file must not block the new upload
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
            tmp_pdf.write(uploaded_file.getvalue())
        input_pdf_path = tmp_pdf.name
        st.session_state['input_pdf_path'] = input_pdf_path

    st.success(f"Loaded: {uploaded_file.name}")

    # --- Data Source Selection ---
    st.header("1. Source Selection")
    source_mode = st.radio(
        "Where should the bookmarks come from?",
        [MODE_GENERATE, MODE_EXISTING],
        help="Choose 'Scan & Generate' to build new bookmarks from fonts. Choose 'Use Existing' to tidy up bookmarks already in the file.",
    )

    if source_mode == MODE_GENERATE:
        # --- Analysis Section (Only for Generate) ---
        st.header("2. Analyze Fonts")
        # Seed the recipe inputs; the widgets in section 3 are bound to these keys.
        if 'font_name' not in st.session_state:
            st.session_state['font_name'] = ''
        if 'font_size' not in st.session_state:
            st.session_state['font_size'] = 18.0

        tab1, tab2 = st.tabs(["Scan for Large Fonts", "Search by Text"])

        with tab1:
            if st.button("Find Header Candidates"):
                with st.spinner("Scanning PDF for large fonts..."):
                    candidates = _collect_font_spans(input_pdf_path)
                if candidates:
                    df = pd.DataFrame(candidates)
                    summary = df.groupby(['Font', 'Size']).size().reset_index(name='Count')
                    # Largest sizes first; ties broken by how often the font occurs.
                    summary = summary.sort_values(by=['Size', 'Count'], ascending=[False, False]).head(20)
                    st.session_state['scan_results'] = summary
                else:
                    _forget('scan_results')  # do not keep showing an older table
                    st.warning("No text found.")

            if 'scan_results' in st.session_state:
                st.write("### Top Large Fonts Found")
                st.dataframe(st.session_state['scan_results'], use_container_width=True)
                options = st.session_state['scan_results'].apply(
                    lambda x: f"{x['Font']} ({x['Size']}pt)", axis=1)
                st.selectbox("Select extraction font:", options, key='scan_selector',
                             on_change=update_from_scan, index=None, placeholder="Choose a font...")

        with tab2:
            search_query = st.text_input("Enter text to find (e.g., 'Chapter 1')", "")
            c1, c2 = st.columns([1, 3])
            with c1:
                do_search = st.button("Search Text")
            with c2:
                is_case_sensitive = st.checkbox("Case Sensitive", value=False)

            if do_search:
                if not search_query.strip():
                    # An empty pattern would match every span in the document.
                    st.warning("Please enter text to search for.")
                else:
                    with st.spinner(f"Searching for '{search_query}'..."):
                        try:
                            matches = [
                                {
                                    "Text": res.get("text", "").strip(),
                                    "Font": res.get("font", ""),
                                    "Size": round(res.get("size", 0), 2),
                                    "Page": res.get("page_index", 0),
                                }
                                for res in _search_spans(input_pdf_path, search_query, is_case_sensitive)
                            ]
                            if matches:
                                st.session_state['search_matches'] = pd.DataFrame(matches)
                            else:
                                _forget('search_matches')
                                st.warning("No matches found.")
                        except Exception as e:
                            st.error(f"Search failed: {e}")

            if 'search_matches' in st.session_state:
                st.write("### Found Matches")
                st.dataframe(st.session_state['search_matches'], use_container_width=True)
                options = st.session_state['search_matches'].apply(
                    lambda x: f"{x['Font']} ({x['Size']}pt) - Pg {x['Page']}", axis=1)
                st.selectbox("Select font from match:", options, key='search_selector',
                             on_change=update_from_search, index=None, placeholder="Choose a match...")

        # --- Configuration (Only for Generate) ---
        st.header("3. Configure Recipe")
        col1, col2 = st.columns(2)
        with col1:
            font_name_input = st.text_input("Font Name", key='font_name')
        with col2:
            font_size_input = st.number_input("Font Size", key='font_size')
        greedy = st.checkbox("Greedy Match (Merge multiline specs)", value=True)

        # --- Back Matter Configuration ---
        with st.expander("Back Matter Configuration (Optional)", expanded=False):
            st.markdown("Identify where the **Back Matter** (Index, Glossary, etc.) starts to split it into a separate `999_Back_matter.pdf`.")

            # Independent search for the back-matter start.
            bm_query = st.text_input("Find Back Matter start (e.g., 'Index')", key="bm_search_query")
            c_bm1, c_bm2 = st.columns([1, 3])
            with c_bm1:
                do_bm_search = st.button("Search Back Matter")
            with c_bm2:
                bm_case_sensitive = st.checkbox("Case Sensitive", key="bm_sens", value=False)

            if do_bm_search:
                if not bm_query.strip():
                    st.warning("Please enter text to search for.")
                else:
                    with st.spinner("Searching..."):
                        try:
                            bm_matches = [
                                {
                                    "Text": res.get("text", "").strip(),
                                    # Shown as returned by pdfxmeta; the original comment
                                    # says this is already 1-based. TODO confirm.
                                    "Page": res.get("page_index", 0),
                                }
                                for res in _search_spans(input_pdf_path, bm_query, bm_case_sensitive)
                            ]
                            if bm_matches:
                                st.session_state['bm_matches'] = pd.DataFrame(bm_matches)
                            else:
                                _forget('bm_matches')
                                st.warning("No matches found.")
                        except Exception as e:
                            st.error(f"Search failed: {e}")

            if 'bm_matches' in st.session_state:
                st.dataframe(st.session_state['bm_matches'], use_container_width=True)
                bm_options = st.session_state['bm_matches'].apply(
                    lambda x: f"Page {x['Page']} - {x['Text'][:30]}...", axis=1)
                st.selectbox("Select Start Page:", bm_options, key='bm_selector',
                             on_change=update_bm_page, index=None, placeholder="Select start page...")

            # Manual override; 0 means "no back-matter split" (see the pipeline below).
            st.number_input("Or manually set Start Page:", min_value=0,
                            value=st.session_state.get('back_matter_page', 0),
                            key='back_matter_page_manual', on_change=update_manual_bm)
    else:
        # Existing Mode
        st.info("Using existing bookmarks. They will be cleaned, numbered, and used for splitting/downloading.")

    # --- Generation ---
    st.header("4. Process & Generate")
    if st.button("Run Pipeline"):
        # A font name is mandatory only when bookmarks are generated from fonts.
        if source_mode == MODE_GENERATE and not st.session_state.get('font_name'):
            st.error("Please specify a font name for extraction.")
        else:
            # Drop the previous run's outputs so a failed run cannot leave old
            # downloads on screen labelled as a success.
            _forget('final_pdf_bytes', 'final_zip_bytes', 'final_zip_name')

            with st.status("Running pipeline tasks...", expanded=True) as status:
                # All intermediate files live in a directory removed on exit.
                with tempfile.TemporaryDirectory() as temp_dir:
                    status.write(f"Created temp workspace: {temp_dir}")

                    recipe_path = os.path.join(temp_dir, "recipe.toml")
                    clean_toc_path = os.path.join(temp_dir, "clean.toc")   # cleaned ToC fed to pdftocio
                    output_pdf_path = os.path.join(temp_dir, "final.pdf")

                    if source_mode == MODE_GENERATE:
                        # 1. Create Recipe: a single level-1 heading rule for the chosen font.
                        recipe_data = {
                            "heading": [{
                                "level": 1,
                                "greedy": greedy,
                                "font": {
                                    "name": st.session_state['font_name'],
                                    "size": st.session_state['font_size'],
                                    "size_tolerance": 0.1,
                                },
                            }]
                        }
                        # Explicit UTF-8 so non-ASCII font names survive on any platform.
                        with open(recipe_path, "w", encoding='utf-8') as f:
                            toml.dump(recipe_data, f)
                        status.write("✅ Recipe created")

                        # 2. Run pdftocgen; its stdout is the raw ToC.
                        status.write("Running pdftocgen (Scanning)...")
                        raw_toc_content = _run_cli(["pdftocgen", "-r", recipe_path, input_pdf_path], "pdftocgen")
                        status.write("✅ Headers extracted")
                    else:
                        # Existing bookmarks: pdftocio without -t prints the current ToC.
                        status.write("Extracting existing bookmarks...")
                        raw_toc_content = _run_cli(["pdftocio", input_pdf_path], "pdftocio")
                        if not raw_toc_content.strip():
                            st.warning("No existing bookmarks found!")
                            st.stop()
                        status.write("✅ Existing bookmarks imported")

                    # 3. Clean Content (using the centralized utility)
                    status.write("Cleaning and merging bookmarks...")
                    cleaned_toc_content = toc_processor.process_toc(raw_toc_content)
                    with open(clean_toc_path, "w", encoding='utf-8') as f:
                        f.write(cleaned_toc_content)
                    status.write("✅ Bookmarks formatted (Double-splits fixed)")

                    # 4. Write PDF
                    status.write("Writing to PDF...")
                    _run_cli(["pdftocio", "-t", clean_toc_path, "-o", output_pdf_path, input_pdf_path], "pdftocio")
                    status.write("✅ PDF saved")

                    # 5. Read the result into session state so it outlives temp_dir.
                    with open(output_pdf_path, "rb") as f:
                        st.session_state['final_pdf_bytes'] = f.read()

                    # 6. Split & Zip. The archive is built in a temp file and then
                    #    loaded into session state for the download button.
                    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip:
                        tmp_zip_path = tmp_zip.name
                    try:
                        # 0 / unset means "no back-matter split".
                        bm_page = st.session_state.get('back_matter_page', 0) or None
                        toc_processor.generate_chapter_splits(
                            output_pdf_path, tmp_zip_path, back_matter_start_page=bm_page)
                        with open(tmp_zip_path, "rb") as f:
                            st.session_state['final_zip_bytes'] = f.read()
                        base_name = os.path.splitext(uploaded_file.name)[0]
                        st.session_state['final_zip_name'] = f"{base_name}_chapters.zip"
                    except Exception as e:
                        # The bookmarked PDF is still offered even if splitting fails.
                        st.error(f"Error generating zip: {e}")
                    finally:
                        if os.path.exists(tmp_zip_path):
                            os.unlink(tmp_zip_path)

    # --- Persistent Download Area ---
    # Rendered from session state so the buttons survive reruns.
    if 'final_pdf_bytes' in st.session_state:
        st.success("Pipeline completed successfully!")
        st.write("### Downloads")
        c_dl1, c_dl2 = st.columns(2)
        with c_dl1:
            st.download_button(
                label="Download Bookmarked PDF",
                data=st.session_state['final_pdf_bytes'],
                file_name="bookmarked_doc.pdf",
                mime="application/pdf",
                key="dl_pdf_btn",
            )
        with c_dl2:
            if 'final_zip_bytes' in st.session_state:
                st.download_button(
                    label=f"Download ZIP ({st.session_state['final_zip_name']})",
                    data=st.session_state['final_zip_bytes'],
                    file_name=st.session_state['final_zip_name'],
                    mime="application/zip",
                    key="dl_zip_btn",
                )
# --- Footer ---
# A horizontal rule followed by a centred attribution/licence line. The HTML
# is rendered as-is, which is why unsafe_allow_html is enabled.
footer_html = """
<div style="text-align: center; color: #666; font-size: 0.8em;">
Based on <a href="https://github.com/Krasjet/pdf.tocgen" target="_blank">pdf.tocgen</a> by krasjet. <br>
Enhanced with UI, Chapter Splitting, and Metadata Search. Licensed under AGPL-3.0.
</div>
"""
st.markdown("---")
st.markdown(footer_html, unsafe_allow_html=True)