# PDF Bookmarker & Splitter — Streamlit app.
import io
import os
import re
import shutil
import subprocess
import sys
import tempfile
import zipfile

import fitz  # PyMuPDF
import pandas as pd
import streamlit as st
import toml

# Ensure we can import from utils if needed
sys.path.append(os.path.dirname(__file__))
from utils import toc_processor
from pdfxmeta import pdfxmeta
# Page chrome: title, intro blurb, uploader, and usage guidance.
st.set_page_config(page_title="PDF Bookmark Splitter", layout="wide")
st.title("PDF Bookmarker & Splitter")

st.markdown("""
**Upload a PDF**, analyze its fonts to find top-level headings, and generate Bookmarks for splitting by chapter.
""")

uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")

# Short guidance shown under the uploader
st.caption("Recommended use: After uploading your PDF, search for the text of a known chapter heading. Once the correct entry is identified in the search results, select the corresponding entry from the drop down, and optionally repeat the step to ensure back matter is split off from the last chapter before running the pipeline.")
if uploaded_file is not None:
    # Persist the upload to disk so the external CLI tools (pdftocgen /
    # pdftocio) can read it by path. delete=False keeps the file alive across
    # Streamlit reruns. NOTE(review): it is never unlinked, so one temp file
    # leaks per upload — acceptable on ephemeral hosting, confirm for
    # long-lived deployments.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
        tmp_pdf.write(uploaded_file.getvalue())
        input_pdf_path = tmp_pdf.name

    # --- State Management & Reset ---
    # name+size is a robust proxy for file identity; a different upload must
    # clear every piece of state derived from the previous file.
    file_id = f"{uploaded_file.name}_{uploaded_file.size}"
    if 'current_file_id' not in st.session_state:
        st.session_state['current_file_id'] = None
    if st.session_state['current_file_id'] != file_id:
        # NEW FILE DETECTED: reset pipeline state
        keys_to_reset = ['final_pdf_bytes', 'final_zip_bytes', 'final_zip_name',
                         'search_matches', 'font_name', 'font_size']
        for k in keys_to_reset:
            if k in st.session_state:
                del st.session_state[k]
        st.session_state['current_file_id'] = file_id
    st.success(f"Loaded: {uploaded_file.name}")

    # --- Data Source Selection ---
    st.header("1. Source Selection")
    source_mode = st.radio("Where should the bookmarks come from?",
                           ["Scan & Generate (Create New)", "Use Existing Bookmarks (Modify)"],
                           help="Choose 'Scan & Generate' to build new bookmarks from fonts. Choose 'Use Existing' to tidy up bookmarks already in the file.")

    # --- Analysis Section (Only for Generate) ---
    if source_mode == "Scan & Generate (Create New)":
        st.header("2. Analyze Fonts")
        if 'font_name' not in st.session_state:
            st.session_state['font_name'] = ''
        if 'font_size' not in st.session_state:
            st.session_state['font_size'] = 18.0

        tab1, tab2 = st.tabs(["Scan for Large Fonts", "Search by Text"])

        with tab1:
            if st.button("Find Header Candidates"):
                with st.spinner("Scanning PDF for large fonts..."):
                    doc = fitz.open(input_pdf_path)
                    candidates = []
                    # Bounded scan (first 50 pages) keeps the UI responsive on
                    # huge PDFs; headings recur, so a prefix is representative.
                    # Explicit index loop: fitz.Document does not support slice
                    # syntax (doc[:50]).
                    for page_index in range(min(50, doc.page_count)):
                        page = doc[page_index]
                        text_page = page.get_text("dict")
                        for block in text_page["blocks"]:
                            for line in block.get("lines", []):
                                for span in line["spans"]:
                                    text = span["text"].strip()
                                    # Skip very short spans (page numbers, noise).
                                    if len(text) > 3:
                                        candidates.append({
                                            "Text": text[:50],
                                            "Font": span["font"],
                                            "Size": round(span["size"], 2),
                                            "Page": page.number + 1
                                        })
                    doc.close()
                    if candidates:
                        df = pd.DataFrame(candidates)
                        # Aggregate by (font, size) and surface the largest,
                        # most frequent combinations first.
                        summary = df.groupby(['Font', 'Size']).size().reset_index(name='Count')
                        summary = summary.sort_values(by=['Size', 'Count'], ascending=[False, False]).head(20)
                        st.session_state['scan_results'] = summary
                    else:
                        st.warning("No text found.")

            if 'scan_results' in st.session_state:
                st.write("### Top Large Fonts Found")
                st.dataframe(st.session_state['scan_results'], use_container_width=True)

                def update_from_scan():
                    # Parse "FontName (12.5pt)" back into the recipe fields.
                    val = st.session_state.scan_selector
                    if val:
                        f_name = val.split(" (")[0]
                        f_size = float(val.split("(")[1].replace("pt)", ""))
                        st.session_state['font_name'] = f_name
                        st.session_state['font_size'] = f_size

                options = st.session_state['scan_results'].apply(lambda x: f"{x['Font']} ({x['Size']}pt)", axis=1)
                st.selectbox("Select extraction font:", options, key='scan_selector', on_change=update_from_scan, index=None, placeholder="Choose a font...")

        with tab2:
            search_query = st.text_input("Enter text to find (e.g., 'Chapter 1')", "")
            c1, c2 = st.columns([1, 3])
            with c1:
                do_search = st.button("Search Text")
            with c2:
                is_case_sensitive = st.checkbox("Case Sensitive", value=False)

            if do_search:
                if not search_query.strip():
                    # Guard: an empty pattern would match every span in the PDF.
                    st.warning("Please enter some text to search for.")
                else:
                    with st.spinner(f"Searching for '{search_query}'..."):
                        # Use the robust pdfxmeta library
                        try:
                            doc = fitz.open(input_pdf_path)
                            # pdfxmeta expects a regex pattern, so we escape the
                            # query to be safe (re is imported at module level).
                            safe_pattern = re.escape(search_query)
                            # extract_meta returns a list of dicts (spans)
                            results = pdfxmeta.extract_meta(doc, safe_pattern, ign_case=(not is_case_sensitive))
                            doc.close()
                            matches = []
                            for res in results:
                                matches.append({
                                    "Text": res.get("text", "").strip(),
                                    "Font": res.get("font", ""),
                                    "Size": round(res.get("size", 0), 2),
                                    "Page": res.get("page_index", 0)
                                })
                                # Limit for display safety
                                if len(matches) > 50:
                                    break
                            if matches:
                                st.session_state['search_matches'] = pd.DataFrame(matches)
                            else:
                                st.warning("No matches found.")
                        except Exception as e:
                            st.error(f"Search failed: {e}")

            if 'search_matches' in st.session_state:
                st.write("### Found Matches")
                st.dataframe(st.session_state['search_matches'], use_container_width=True)

                def update_from_search():
                    # Parse "FontName (12.5pt) - Pg N" back into recipe fields.
                    val = st.session_state.search_selector
                    if val:
                        parts = val.split(" (")
                        f_name = parts[0]
                        f_size = float(parts[1].split("pt)")[0])
                        st.session_state['font_name'] = f_name
                        st.session_state['font_size'] = f_size

                options = st.session_state['search_matches'].apply(lambda x: f"{x['Font']} ({x['Size']}pt) - Pg {x['Page']}", axis=1)
                st.selectbox("Select font from match:", options, key='search_selector', on_change=update_from_search, index=None, placeholder="Choose a match...")

        # --- Configuration (Only for Generate) ---
        st.header("3. Configure Recipe")
        col1, col2 = st.columns(2)
        with col1:
            # Widgets write directly to session_state via key=; return values
            # are not needed.
            st.text_input("Font Name", key='font_name')
        with col2:
            st.number_input("Font Size", key='font_size')
        greedy = st.checkbox("Greedy Match (Merge multiline specs)", value=True)

        # --- Back Matter Configuration ---
        with st.expander("Back Matter Configuration (Optional)", expanded=False):
            st.markdown("Identify where the **Back Matter** (Index, Glossary, etc.) starts to split it into a separate `999_Back_matter.pdf`.")
            # Independent search used only to locate the back-matter start page.
            bm_query = st.text_input("Find Back Matter start (e.g., 'Index')", key="bm_search_query")
            c_bm1, c_bm2 = st.columns([1, 3])
            with c_bm1:
                do_bm_search = st.button("Search Back Matter")
            with c_bm2:
                bm_case_sensitive = st.checkbox("Case Sensitive", key="bm_sens", value=False)

            if do_bm_search:
                if not bm_query.strip():
                    # Guard: an empty pattern would match every span in the PDF.
                    st.warning("Please enter some text to search for.")
                else:
                    with st.spinner("Searching..."):
                        try:
                            doc = fitz.open(input_pdf_path)
                            safe_pattern = re.escape(bm_query)
                            results = pdfxmeta.extract_meta(doc, safe_pattern, ign_case=(not bm_case_sensitive))
                            doc.close()
                            bm_matches = []
                            for res in results:
                                bm_matches.append({
                                    "Text": res.get("text", "").strip(),
                                    # Display raw; presumably already 1-based
                                    # from pdfxmeta — TODO confirm.
                                    "Page": res.get("page_index", 0)
                                })
                                if len(bm_matches) > 50:
                                    break
                            if bm_matches:
                                st.session_state['bm_matches'] = pd.DataFrame(bm_matches)
                            else:
                                st.warning("No matches found.")
                        except Exception as e:
                            st.error(f"Search failed: {e}")

            if 'bm_matches' in st.session_state:
                st.dataframe(st.session_state['bm_matches'], use_container_width=True)

                def update_bm_page():
                    val = st.session_state.bm_selector
                    if val:
                        # Value format: "Page X - Text..."
                        page_num = int(val.split(" -")[0].replace("Page ", ""))
                        st.session_state['back_matter_page'] = page_num

                bm_options = st.session_state['bm_matches'].apply(lambda x: f"Page {x['Page']} - {x['Text'][:30]}...", axis=1)
                st.selectbox("Select Start Page:", bm_options, key='bm_selector', on_change=update_bm_page, index=None, placeholder="Select start page...")

            # Manual override: keep 'back_matter_page' in sync with the widget.
            def update_manual_bm():
                st.session_state['back_matter_page'] = st.session_state.back_matter_page_manual

            st.number_input("Or manually set Start Page:", min_value=0, value=st.session_state.get('back_matter_page', 0), key='back_matter_page_manual', on_change=update_manual_bm)
    else:
        # Existing Mode
        st.info("Using existing bookmarks. They will be cleaned, numbered, and used for splitting/downloading.")

    # --- Generation ---
    st.header("4. Process & Generate")
    if st.button("Run Pipeline"):
        # Validate inputs if generating
        if source_mode == "Scan & Generate (Create New)" and not st.session_state.get('font_name'):
            st.error("Please specify a font name for extraction.")
        else:
            with st.status("Running pipeline tasks...", expanded=True) as status:
                # Use a temporary directory for all intermediate files
                with tempfile.TemporaryDirectory() as temp_dir:
                    status.write(f"Created temp workspace: {temp_dir}")
                    recipe_path = os.path.join(temp_dir, "recipe.toml")
                    clean_toc_path = os.path.join(temp_dir, "clean.toc")  # cleaned TOC fed to pdftocio
                    output_pdf_path = os.path.join(temp_dir, "final.pdf")
                    raw_toc_content = ""

                    if source_mode == "Scan & Generate (Create New)":
                        # 1. Create recipe describing the heading font for pdftocgen
                        recipe_data = {
                            "heading": [{
                                "level": 1,
                                "greedy": greedy,
                                "font": {
                                    "name": st.session_state['font_name'],
                                    "size": st.session_state['font_size'],
                                    "size_tolerance": 0.1
                                }
                            }]
                        }
                        with open(recipe_path, "w") as f:
                            toml.dump(recipe_data, f)
                        status.write("✓ Recipe created")

                        # 2. Run pdftocgen; the raw TOC arrives on stdout.
                        status.write("Running pdftocgen (Scanning)...")
                        cmd1 = [sys.executable, "-m", "pdftocgen", "-r", recipe_path, input_pdf_path]
                        process = subprocess.run(cmd1, capture_output=True, text=True, encoding='utf-8')
                        if process.returncode != 0:
                            st.error(f"pdftocgen failed: {process.stderr}")
                            st.stop()
                        raw_toc_content = process.stdout
                        status.write("✓ Headers extracted")
                    else:
                        # Existing bookmarks: pdftocio without -t dumps the TOC.
                        status.write("Extracting existing bookmarks...")
                        cmd1 = [sys.executable, "-m", "pdftocio", input_pdf_path]
                        process = subprocess.run(cmd1, capture_output=True, text=True, encoding='utf-8')
                        if process.returncode != 0:
                            st.error(f"pdftocio failed: {process.stderr}")
                            st.stop()
                        raw_toc_content = process.stdout
                        if not raw_toc_content.strip():
                            st.warning("No existing bookmarks found!")
                            st.stop()
                        status.write("✓ Existing bookmarks imported")

                    # 3. Clean content (centralized utility)
                    status.write("Cleaning and merging bookmarks...")
                    cleaned_toc_content = toc_processor.process_toc(raw_toc_content)
                    with open(clean_toc_path, "w", encoding='utf-8') as f:
                        f.write(cleaned_toc_content)
                    status.write("✓ Bookmarks formatted (Double-splits fixed)")

                    # 4. Write the cleaned TOC into the PDF
                    status.write("Writing to PDF...")
                    cmd3 = [sys.executable, "-m", "pdftocio", "-t", clean_toc_path, "-o", output_pdf_path, input_pdf_path]
                    # encoding kept consistent with the other subprocess calls
                    process = subprocess.run(cmd3, capture_output=True, text=True, encoding='utf-8')
                    if process.returncode != 0:
                        st.error(f"pdftocio failed: {process.stderr}")
                        st.stop()
                    status.write("✓ PDF saved")

                    # 5. Read result for the persistent download area
                    with open(output_pdf_path, "rb") as f:
                        st.session_state['final_pdf_bytes'] = f.read()

                    # 6. Split & zip — temp file avoids building the archive
                    # in memory; always cleaned up in the finally clause.
                    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip:
                        tmp_zip_path = tmp_zip.name
                    try:
                        # 0 is the widget default meaning "no back matter set"
                        bm_page = st.session_state.get('back_matter_page', 0)
                        if bm_page == 0:
                            bm_page = None
                        toc_processor.generate_chapter_splits(output_pdf_path, tmp_zip_path, back_matter_start_page=bm_page)
                        with open(tmp_zip_path, "rb") as f:
                            st.session_state['final_zip_bytes'] = f.read()
                        base_name = os.path.splitext(uploaded_file.name)[0]
                        st.session_state['final_zip_name'] = f"{base_name}_chapters.zip"
                    except Exception as e:
                        st.error(f"Error generating zip: {e}")
                    finally:
                        if os.path.exists(tmp_zip_path):
                            os.unlink(tmp_zip_path)
