# extract_text_from_pdf.py
import os
import torch
# import spaces
from PyPDF2 import PdfReader
from accelerate import Accelerator
from transformers import AutoModelForCausalLM, AutoTokenizer
from tqdm import tqdm
import warnings
from groq import Groq

warnings.filterwarnings('ignore')


# @spaces.GPU
class PDFTextExtractor:
    """
    A class to handle PDF text extraction and preprocessing for podcast preparation.
    """

    # @spaces.GPU
    def __init__(self, pdf_path, output_path):
        """
        Initialize the PDFTextExtractor with paths and model details.

        Args:
            pdf_path (str): Path to the PDF file.
            output_path (str): Path to save the cleaned text file.
        """
        # model_name = "meta-llama/Llama-3.2-1B-Instruct"
        self.pdf_path = pdf_path
        self.output_path = output_path
        self.max_chars = 100000
        self.chunk_size = 1000
        # self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Initialize model and tokenizer (local-model path, kept commented for reference)
        # self.accelerator = Accelerator()
        # self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16, use_safetensors=True, device_map=self.device)
        # self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_safetensors=True)
        # self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer)
        self.model_name = "llama3-8b-8192"
        # System prompt for text processing
        self.system_prompt = """
You are a world-class text pre-processor. Here is the raw data from a PDF; please parse and return it in a way that is crisp and usable to send to a podcast writer.
The raw data is messed up with new lines and LaTeX math, and you will see fluff that we can remove completely. Basically, take away any details that you think might be useless in a podcast author's transcript.
Remember, the podcast could be on any topic whatsoever, so the issues listed above are not exhaustive.
Please be smart with what you remove and be creative, ok?
Remember, DO NOT START SUMMARIZING THIS. YOU ARE ONLY CLEANING UP THE TEXT AND RE-WRITING WHEN NEEDED.
Be very smart and aggressive with removing details; you will get a running portion of the text and keep returning the processed text.
PLEASE DO NOT ADD MARKDOWN FORMATTING; STOP ADDING SPECIAL CHARACTERS THAT MARKDOWN, CAPITALISATION, ETC. LIKE.
ALWAYS start your response directly with the processed text and NO ACKNOWLEDGEMENTS about my questions, ok?
Here is the text:"""

    # @spaces.GPU
    def validate_pdf(self):
        """Check if the file exists and is a valid PDF."""
        if not os.path.exists(self.pdf_path):
            print(f"Error: File not found at path: {self.pdf_path}")
            return False
        if not self.pdf_path.lower().endswith('.pdf'):
            print("Error: File is not a PDF")
            return False
        return True

    # @spaces.GPU
    def extract_text(self):
        """Extract text from the PDF, limited by max_chars."""
        if not self.validate_pdf():
            return None

        with open(self.pdf_path, 'rb') as file:
            pdf_reader = PdfReader(file)
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")

            extracted_text = []
            total_chars = 0
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text = page.extract_text() or ""

                if total_chars + len(text) > self.max_chars:
                    remaining_chars = self.max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {self.max_chars} character limit at page {page_num + 1}")
                    break

                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")

            final_text = '\n'.join(extracted_text)
            print(f"Extraction complete! Total characters: {len(final_text)}")
            return final_text

    # @spaces.GPU
    def create_word_bounded_chunks(self, text):
        """Split text into word-bounded chunks of roughly chunk_size characters."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for the space
            if current_length + word_length > self.chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    # @spaces.GPU(duration=120)
    def process_chunk(self, text_chunk):
        """Process a text chunk with the model and return the cleaned text."""
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text_chunk}
        ]

        # Local-model path, kept commented for reference:
        # prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False)
        # inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        # with torch.no_grad():
        #     output = self.model.generate(**inputs, temperature=0.7, top_p=0.9, max_new_tokens=512)
        # processed_text = self.tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt):].strip()

        client = Groq(
            api_key=os.environ.get("GROQ_API_KEY"),
        )
        chat_completion = client.chat.completions.create(
            messages=conversation,
            model=self.model_name,
        )
        processed_text = chat_completion.choices[0].message.content
        return processed_text

    # @spaces.GPU
    def clean_and_save_text(self):
        """Extract, clean, and save processed text to a file."""
        extracted_text = self.extract_text()
        if not extracted_text:
            return None

        chunks = self.create_word_bounded_chunks(extracted_text)
        processed_text = ""

        with open(self.output_path, 'w', encoding='utf-8') as out_file:
            for chunk_num, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
                processed_chunk = self.process_chunk(chunk)
                processed_text += processed_chunk + "\n"
                out_file.write(processed_chunk + "\n")
                out_file.flush()

        print(f"\nExtracted and cleaned text has been saved to {self.output_path}")
        return self.output_path
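

# Minimal usage sketch (not part of the original module): the file names below are
# illustrative placeholders, and a GROQ_API_KEY environment variable is assumed to
# be set so process_chunk can authenticate with Groq.
if __name__ == "__main__":
    extractor = PDFTextExtractor(
        pdf_path="document.pdf",       # hypothetical input PDF
        output_path="clean_text.txt",  # hypothetical output file
    )
    saved_path = extractor.clean_and_save_text()
    if saved_path:
        print(f"Cleaned text written to {saved_path}")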