Spaces:
Runtime error
Runtime error
import os | |
import random | |
import shutil | |
import tempfile | |
import zipfile | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from datetime import datetime | |
import fitz # PyMuPDF | |
import gradio as gr | |
from huggingface_hub import DatasetCard, DatasetCardData, HfApi | |
from dataset_card_template import DATASET_CARD_TEMPLATE | |
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" | |
def process_pdf(pdf_file, sample_size, temp_dir, progress=gr.Progress()): | |
try: | |
pdf_path = pdf_file.name | |
doc = fitz.open(pdf_path) | |
total_pages = len(doc) | |
pages_to_convert = ( | |
total_pages if sample_size == 0 else min(sample_size, total_pages) | |
) | |
selected_pages = ( | |
sorted(random.sample(range(total_pages), pages_to_convert)) | |
if sample_size > 0 and sample_size < total_pages | |
else range(total_pages) | |
) | |
images = [] | |
for page_num in progress.tqdm( | |
selected_pages, desc=f"Converting {os.path.basename(pdf_path)}", unit="page" | |
): | |
page = doc[page_num] | |
pix = page.get_pixmap() | |
image_path = os.path.join( | |
temp_dir, f"{os.path.basename(pdf_path)}_page_{page_num+1}.png" | |
) | |
pix.save(image_path) | |
images.append(image_path) | |
doc.close() | |
return images, None | |
except Exception as e: | |
return [], f"Error processing {pdf_file.name}: {str(e)}" | |
def pdf_to_images(pdf_files, sample_size, temp_dir, progress=gr.Progress()): | |
if not os.path.exists(temp_dir): | |
os.makedirs(temp_dir) | |
progress(0, desc="Starting conversion") | |
all_images = [] | |
skipped_pdfs = [] | |
for i, pdf_file in enumerate( | |
progress.tqdm(pdf_files, desc="Converting PDFs", unit="PDF") | |
): | |
images, error = process_pdf(pdf_file, sample_size, temp_dir, progress) | |
if error: | |
skipped_pdfs.append(pdf_file.name) | |
gr.Info(error) | |
else: | |
all_images.extend(images) | |
message = f"Saved {len(all_images)} images to temporary directory" | |
if skipped_pdfs: | |
message += f"\nSkipped {len(skipped_pdfs)} PDFs due to errors: {', '.join(skipped_pdfs)}" | |
return all_images, message | |
def get_size_category(num_images): | |
if num_images < 1000: | |
return "n<1K" | |
elif num_images < 10000: | |
return "1K<n<10K" | |
elif num_images < 100000: | |
return "10K<n<100K" | |
elif num_images < 1000000: | |
return "100K<n<1M" | |
else: | |
return "n>1M" | |
def process_pdfs( | |
pdf_files, | |
sample_size, | |
hf_repo, | |
create_zip, | |
private_repo, | |
oauth_token: gr.OAuthToken | None, | |
progress=gr.Progress(), | |
): | |
if not pdf_files: | |
return ( | |
None, | |
None, | |
gr.Markdown( | |
"⚠️ No PDF files uploaded. Please upload at least one PDF file." | |
), | |
) | |
if oauth_token is None: | |
return ( | |
None, | |
None, | |
gr.Markdown( | |
"⚠️ Not logged in to Hugging Face. Please log in to upload to a Hugging Face dataset." | |
), | |
) | |
try: | |
temp_dir = tempfile.mkdtemp() | |
images_dir = os.path.join(temp_dir, "images") | |
os.makedirs(images_dir) | |
progress(0, desc="Starting PDF processing") | |
images, message = pdf_to_images(pdf_files, sample_size, images_dir) | |
zip_path = None | |
if create_zip: | |
# Create a zip file of the images | |
zip_path = os.path.join(temp_dir, "converted_images.zip") | |
with zipfile.ZipFile(zip_path, "w") as zipf: | |
progress(0, desc="Zipping images") | |
for image in progress.tqdm(images, desc="Zipping images"): | |
zipf.write(image, os.path.basename(image)) | |
message += f"\nCreated zip file with {len(images)} images" | |
if hf_repo: | |
try: | |
hf_api = HfApi(token=oauth_token.token) | |
hf_api.create_repo( | |
hf_repo, | |
repo_type="dataset", | |
private=private_repo, | |
) | |
hf_api.upload_large_folder( | |
folder_path=temp_dir, | |
repo_id=hf_repo, | |
repo_type="dataset", | |
# path_in_repo="images", | |
) | |
# Determine size category | |
size_category = get_size_category(len(images)) | |
# Create DatasetCardData instance | |
card_data = DatasetCardData( | |
tags=["created-with-pdfs-to-page-images-converter", "pdf-to-image"], | |
size_categories=[size_category], | |
) | |
# Create and populate the dataset card | |
card = DatasetCard.from_template( | |
card_data, | |
template_path=None, # Use default template | |
hf_repo=hf_repo, | |
num_images=len(images), | |
num_pdfs=len(pdf_files), | |
sample_size=sample_size if sample_size > 0 else "All pages", | |
creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
) | |
# Add our custom content to the card | |
card.text = DATASET_CARD_TEMPLATE.format( | |
hf_repo=hf_repo, | |
num_images=len(images), | |
num_pdfs=len(pdf_files), | |
sample_size=sample_size if sample_size > 0 else "All pages", | |
creation_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
size_category=size_category, | |
) | |
repo_url = f"https://huggingface.co/datasets/{hf_repo}" | |
message += f"\nUploaded dataset card to Hugging Face repo: [{hf_repo}]({repo_url})" | |
card.push_to_hub(hf_repo, token=oauth_token.token) | |
except Exception as e: | |
message += f"\nFailed to upload to Hugging Face: {str(e)}" | |
return images, zip_path, message | |
except Exception as e: | |
if "temp_dir" in locals(): | |
shutil.rmtree(temp_dir) | |
return None, None, f"An error occurred: {str(e)}" | |
# Define the Gradio interface | |
with gr.Blocks() as demo: | |
gr.HTML( | |
"""<h1 style='text-align: center;'> PDFs to Page Images Converter</h1> | |
<center><i> 📁 Convert PDFs to an image dataset, splitting pages into individual images 📁 </i></center>""" | |
) | |
gr.Markdown( | |
""" | |
This app allows you to: | |
1. Upload one or more PDF files | |
2. Convert each page of the PDFs into separate image files | |
3. (Optionally) sample a specific number of pages from each PDF | |
4. (Optionally) Create a downloadable ZIP file of the converted images | |
5. (Optionally) Upload the images to a Hugging Face dataset repository | |
""" | |
) | |
with gr.Row(): | |
gr.LoginButton(size="sm") | |
with gr.Row(): | |
pdf_files = gr.File( | |
file_count="multiple", label="Upload PDF(s)", file_types=["*.pdf"] | |
) | |
with gr.Row(): | |
sample_size = gr.Number( | |
value=None, | |
label="Pages per PDF (0 for all pages)", | |
info="Specify how many pages to convert from each PDF. Use 0 to convert all pages.", | |
) | |
hf_repo = gr.Textbox( | |
label="Hugging Face Repo", | |
placeholder="username/repo-name", | |
info="Enter the Hugging Face repository name in the format 'username/repo-name'", | |
) | |
with gr.Row(): | |
create_zip = gr.Checkbox(label="Create ZIP file of images?", value=False) | |
private_repo = gr.Checkbox(label="Make repository private?", value=False) | |
with gr.Accordion("View converted images", open=False): | |
output_gallery = gr.Gallery(label="Converted Images") | |
status_text = gr.Markdown(label="Status") | |
download_button = gr.File(label="Download Converted Images") | |
submit_button = gr.Button("Convert PDFs to page images") | |
submit_button.click( | |
process_pdfs, | |
inputs=[pdf_files, sample_size, hf_repo, create_zip, private_repo], | |
outputs=[output_gallery, download_button, status_text], | |
) | |
# Launch the app | |
demo.launch(debug=True) | |