def _ocr_pdf_to_text(pdf, image_dir: str) -> str:
    """
    Run OCR over every page of an opened PDF and return the concatenated text.

    Each page is rendered at 300 DPI, written to ``image_dir`` as a PNG, read back
    with OpenCV, converted to grayscale, binarized with Otsu's threshold and passed
    to Tesseract. The temporary PNG is deleted again once its text was extracted.

    Args:
        pdf: An opened ``pdfium.PdfDocument``.
        image_dir: Existing directory that receives the temporary page images.

    Returns:
        The text of all pages, each page followed by a newline.
    """
    page_texts = []
    for page_number in range(len(pdf)):
        page = pdf.get_page(page_number)
        pil_image = page.render(
            scale=300 / 72,  # PDF user space is 72 DPI; render at 300 DPI for OCR quality
            rotation=0,
            crop=(0, 0, 0, 0),
        ).to_pil()
        pil_image_path = os.path.join(image_dir, f"image_{page_number + 1}.png")
        pil_image.save(pil_image_path)
        try:
            img = cv2.imread(pil_image_path)
            # Convert image to grayscale
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # Apply threshold to convert to binary image
            threshold_img = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
            # Pass the image through pytesseract and collect the page text
            page_texts.append(pytesseract.image_to_string(threshold_img))
        finally:
            # Remove the image as it is no longer needed, even if OCR failed
            os.remove(pil_image_path)
    return "".join(text + "\n" for text in page_texts)


def _remove_image_directories(images_path: str) -> None:
    """Delete every sub-directory (and its contents) inside ``images_path``, if it exists."""
    if not os.path.isdir(images_path):
        return
    for item in os.listdir(images_path):
        item_path = os.path.join(images_path, item)
        if os.path.isdir(item_path):
            shutil.rmtree(item_path)


def update_pdf_documents() -> List[Document]:
    """
    Process the not yet processed PDFs in ``input_data/PDF/files`` and update the stored documents.

    Every unprocessed PDF is rendered page by page to images, which are transformed
    to text with Tesseract. For each PDF one ``Document`` is created that contains the
    text of all pages. If at least one new document was created, the complete document
    list and the list of new documents are stored. Afterwards the processed PDFs are
    renamed (``<name>_Tesseract_processed<ext>``) so that future calls skip them.

    Returns:
        The documents that were newly created by this call (empty if nothing was new).
    """
    processed_marker = "Tesseract_processed"
    all_documents_path = "./../input_data/PDF/documents/all_documents"
    new_documents_path = "./../input_data/PDF/documents/new_documents"
    # Single source of truth for the image folder; it is used for creation AND cleanup
    PDF_images_path = "./../input_data/PDF/PDF_Images"
    directory_path = "./../input_data/PDF/files"

    # List for the new documents only
    documents_PDF = []
    # List for all documents
    already_processed_documents = load_documents(all_documents_path)
    # (old_path, new_path) pairs; renaming is deferred until the documents are stored,
    # so a failure in between never marks a PDF as processed without saving its text
    pending_renames = []

    try:
        # Go through each PDF file in the directory (sorted for a deterministic order)
        for file in sorted(os.listdir(directory_path)):
            file_path = os.path.join(directory_path, file)
            file_stem, file_extension = os.path.splitext(file)
            # Skip sub-directories, non-PDF entries and PDFs that were already processed
            if (
                not os.path.isfile(file_path)
                or file_extension.lower() != ".pdf"
                or processed_marker in file
            ):
                continue

            # Create directory to store the page images; tolerate leftovers of a failed run
            image_dir = os.path.join(PDF_images_path, file)
            os.makedirs(image_dir, exist_ok=True)

            pdf = pdfium.PdfDocument(file_path)
            try:
                complete_text = _ocr_pdf_to_text(pdf, image_dir)
            finally:
                # Always release the file handle; it also has to be closed before renaming
                pdf.close()

            # Create a document based on the whole text and metadata
            document_PDF = Document(
                page_content=complete_text,
                metadata={"source": file, "title": file_stem},
            )
            documents_PDF.append(document_PDF)
            already_processed_documents.append(document_PDF)

            # Only touch the real extension, not other ".pdf" occurrences in the name
            new_filename = f"{file_stem}_{processed_marker}{file_extension}"
            pending_renames.append((file_path, os.path.join(directory_path, new_filename)))

        # Store docs if new documents were processed
        if len(documents_PDF) > 0:
            # Store all documents, including the new ones
            store_documents(already_processed_documents, all_documents_path)
            # Store the new documents
            store_documents(documents_PDF, new_documents_path)

        # Change the filenames, so that in future calls the PDFs are not processed again
        for old_pdf_path, new_pdf_path in pending_renames:
            print(new_pdf_path)
            os.rename(old_pdf_path, new_pdf_path)
    finally:
        # Delete the (now empty) per-PDF folders inside the images folder
        _remove_image_directories(PDF_images_path)

    return documents_PDF