import os
import re
import time

import docx
import pytesseract
import requests
from nltk.tokenize import sent_tokenize, word_tokenize
from pdf2image import convert_from_path
from PyPDF2 import PdfReader


class OCRService:
    """Extracts, cleans, and chunks text from PDF, Word, and plain-text documents."""

    def __init__(self, LLAMAPARSE_API_KEY):
        self.llama_parse_key = LLAMAPARSE_API_KEY

    def extract_ocrless_pdf(self, filepath):
        """Extract embedded text from a PDF that already has a text layer (no OCR)."""
        reader = PdfReader(filepath)
        extracted_text = ""
        for page in reader.pages:
            text = page.extract_text()
            if text:
                extracted_text += " " + text
        return extracted_text

    def extract_text_from_pdf(self, filepath):
        """Rasterize each PDF page and run Tesseract OCR on it; returns a list of page texts."""
        images = convert_from_path(filepath, thread_count=4)
        full_text = []
        for image in images:
            text = pytesseract.image_to_string(image)
            full_text.append(text)
        return full_text

    def extract_text_from_document(self, filepath):
        """Dispatch on file extension and return the document's text as a single string."""
        file_ext = os.path.splitext(filepath)[-1].lower()
        if file_ext == ".pdf":
            text_to_process = self.extract_text_from_pdf(filepath)
            text_joined = " ".join(text_to_process)
        elif file_ext in (".doc", ".docx"):
            doc_content = docx.Document(filepath)
            text_to_process = [paragraph.text for paragraph in doc_content.paragraphs]
            text_joined = " \n ".join(text_to_process)
        elif file_ext == ".txt":
            with open(filepath, encoding="utf8") as file:
                text_joined = file.read()
        else:
            raise ValueError(f"Unsupported file type: {file_ext}")
        return text_joined

    def preprocess_document(self, document):
        """Collapse repeated newlines and normalize curly/escaped quotes."""
        document = re.sub(r"\n+", "\n", document)
        document = re.sub("“", "\"", document)
        document = re.sub("”", "\"", document)
        document = re.sub(r"\\\"", "\"", document)
        return document

    def chunk_document(self, text, k=1500):
        """Split text into sentence-aligned chunks of at most k words each."""
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_word_count = 0
        for sentence in sentences:
            sentence_words = word_tokenize(sentence)
            if current_word_count + len(sentence_words) <= k:
                current_chunk.append(sentence)
                current_word_count += len(sentence_words)
            else:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_word_count = len(sentence_words)
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        # Drop chunks that are too short to be meaningful.
        chunks = [chunk for chunk in chunks if len(chunk.split()) >= 2]
        return chunks

    def llama_parse_ocr(self, file_path):
        """Upload a PDF to the LlamaParse API and poll until the markdown result is ready."""
        llamaparse_url = "https://api.cloud.llamaindex.ai/api/parsing/upload"
        headers = {
            "accept": "application/json",
            "Authorization": f"Bearer {self.llama_parse_key}",
        }
        with open(file_path, "rb") as file_handle:
            files = {"file": (file_path, file_handle, "application/pdf")}
            response = requests.post(llamaparse_url, headers=headers, files=files)
        response.raise_for_status()
        job_id = response.json()["id"]

        result_type = "markdown"
        llamaparse_result_url = (
            f"https://api.cloud.llamaindex.ai/api/parsing/job/{job_id}/result/{result_type}"
        )
        # Poll until the parsing job finishes and the result endpoint returns 200.
        while True:
            response = requests.get(llamaparse_result_url, headers=headers)
            if response.status_code == 200:
                break
            time.sleep(2)
        return response.json()["markdown"]
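

# --- Usage sketch (illustrative only) ---
# A minimal example of the extract -> preprocess -> chunk pipeline. The file name
# "sample.pdf" and the LLAMAPARSE_API_KEY environment variable are hypothetical
# placeholders, not part of the original module.
if __name__ == "__main__":
    service = OCRService(os.environ.get("LLAMAPARSE_API_KEY", ""))
    raw_text = service.extract_text_from_document("sample.pdf")
    cleaned = service.preprocess_document(raw_text)
    chunks = service.chunk_document(cleaned, k=1500)
    print(f"Extracted {len(chunks)} chunk(s) from sample.pdf")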