import os
import re
import time

import requests
import docx
import pytesseract
from nltk.tokenize import sent_tokenize, word_tokenize
from PyPDF2 import PdfReader
from pdf2image import convert_from_path
class OCRService:
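    """Extract, clean, and chunk text from PDF, Word, and plain-text files.

    Local extraction uses PyPDF2 and Tesseract OCR; llama_parse_ocr instead
    sends the file to the LlamaParse cloud API.
    """
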
    def __init__(self, llama_parse_api_key):
        # Store the LlamaParse API key used by llama_parse_ocr.
        self.llama_parse_key = llama_parse_api_key
    def extract_ocrless_pdf(self, filepath):
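        """Extract embedded text from a PDF with PyPDF2, without OCR."""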
        reader = PdfReader(filepath)
        extracted_text = ""
        for page in reader.pages:
            # extract_text() returns None for image-only pages; guard so
            # the concatenation below never fails.
            text = page.extract_text() or ""
            extracted_text += " " + text
        return extracted_text
    def extract_text_from_pdf(self, filepath):
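        """OCR a PDF by rasterizing its pages and running Tesseract.

        Returns a list with one extracted-text string per page.
        """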
        images = convert_from_path(filepath, thread_count=4)
        full_text = []
        #config = (r"--oem 2 --psm 7")
        for image in images:
            text = pytesseract.image_to_string(image)
            #text = pytesseract.image_to_string(image, config=config)
            full_text.append(text)
        return full_text
    def extract_text_from_document(self, filepath):
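        """Dispatch extraction by file extension and return a single string."""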
        file_ext = os.path.splitext(filepath)[-1].lower()
        if file_ext == ".pdf":
            text_to_process = self.extract_text_from_pdf(filepath)
            text_joined = " ".join(text_to_process)
        elif file_ext in (".doc", ".docx"):
            # Note: python-docx only parses .docx; legacy .doc files may fail.
            doc_content = docx.Document(filepath)
            text_to_process = [paragraph.text for paragraph in doc_content.paragraphs]
            text_joined = " \n ".join(text_to_process)
        elif file_ext == ".txt":
            with open(filepath, encoding="utf8") as file:
                text_joined = file.read()
        else:
            raise ValueError(f"Unsupported file extension: {file_ext}")
        return text_joined
    def preprocess_document(self, document):
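        """Normalize whitespace and quote characters in extracted text."""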
        # str.replace treats its argument literally, so the original
        # r'\n+' pattern never matched; use re.sub to collapse newlines.
        document = re.sub(r"\n+", "\n", document)
        #document = re.sub(r"\s+", " ", document)
        # Normalize curly quotes and escaped quotes to plain double quotes.
        document = re.sub(r"[“”]", "\"", document)
        document = re.sub(r"\\\"", "\"", document)
        return document
    def chunk_document(self, text, k=1500):
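        """Split text into sentence-aligned chunks of at most k words each."""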
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_word_count = 0
        for sentence in sentences:
            sentence_words = word_tokenize(sentence)
            if current_word_count + len(sentence_words) <= k:
                current_chunk.append(sentence)
                current_word_count += len(sentence_words)
            else:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_word_count = len(sentence_words)
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        # Filter with a comprehension: deleting items while enumerating a
        # list skips the element after each deletion.
        chunks = [chunk for chunk in chunks if len(chunk.split()) >= 2]
        return chunks
    def llama_parse_ocr(self, file_path):
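        """Upload a PDF to the LlamaParse API and return its markdown result."""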
        llamaparse_url = "https://api.cloud.llamaindex.ai/api/parsing/upload"
        headers = {
            "accept": "application/json",
            "Authorization": f"Bearer {self.llama_parse_key}",
        }
        # Open the file in a context manager so the handle is closed
        # after the upload.
        with open(file_path, "rb") as f:
            files = {"file": (file_path, f, "application/pdf")}
            response = requests.post(llamaparse_url, headers=headers, files=files)
        response.raise_for_status()
        job_id = response.json()["id"]
        result_type = "markdown"
        llamaparse_result_url = f"https://api.cloud.llamaindex.ai/api/parsing/job/{job_id}/result/{result_type}"
        # Poll until the result is ready, sleeping between requests so we
        # do not hammer the API.
        while True:
            response = requests.get(llamaparse_result_url, headers=headers)
            if response.status_code == 200:
                break
            time.sleep(2)
        return response.json()["markdown"]
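

# Illustrative usage sketch (not part of the original module). It assumes a
# LLAMA_PARSE_API_KEY environment variable and a local "sample.pdf"; both
# names are hypothetical.
if __name__ == "__main__":
    service = OCRService(os.environ.get("LLAMA_PARSE_API_KEY", ""))
    raw_text = service.extract_text_from_document("sample.pdf")
    clean_text = service.preprocess_document(raw_text)
    chunks = service.chunk_document(clean_text, k=1500)
    print(f"Extracted {len(chunks)} chunk(s) from sample.pdf")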