# MoSPI/helper/process_pdf_parallel.py
import ast  # Needed for the literal_eval fallback when parsing LLM output
import base64
import concurrent.futures
import json
import os
import re
import uuid

import pinecone
import requests
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from pdf2image import convert_from_path
from pymongo import MongoClient
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
# The Pinecone credential belongs in the environment too, rather than being
# hard-coded below; the variable name PINECONE_API_KEY is an assumption.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db['about_company']
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
google_embeddings = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key=FLASH_API  # Assumes the same Google API key is used for embeddings; never hard-code keys
)
pc = pinecone.Pinecone(
    api_key=PINECONE_API_KEY  # Loaded from the environment instead of a hard-coded key
)
index_name = "mospi"
index = pc.Index(index_name)
about_company_doc = collection2.find_one({"type": "about_company"})
# Fall back to an empty description so the prompt below never hits a
# NameError when the about_company document is missing.
about_company = about_company_doc.get('company_description', '') if about_company_doc else ''
pdf_temp_dir = 'temp/pdf_files'
image_temp_dir = 'temp/page_images'
os.makedirs(pdf_temp_dir, exist_ok=True)
os.makedirs(image_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')
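
# Pipeline: the PDF at object_url is downloaded to pdf_temp_dir, each page is
# rendered to a PNG in image_temp_dir, described by Gemini, embedded with
# Google embeddings, and upserted into the Pinecone index, with per-page
# chunk metadata written back to the matching MongoDB document.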
def download_and_split_pdf_to_image(url):
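    """Download the PDF at `url` to pdf_path and render each page as a PNG
    in image_temp_dir. Returns True on success, None on failure."""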
    try:
        # The timeout is an added safeguard; the exact value is an assumption
        response = requests.get(url, timeout=120)
        response.raise_for_status()  # Fail fast on HTTP errors instead of saving an error page as a PDF
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
    except Exception as e:
        print(f"error occurred while downloading pdf from object url: {e}")
        return None
try:
images = convert_from_path(pdf_path)
for i, image in enumerate(images):
image_path = os.path.join(image_temp_dir, f'page_{i + 1}.png')
image.save(image_path, 'PNG')
print(f'Saved image: {image_path}')
return True
except Exception as e:
print(f"error occured in converting pdf pages to image : {e}")
return None
system_prompt_text = f"""Given is an image of a PDF page. Your task is to extract all the information from this image and give a detailed summary of the page. Do not miss out on any information, and include keywords or any terms mentioned in the PDF.
Given below is information about the company whose PDF page is given to you, to help you understand the context.
- About Company: {about_company}
Follow the expected output format given below:
Expected Output format: {{"description":"String"}}
"""
def process_image_using_llm(image, page_number, url):
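    """Send one base64-encoded page image to Gemini and parse the
    {"description": ...} JSON from its reply. Always returns a dict with
    page_number, description, and can_find_description keys."""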
try:
message = HumanMessage(
content=[
{"type": "text", "text": system_prompt_text},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}},
],
)
response = model.invoke([message])
print(f"LLM response for page {page_number}: {response}")
        # Extract the JSON object from the response content; re.DOTALL lets
        # the match span multi-line descriptions
        match = re.search(r"\{.*\}", response.content.strip(), re.DOTALL)
        if match:
            json_data = match.group(0)
            try:
                data = json.loads(json_data)
            except json.JSONDecodeError:
                # The model sometimes returns a Python-style dict with single
                # quotes, which json.loads rejects. The previous hand-rolled
                # quote escaping produced invalid JSON (\' is not a legal JSON
                # escape), so fall back to ast.literal_eval instead.
                try:
                    data = ast.literal_eval(json_data)
                except (ValueError, SyntaxError) as e:
                    print(f"Error decoding JSON for page {page_number}: {e}")
                    return {
                        "page_number": page_number,
                        "description": None,
                        "can_find_description": False
                    }
            description = (data.get("description") or "").strip()
            can_find_description = bool(description)
            return {
                "page_number": page_number,
                "description": description if can_find_description else None,
                "can_find_description": can_find_description
            }
        else:
            print(f"No valid JSON found in the response for page {page_number}")
            return {
                "page_number": page_number,
                "description": None,
                "can_find_description": False
            }
except Exception as e:
print(f"Error processing page {page_number}: {e}")
return {
"page_number": page_number,
"description": None,
"can_find_description": False
}
def create_embedding_for_pdf_chunks(page, description, url, tags, categories):
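    """Embed a page description with Google embeddings, upsert it into the
    Pinecone index, and record the pinecone_id on the matching chunk of the
    MongoDB document for `url`. Returns True on success, False on failure."""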
    try:
        document = collection.find_one({'object_url': url})
        if not document:
            # Guard: without the parent document there is nothing to update
            print(f"No MongoDB document found for url {url}; skipping page {page}")
            return False
        file_type = document.get("type")
        mongo_id = str(document.get('_id'))
        embedding = google_embeddings.embed_query(description)
        pinecone_id = str(uuid.uuid4())
vectors = [{
'id': pinecone_id,
'values': embedding,
'metadata': {
'description': description,
"url": url,
"page_number":page,
"tag": file_type,
"mongo_id": mongo_id,
"tags": ','.join(tags),
"categories": ','.join(categories) # Store MongoDB ID in metadata
}
}]
index.upsert(vectors)
print(f"Inserted: page {page} in Pinecone with MongoDB ID {mongo_id} in metadata")
collection.update_one(
{
"_id": document["_id"],
"chunks.page_number": page # Match document and specific chunk by page number
},
{
"$set": {
"chunks.$.pinecone_id": pinecone_id,
"chunks.$.successfully_embedding_created": True
}
}
)
return True
    except Exception as e:
        print(f"error occurred in creating embedding for pdf {url} for page {page}: {e}")
        collection.update_one(
            {
                "object_url": url,  # Match by URL; `document` may be unbound if find_one itself failed
                "chunks.page_number": page  # Match the specific chunk by page number
},
{
"$set": {
"chunks.$.successfully_embedding_created": False
}
}
)
return False
def process_image_and_create_embedding(page_number, image_path, url, tags, categories):
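    """Worker for one page: base64-encode the page image, describe it via the
    LLM, and create the embedding when a description is available."""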
with open(image_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode("utf-8")
# Process image using LLM to get description
page_result = process_image_using_llm(image_data, page_number, url)
# If description is available, create embedding
if page_result.get("description"):
create_embedding_for_pdf_chunks(page_number, page_result["description"], url, tags, categories)
else:
print(f"Skipping page {page_number} as description is None")
return page_result
def cleanup_directory(directory_path):
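    """Remove all files (but not subdirectories) from directory_path."""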
try:
for filename in os.listdir(directory_path):
file_path = os.path.join(directory_path, filename)
if os.path.isfile(file_path):
os.remove(file_path)
print(f"Cleaned up files in {directory_path}")
except Exception as e:
print(f"Error cleaning up directory {directory_path}: {e}")
def process_pdf(url, tags, categories):
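    """End-to-end pipeline for one PDF: download and rasterize it, describe
    and embed each page in parallel, persist the chunks to MongoDB, and clean
    up temp files. Returns True if at least one page yielded a description."""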
print(f"Processing PDF with URL: {url}")
if download_and_split_pdf_to_image(url):
chunks = []
image_files = sorted(
os.listdir(image_temp_dir),
key=lambda x: int(re.search(r'page_(\d+)', x).group(1))
)
# Use ThreadPoolExecutor to process each page in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
process_image_and_create_embedding,
count,
os.path.join(image_temp_dir, image_name),
url,
tags,
categories
)
for count, image_name in enumerate(image_files, start=1)
]
# Collect results as each thread completes
for future in concurrent.futures.as_completed(futures):
try:
page_result = future.result()
chunks.append(page_result)
except Exception as e:
print(f"Error processing page: {e}")
        # Sort the chunks into page order; as_completed yields results in
        # completion order, not page order
        chunks.sort(key=lambda chunk: chunk["page_number"])
        # Update MongoDB document with the collected chunks
        collection.update_one(
{"object_url": url},
{"$set": {"chunks": chunks}},
upsert=True
)
print("Saved chunks to MongoDB.")
# Cleanup directories
cleanup_directory(pdf_temp_dir)
cleanup_directory(image_temp_dir)
# Check how many pages failed to create embeddings
        total_pages = len(chunks)
        failed_pages = sum(1 for chunk in chunks if not chunk.get("can_find_description"))
        return failed_pages < total_pages
    # The download or PDF-to-image conversion failed
    return False
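

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the URL, tags,
    # and categories below are hypothetical placeholders. process_pdf expects
    # the object_url to identify (or create, via upsert) a document in
    # COLLECTION_NAME.
    ok = process_pdf(
        "https://example.com/sample.pdf",  # hypothetical object URL
        tags=["statistics"],
        categories=["reports"],
    )
    print(f"at least one page embedded successfully: {ok}")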