| # --- Persistent Download Area --- | |
| if 'final_pdf_bytes' in st.session_state: | |
| st.success("Pipeline completed successfully!") | |
| st.write("### Downloads") | |
| c_dl1, c_dl2 = st.columns(2) | |
| with c_dl1: | |
| st.download_button( | |
| label="Download Bookmarked PDF", | |
| data=st.session_state['final_pdf_bytes'], | |
| file_name="bookmarked_doc.pdf", | |
| mime="application/pdf", | |
| key="dl_pdf_btn" | |
| ) | |
| with c_dl2: | |
| if 'final_zip_bytes' in st.session_state: | |
| st.download_button( | |
| label=f"Download ZIP ({st.session_state['final_zip_name']})", | |
| data=st.session_state['final_zip_bytes'], | |
| file_name=st.session_state['final_zip_name'], | |
| mime="application/zip", | |
| key="dl_zip_btn" | |
| ) | |
# Footer with attribution/license, rendered as raw HTML.
st.markdown("---")
_FOOTER_HTML = """
<div style="text-align: center; color: #666; font-size: 0.8em;">
Based on <a href="https://github.com/Krasjet/pdf.tocgen" target="_blank">pdf.tocgen</a> by krasjet. <br>
Enhanced with UI, Chapter Splitting, and Metadata Search. Licensed under AGPL-3.0.
</div>
"""
st.markdown(_FOOTER_HTML, unsafe_allow_html=True)