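"""Ingest a PDF from an object URL: convert each page to an image, summarise the
page with Gemini, embed the summary with Google embeddings, upsert the vector
into Pinecone, and record per-page chunk metadata in MongoDB."""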
import base64
import concurrent.futures
import json
import os
import re
import uuid

import pinecone
import requests
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from pdf2image import convert_from_path
from pymongo import MongoClient

load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")

mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
collection2 = db['about_company']

model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
google_embeddings = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key="AIzaSyANNRKfEb-YnVIBaSAq6hQ38XpxxGwvaws"
)
pc = pinecone.Pinecone(
    api_key="4a80f293-ae6d-489a-a7d8-33ea3fcdd26b"
)
index_name = "mospi"
index = pc.Index(index_name)
about_company_doc = collection2.find_one({"type": "about_company"})
about_company = ''  # Fallback so the prompt below still builds if the document is missing
if about_company_doc:
    about_company = about_company_doc.get('company_description', '')
pdf_temp_dir = 'temp/pdf_files'
image_temp_dir = 'temp/page_images'
os.makedirs(pdf_temp_dir, exist_ok=True)
os.makedirs(image_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')

def download_and_split_pdf_to_image(url):
    # Download the PDF from the object URL to a temp file
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
    except Exception as e:
        print(f"Error occurred while downloading PDF from object URL: {e}")
        return None

    # Convert every page of the PDF to a PNG image
    try:
        images = convert_from_path(pdf_path)
        for i, image in enumerate(images):
            image_path = os.path.join(image_temp_dir, f'page_{i + 1}.png')
            image.save(image_path, 'PNG')
            print(f'Saved image: {image_path}')
        return True
    except Exception as e:
        print(f"Error occurred while converting PDF pages to images: {e}")
        return None

system_prompt_text = f"""Given is an image of a PDF page. Your task is to extract all the information from this image and give a detailed summary of the page. Do not miss out on any information, and include keywords or any terms mentioned in the PDF.
Given below is information about the company whose PDF page is given to you, to understand the context.
- About Company: {about_company}
Follow the expected output format given below:
Expected Output format: {{"description":"String"}}
"""

def process_image_using_llm(image, page_number, url):
    try:
        message = HumanMessage(
            content=[
                {"type": "text", "text": system_prompt_text},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}},
            ],
        )
        response = model.invoke([message])
        print(f"LLM response for page {page_number}: {response}")

        # Extract the JSON object from the response content using regex
        match = re.search(r"\{.*\}", response.content.strip(), re.DOTALL)
        if match:
            json_data = match.group(0)

            # The model occasionally wraps the "description" value in single quotes,
            # which is not valid JSON. Rewrite that field with double quotes and
            # escape any double quotes inside the value before parsing.
            description_match = re.search(r"[\"']description[\"']\s*:\s*'(.*?)'", json_data, re.DOTALL)
            if description_match:
                fixed_value = description_match.group(1).replace('"', '\\"')
                json_data = (
                    json_data[:description_match.start()]
                    + f'"description": "{fixed_value}"'
                    + json_data[description_match.end():]
                )

            # Attempt to load the cleaned JSON string
            try:
                data = json.loads(json_data)
                description = (data.get("description") or "").strip()
                can_find_description = bool(description)
                return {
                    "page_number": page_number,
                    "description": description if can_find_description else None,
                    "can_find_description": can_find_description
                }
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON for page {page_number}: {e}")
                return {
                    "page_number": page_number,
                    "description": None,
                    "can_find_description": False
                }
        else:
            print(f"No valid JSON found in the response for page {page_number}")
            return {
                "page_number": page_number,
                "description": None,
                "can_find_description": False
            }
    except Exception as e:
        print(f"Error processing page {page_number}: {e}")
        return {
            "page_number": page_number,
            "description": None,
            "can_find_description": False
        }

def create_embedding_for_pdf_chunks(page, description, url, tags, categories):
    try:
        document = collection.find_one({'object_url': url})
        file_type = document.get("type")
        mongo_id = str(document.get('_id'))
        embedding = google_embeddings.embed_query(description)
        pinecone_id = str(uuid.uuid4())
        vectors = [{
            'id': pinecone_id,
            'values': embedding,
            'metadata': {
                'description': description,
                "url": url,
                "page_number": page,
                "tag": file_type,
                "mongo_id": mongo_id,  # Store MongoDB ID in metadata
                "tags": ','.join(tags),
                "categories": ','.join(categories)
            }
        }]
        index.upsert(vectors)
        print(f"Inserted: page {page} in Pinecone with MongoDB ID {mongo_id} in metadata")
        collection.update_one(
            {
                "_id": document["_id"],
                "chunks.page_number": page  # Match document and specific chunk by page number
            },
            {
                "$set": {
                    "chunks.$.pinecone_id": pinecone_id,
                    "chunks.$.successfully_embedding_created": True
                }
            }
        )
        return True
    except Exception as e:
        print(f"Error occurred while creating embedding for PDF {url}, page {page}: {e}")
        collection.update_one(
            {
                "object_url": url,
                "chunks.page_number": page  # Match document and specific chunk by page number
            },
            {
                "$set": {
                    "chunks.$.successfully_embedding_created": False
                }
            }
        )
        return False

def process_image_and_create_embedding(page_number, image_path, url, tags, categories):
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode("utf-8")

    # Process image using LLM to get description
    page_result = process_image_using_llm(image_data, page_number, url)

    # If description is available, create embedding
    if page_result.get("description"):
        create_embedding_for_pdf_chunks(page_number, page_result["description"], url, tags, categories)
    else:
        print(f"Skipping page {page_number} as description is None")

    return page_result

def cleanup_directory(directory_path):
    try:
        for filename in os.listdir(directory_path):
            file_path = os.path.join(directory_path, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
        print(f"Cleaned up files in {directory_path}")
    except Exception as e:
        print(f"Error cleaning up directory {directory_path}: {e}")

def process_pdf(url, tags, categories):
    print(f"Processing PDF with URL: {url}")
    if not download_and_split_pdf_to_image(url):
        return False

    chunks = []
    image_files = sorted(
        os.listdir(image_temp_dir),
        key=lambda x: int(re.search(r'page_(\d+)', x).group(1))
    )

    # Use ThreadPoolExecutor to process each page in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                process_image_and_create_embedding,
                count,
                os.path.join(image_temp_dir, image_name),
                url,
                tags,
                categories
            )
            for count, image_name in enumerate(image_files, start=1)
        ]

        # Collect results as each thread completes
        for future in concurrent.futures.as_completed(futures):
            try:
                page_result = future.result()
                chunks.append(page_result)
            except Exception as e:
                print(f"Error processing page: {e}")

    # Threads complete in arbitrary order, so restore page order before saving
    chunks.sort(key=lambda chunk: chunk["page_number"])

    # Update MongoDB document with the collected chunks
    collection.update_one(
        {"object_url": url},
        {"$set": {"chunks": chunks}},
        upsert=True
    )
    print("Saved chunks to MongoDB.")

    # Clean up temp directories
    cleanup_directory(pdf_temp_dir)
    cleanup_directory(image_temp_dir)

    # Succeed if at least one page produced a usable description
    total_pages = len(chunks)
    failed_pages = sum(1 for chunk in chunks if not chunk.get("can_find_description"))
    return failed_pages < total_pages
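
# Illustrative usage sketch (not part of the original pipeline): the URL, tags,
# and categories below are placeholder values assumed for demonstration only.
if __name__ == "__main__":
    sample_url = "https://example.com/storage/sample-report.pdf"  # hypothetical object URL
    ok = process_pdf(sample_url, tags=["report", "annual"], categories=["statistics"])
    print(f"PDF processed successfully: {ok}")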