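# Pipeline: download a PDF from an object URL, render each page to an image,
# summarise every page with Gemini, embed each summary with Google embeddings,
# and upsert the vectors into Pinecone; per-page status is tracked in MongoDB.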
import concurrent.futures
import requests
from pdf2image import convert_from_path
import base64
from pymongo import MongoClient
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_core.messages import HumanMessage
import os
import re
import json
import uuid
from dotenv import load_dotenv
import pinecone
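# Configuration is read from the environment (.env): Mongo connection details and the Gemini API key.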
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db['about_company']
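# LLM used to summarise page images, the embedding model for those summaries,
# and the Pinecone index that stores the resulting vectors.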
model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
google_embeddings = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key=os.getenv("GOOGLE_EMBEDDINGS_API_KEY")  # Read the key from the environment instead of hard-coding it
)
pc = pinecone.Pinecone(
    api_key=os.getenv("PINECONE_API_KEY")  # Read the key from the environment instead of hard-coding it
)
index_name = "mospi"
index = pc.Index(index_name)
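# Company description used as context in the page-summarisation prompt,
# plus temporary directories for the downloaded PDF and its page images.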
about_company_doc = collection2.find_one({"type": "about_company"})
about_company = about_company_doc.get('company_description', '') if about_company_doc else ''
pdf_temp_dir = 'temp/pdf_files'
image_temp_dir = 'temp/page_images'
os.makedirs(pdf_temp_dir, exist_ok=True)
os.makedirs(image_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')
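# Download the PDF from the given object URL and render every page to a PNG
# in image_temp_dir; returns True on success and None on failure.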
def download_and_split_pdf_to_image(url):
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
    except Exception as e:
        print(f"Error occurred while downloading pdf from object url: {e}")
        return None
    try:
        images = convert_from_path(pdf_path)
        for i, image in enumerate(images):
            image_path = os.path.join(image_temp_dir, f'page_{i + 1}.png')
            image.save(image_path, 'PNG')
            print(f'Saved image: {image_path}')
        return True
    except Exception as e:
        print(f"Error occurred while converting pdf pages to images: {e}")
        return None
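# Prompt sent along with each page image; the model is asked to return a small
# JSON object of the form {"description": "..."} summarising the page.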
system_prompt_text = f"""Given is an image of a PDF page.Your task is to extract all the information from this image and give a detailed summary of the page, do not miss out on any information, include keywords or any terms mentioned in the pdf.' | |
Given below is a company information whose pdf page is givn to you, to understand the context. | |
- About Company: {about_company} | |
Follow this Expected output format given below: | |
Expected Output format : {{"description":"String"}} | |
""" | |
def process_image_using_llm(image, page_number, url):
    try:
        message = HumanMessage(
            content=[
                {"type": "text", "text": system_prompt_text},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}},
            ],
        )
        response = model.invoke([message])
        print(f"LLM response for page {page_number}: {response}")
        # Extract the JSON object from the response content (DOTALL so it can span multiple lines)
        match = re.search(r"\{.*\}", response.content.strip(), re.DOTALL)
        if match:
            json_data = match.group(0)
            # Step 1: Normalise a single-quoted "description" value into valid JSON
            description_match = re.search(r"'description'\s*:\s*('.*?'|\".*?\")", json_data)
            if description_match:
                description_text = description_match.group(1)
                if description_text.startswith("'") and description_text.endswith("'"):
                    # Convert the outer single quotes to double quotes and escape any
                    # inner double quotes so the value stays valid JSON
                    inner_text = description_text[1:-1].replace('"', '\\"')
                    description_text = f'"{inner_text}"'
                    # Splice the normalised value back into the extracted JSON string
                    json_data = (
                        json_data[:description_match.start(1)] +
                        description_text +
                        json_data[description_match.end(1):]
                    )
                # The key matched with single quotes, so normalise it as well
                json_data = json_data.replace("'description'", '"description"', 1)
            # Step 2: Attempt to load the cleaned JSON string
            try:
                data = json.loads(json_data)  # Load as JSON
                description = data.get("description", "None").strip()
                can_find_description = description != "None"
                return {
                    "page_number": page_number,
                    "description": description if can_find_description else None,
                    "can_find_description": can_find_description
                }
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON for page {page_number}: {e}")
                return {
                    "page_number": page_number,
                    "description": None,
                    "can_find_description": False
                }
        else:
            print(f"No valid JSON found in the response for page {page_number}")
            return {
                "page_number": page_number,
                "description": None,
                "can_find_description": False
            }
    except Exception as e:
        print(f"Error processing page {page_number}: {e}")
        return {
            "page_number": page_number,
            "description": None,
            "can_find_description": False
        }
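# Embed a page description and upsert it into Pinecone with file/page metadata,
# then record the embedding status on the matching chunk in MongoDB.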
def create_embedding_for_pdf_chunks(page, description, url, tags, categories):
    document = collection.find_one({'object_url': url})
    if not document:
        print(f"No MongoDB document found for object url {url}; skipping embedding for page {page}")
        return False
    mongo_id = str(document.get('_id'))
    try:
        file_type = document.get("type")
        embedding = google_embeddings.embed_query(description)
        pinecone_id = str(uuid.uuid4())
        vectors = [{
            'id': pinecone_id,
            'values': embedding,
            'metadata': {
                'description': description,
                "url": url,
                "page_number": page,
                "tag": file_type,
                "mongo_id": mongo_id,  # Store the MongoDB ID in metadata
                "tags": ','.join(tags),
                "categories": ','.join(categories)
            }
        }]
        index.upsert(vectors)
        print(f"Inserted: page {page} in Pinecone with MongoDB ID {mongo_id} in metadata")
        collection.update_one(
            {
                "_id": document["_id"],
                "chunks.page_number": page  # Match the document and the specific chunk by page number
            },
            {
                "$set": {
                    "chunks.$.pinecone_id": pinecone_id,
                    "chunks.$.successfully_embedding_created": True
                }
            }
        )
        return True
    except Exception as e:
        print(f"Error occurred while creating embedding for pdf with mongo id {mongo_id} for page {page}: {e}")
        collection.update_one(
            {
                "_id": document["_id"],
                "chunks.page_number": page
            },
            {
                "$set": {
                    "chunks.$.successfully_embedding_created": False
                }
            }
        )
        return False
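# Per-page worker: read the saved page image, describe it with the LLM and,
# if a description was produced, create its embedding.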
def process_image_and_create_embedding(page_number, image_path, url, tags, categories):
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")
    # Process the page image using the LLM to get a description
    page_result = process_image_using_llm(image_data, page_number, url)
    # If a description is available, create the embedding
    if page_result.get("description"):
        create_embedding_for_pdf_chunks(page_number, page_result["description"], url, tags, categories)
    else:
        print(f"Skipping page {page_number} as description is None")
    return page_result
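# Remove all files from a temporary directory after processing.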
def cleanup_directory(directory_path):
    try:
        for filename in os.listdir(directory_path):
            file_path = os.path.join(directory_path, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
        print(f"Cleaned up files in {directory_path}")
    except Exception as e:
        print(f"Error cleaning up directory {directory_path}: {e}")
def process_pdf(url, tags, categories):
    print(f"Processing PDF with URL: {url}")
    if download_and_split_pdf_to_image(url):
        chunks = []
        image_files = sorted(
            os.listdir(image_temp_dir),
            key=lambda x: int(re.search(r'page_(\d+)', x).group(1))
        )
        # Use ThreadPoolExecutor to process each page in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    process_image_and_create_embedding,
                    count,
                    os.path.join(image_temp_dir, image_name),
                    url,
                    tags,
                    categories
                )
                for count, image_name in enumerate(image_files, start=1)
            ]
            # Collect results as each thread completes
            for future in concurrent.futures.as_completed(futures):
                try:
                    page_result = future.result()
                    chunks.append(page_result)
                except Exception as e:
                    print(f"Error processing page: {e}")
        # Update the MongoDB document with the collected chunks
        collection.update_one(
            {"object_url": url},
            {"$set": {"chunks": chunks}},
            upsert=True
        )
        print("Saved chunks to MongoDB.")
        # Clean up the temporary directories
        cleanup_directory(pdf_temp_dir)
        cleanup_directory(image_temp_dir)
        # Succeed only if at least one page produced a description
        total_pages = len(chunks)
        failed_pages = sum(1 for chunk in chunks if not chunk.get("can_find_description"))
        return failed_pages < total_pages
    # The download or conversion failed, so the PDF could not be processed
    return False
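# A minimal usage sketch. Assumptions: the URL, tags and categories below are
# hypothetical examples, and a document with this object_url already exists
# in the MongoDB collection.
if __name__ == "__main__":
    example_url = "https://example.com/sample.pdf"  # hypothetical object URL
    example_tags = ["report"]                       # hypothetical tags
    example_categories = ["statistics"]             # hypothetical categories
    success = process_pdf(example_url, example_tags, example_categories)
    print(f"PDF processed successfully: {success}")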