Spaces:
Running
Running
# UPLOAD FUNCTIONS.PY | |
import requests | |
import gradio as gr | |
import pandas as pd | |
import tiktoken | |
import tempfile | |
from PyPDF2 import PdfReader | |
from tqdm import tqdm | |
from pydantic import BaseModel, Field | |
from phi.agent import Agent, RunResponse | |
from phi.model.groq import Groq | |
from sentence_transformers import SentenceTransformer | |
from sentence_transformers import CrossEncoder | |
#from gradio_client import Client, handle_file | |
import os | |
from pptx import Presentation | |
from pptx2img import PPTXConverter # For splitting slides | |
import uuid | |
import shutil | |
from PIL import Image | |
import pandas as pd | |
import requests | |
import gradio as gr | |
from pydantic import BaseModel, Field | |
from typing import List | |
import tiktoken | |
from datetime import datetime | |
import zipfile | |
from PIL import Image | |
import gradio as gr | |
import threading | |
import time | |
import requests | |
def get_access_token():
    """Acquire an access token interactively through the device-code flow.

    Uses the module-level client ``app`` and scope list ``SCOPES``; neither is
    defined in this function (presumably an MSAL public client application and
    the Graph scopes -- confirm against their definitions). The verification
    URL and user code are printed so the operator can finish sign-in in a
    browser.

    Returns:
        The ``access_token`` string from the token result.

    Raises:
        SystemExit: With status 1 when the device flow cannot be started or no
            token is returned.
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    # A flow that failed to start comes back without "user_code"; check before
    # indexing so the operator sees the service error instead of a KeyError.
    if "user_code" not in flow:
        print("β Could not acquire token:", flow.get("error_description"))
        raise SystemExit(1)

    print("Go to", flow["verification_uri"])
    print("Enter the code:", flow["user_code"])

    result = app.acquire_token_by_device_flow(flow)
    if "access_token" not in result:
        print("β Could not acquire token:", result.get("error_description"))
        # The bare exit() helper comes from the `site` module (absent under
        # `python -S` / frozen builds) and reported success (status 0).
        raise SystemExit(1)
    return result["access_token"]
# Function to generate a unique PPT ID | |
def generate_unique_ppt_id():
    """Return a short random identifier for one uploaded presentation.

    The identifier is the first 8 lowercase hexadecimal characters of a
    freshly generated UUID4.
    """
    return uuid.uuid4().hex[:8]
def truncate_text_to_tokens(text, max_tokens, model_name="cl100k_base"):
    """Trim ``text`` so that it encodes to at most ``max_tokens`` tokens.

    Args:
        text: The string to shorten.
        max_tokens: Maximum number of tokens to keep. Zero or a negative value
            yields an empty string.
        model_name: Name passed to ``tiktoken.get_encoding`` (despite the
            parameter name, this is an encoding name such as "cl100k_base").

    Returns:
        ``text`` unchanged when it already fits, otherwise the decoded prefix
        made of the first ``max_tokens`` tokens.
    """
    encoding = tiktoken.get_encoding(model_name)
    tokens = encoding.encode(text)
    # Clamp at zero: a negative limit would turn the slice below into
    # "drop the last N tokens" and return almost the whole text.
    limit = max(max_tokens, 0)
    if len(tokens) <= limit:
        # Nothing to cut, so skip the decode round-trip.
        return text
    return encoding.decode(tokens[:limit])
def split_and_convert_ppt(file_path, output_folder_slides, output_folder_images):
    """Split a .pptx into one-slide decks, render each to an image, collect text.

    For every slide of ``file_path`` this function:
      1. reloads the deck, removes all other slides and saves the result as
         ``<file>_<ppt_unique_id>_slide_<n>.pptx`` in ``output_folder_slides``;
      2. runs ``PPTXConverter.convert_pptx_to_images`` on that one-slide deck,
         writing into ``output_folder_images``;
      3. gathers the text of every shape on the slide that exposes ``.text``.

    Reads the module-level global ``ppt_unique_id``, which must be set before
    this function is called.

    Args:
        file_path: Path to the source .pptx file.
        output_folder_slides: Folder for the single-slide .pptx files.
        output_folder_images: Folder for the rendered slide images.

    Returns:
        A list with one stripped text string per slide, in slide order.
    """
    for folder in (output_folder_slides, output_folder_images):
        os.makedirs(folder, exist_ok=True)

    presentation = Presentation(file_path)
    file_name = os.path.basename(file_path).split('.')[0]
    print('File Name ', file_name)
    print('File Path ', file_path)

    slide_texts = []
    for slide_no, slide in enumerate(presentation.slides, start=1):
        unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{slide_no}"
        slide_file_path = os.path.join(output_folder_slides, f"{unique_slide_id}.pptx")
        print('Slide_file_path', slide_file_path)
        # pptx2img names its output "<pptx_name>_slide_<idx + 1>.png"; a
        # one-slide deck therefore always produces the "_slide_1" suffix.
        image_path = os.path.join(output_folder_images, f"{unique_slide_id}_slide_1.png")
        print('Image file path', image_path)

        # Step 1: build the single-slide deck from a fresh copy of the source.
        single_slide_deck = Presentation(file_path)
        slide_id_list = single_slide_deck.slides._sldIdLst
        keep_position = slide_no - 1
        # Walk backwards so removing an entry never shifts a pending index.
        for position in reversed(range(len(single_slide_deck.slides))):
            if position == keep_position:
                continue
            single_slide_deck.part.drop_rel(slide_id_list[position].rId)
            del slide_id_list[position]
        single_slide_deck.save(slide_file_path)
        del single_slide_deck

        # Step 2: render the single-slide deck to an image.
        converter = PPTXConverter()
        converter.convert_pptx_to_images(slide_file_path, output_folder_images)
        print(f"Slide {slide_no} converted to image: {image_path}")

        # Step 3: take the editable text straight from the slide shapes
        # (image OCR is switched off).
        pptx_text = "".join(
            shape.text.strip() + "\n"
            for shape in slide.shapes
            if hasattr(shape, "text")
        ).strip()
        print(f"π‘ PPTX Text Extractedfrom slide {slide_no}:\n", pptx_text)
        slide_texts.append(pptx_text)

    return slide_texts
def generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base"):
    """Generate presentation metadata, shrinking the input after each failure.

    The text is truncated to the current token budget and handed to
    ``generate_metadata``. Any exception counts as a failed attempt; the
    budget is then lowered by ``decrement`` and the call is repeated. In total
    ``retries + 1`` attempts are made.

    Args:
        full_text: Complete text of the presentation.
        retries: Number of retries after the first attempt.
        max_tokens: Token budget for the first attempt.
        decrement: Tokens removed from the budget after each failed attempt.
        model_name: Encoding name forwarded to ``truncate_text_to_tokens``.

    Returns:
        Whatever ``generate_metadata`` returns on the first successful
        attempt, or ``None`` when every attempt fails.
    """
    final_attempt = retries + 1
    token_budget = max_tokens
    attempt = 1
    while attempt <= final_attempt:
        try:
            truncated_text = truncate_text_to_tokens(full_text, token_budget, model_name)
            print(f"π Attempt {attempt}: Generating metadata with ~{count_tokens(truncated_text)} tokens...")
            metadata = generate_metadata(truncated_text)
            print("π Metadata generated successfully.")
            return metadata
        except Exception as e:
            print(f"β Error on attempt {attempt}: {str(e)}")
            if attempt == final_attempt:
                print("π¨ Max retries reached. Metadata generation failed.")
                return None
            token_budget -= decrement
            print(f"π Retrying with {token_budget} tokens...")
        attempt += 1
    return None
# Function to generate metadata using phidata agent | |
def generate_metadata(ocr_text):
    """Ask an LLM agent to produce structured metadata for a presentation.

    Builds a phidata ``Agent`` backed by the Groq model
    "deepseek-r1-distill-llama-70b" with ``PPTMetadata`` as the response model
    and runs it on the supplied text.

    Args:
        ocr_text: Text of the presentation (already truncated by the caller).

    Returns:
        ``response.content`` of the agent run -- presumably a ``PPTMetadata``
        instance, since that is the configured response model (confirm against
        the phidata version in use).
    """
    # Adjacent string literals are concatenated by Python, so each instruction
    # below is one list element only when it ends with a comma, and fragments
    # of the same instruction need an explicit separating space.
    metadata_agent = Agent(
        name="Metadata Generator",
        role="Generates structured metadata for presentations based on their content.",
        instructions=[
            "Your task is to analyze the provided text and generate structured metadata for the presentation.",
            "Carefully evaluate the content to determine the most appropriate values for each metadata field.",
            # Rule 1: PPT Unique ID
            "For the 'PPT_Unique_ID', use the first 8 characters of the MD5 hash of the input text. "
            "This ensures uniqueness across presentations.",
            # Rule 2: Suitable Title
            "For the 'Suitable_Title', create a concise and meaningful title that captures the essence of the presentation. "
            "Focus on first slide where title of presentation is given along with key themes, topics, or keywords mentioned in the text.",
            # Rule 3: Slide Category
            "For the 'Slide_Category', classify the presentation into one of the following categories: "
            "The category or theme of the slides (e.g., Risk management , Data Analytics , Technology etc) "
            "Base your decision on the overall theme or subject matter of the content.",
            # Rule 4: PPT owner
            "Find The owner of the presentation ie who makes the presentation (eg: Done by name and designation ie Mr. baswaraj ,Princpial ADG , Additional Director ,or organisations like NCTC,DG Systems, Directorate of Logistics etc) "
            "Dont Asssume if u could not found ,mention Not Available",
            # Rule 5: Audience/Forum
            "For the 'Audience_Forum', identify the target audience or forum for the presentation. "
            "(e.g.,NACIN , WCO, Presentation before Member (CBIC)etc ). "
            "Dont Asssume if could not found ,mention Not Available "
            "Consider the tone, language, and purpose of the content.",
            # Rule 6: Short Summary
            "For the 'Short_Summary', provide a brief summary of the presentation's content with all keywords in 10 sentences. "
            "Highlight the keywords ,topics, main points or objectives of the presentation.",
            "Mention the title also in the short summary ,owner and audience of the presentation",
            # General Guidelines
            "Ensure all fields are filled and meaningful. If unsure about a field, make an educated guess based on the context.",
        ],
        model=Groq(id="deepseek-r1-distill-llama-70b"),
        response_model=PPTMetadata,
        markdown=True,
        debug_mode=True,
        show_tool_calls=True,
        monitoring=True)
    # Run the agent to generate metadata
    response = metadata_agent.run(
        f"Generate data fields for the following presentation content: {ocr_text}")
    return response.content
# Function to get folder ID in OneDrive | |
def get_folder_id(folder_path, headers):
    """Resolve a slash-separated OneDrive folder path to an item ID, creating folders as needed.

    The path is walked from the drive root one segment at a time. For each
    segment the children of the current parent are listed (following
    ``@odata.nextLink`` so large folders are searched completely); a missing
    folder is created under the current parent.

    Args:
        folder_path: Path such as "Projects Apps/PPT Maker/ppt_repo". Empty
            segments (leading, trailing or doubled slashes) are ignored.
        headers: HTTP headers sent with every Graph request.

    Returns:
        The ID of the last folder in the path, or ``None`` when the path is
        empty or any list/create request fails.
    """
    print("creating folder id for ", folder_path)
    # An empty segment would otherwise be looked up (and created) as a folder
    # literally named "".
    folders = [segment for segment in folder_path.split("/") if segment]
    if not folders:
        print(f"Failed to retrieve folder '{folder_path}'. Error: empty folder path")
        return None

    parent_id = None
    for folder_name in folders:
        children_url = (
            f"https://graph.microsoft.com/v1.0/me/drive/items/{parent_id}/children"
            if parent_id
            else "https://graph.microsoft.com/v1.0/me/drive/root/children"
        )

        # Search every page of children. Stopping after the first page would
        # miss an existing folder and, because of conflictBehavior "rename",
        # create a renamed duplicate of it.
        folder_item = None
        page_url = children_url
        while page_url and folder_item is None:
            response = requests.get(page_url, headers=headers)
            if response.status_code != 200:
                print(f"Failed to retrieve folder '{folder_name}'. Error: {response.text}")
                return None
            payload = response.json()
            folder_item = next(
                (
                    item
                    for item in payload.get("value", [])
                    if item.get("name") == folder_name and "folder" in item
                ),
                None,
            )
            page_url = payload.get("@odata.nextLink")

        if not folder_item:
            # Create the folder if it doesn't exist
            create_response = requests.post(children_url, headers=headers, json={
                "name": folder_name,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "rename"
            })
            if create_response.status_code not in [200, 201]:
                print(f"Failed to create folder '{folder_name}'. Error: {create_response.text}")
                return None
            folder_item = create_response.json()

        parent_id = folder_item["id"]
    return parent_id
# Function to upload file to OneDrive | |
def upload_to_onedrive(file_path, folder_id, headers):
    """Upload a local file into a OneDrive folder with a single PUT request.

    The file is stored under its own base name inside the folder identified
    by ``folder_id``.

    Args:
        file_path: Path of the local file to upload.
        folder_id: ID of the destination OneDrive folder.
        headers: HTTP headers sent with the request.

    Returns:
        The ID of the uploaded item, or ``None`` when the response status is
        not 200 or 201.
    """
    from urllib.parse import quote  # local import keeps the block self-contained

    file_name = os.path.basename(file_path)
    # Percent-encode the name: a raw '#', '?' or '%' in a file name would be
    # parsed as a URL fragment, query string or escape and corrupt the path.
    encoded_name = quote(file_name, safe="")
    upload_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}:/{encoded_name}:/content"
    with open(file_path, "rb") as file:
        file_content = file.read()
    response = requests.put(upload_url, headers=headers, data=file_content)
    if response.status_code in [200, 201]:
        print(f"Uploaded {file_name} to OneDrive.")
        return response.json()["id"]
    print(f"Failed to upload {file_name}. Error: {response.text}")
    return None
# Function to count tokens using tiktoken | |
def count_tokens(text, model_name="cl100k_base"):
    """Return how many tokens ``text`` encodes to.

    ``model_name`` is handed to ``tiktoken.get_encoding`` and so names an
    encoding (default "cl100k_base").
    """
    return len(tiktoken.get_encoding(model_name).encode(text))
def list_folder_files(folder_id, headers):
    """Return all child items of a OneDrive folder.

    Follows ``@odata.nextLink`` so that folders whose listing spans several
    pages are returned completely. Callers use this list to decide whether a
    file already exists, so a truncated listing would lead to duplicates or to
    an existing master CSV being overwritten instead of appended to.

    Args:
        folder_id: ID of the folder to list.
        headers: HTTP headers sent with every request.

    Returns:
        A list of item dicts taken from the ``value`` arrays of the responses.

    Raises:
        ValueError: When any page request returns a status other than 200.
    """
    items = []
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}/children"
    while url:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            raise ValueError(f"Failed to list folder contents. Error: {response.text}")
        payload = response.json()
        items.extend(payload.get("value", []))
        # Absent on the last page, which ends the loop.
        url = payload.get("@odata.nextLink")
    return items
def download_onedrive_file(file_id, filename, headers):
    """Download a OneDrive item to a local file.

    Fetches the item's metadata, reads its ``@microsoft.graph.downloadUrl``
    and saves the content found at that URL to ``filename``.

    Args:
        file_id: ID of the OneDrive item to download.
        filename: Local path the content is written to.
        headers: HTTP headers sent with the metadata request.

    Raises:
        ValueError: When the metadata request fails, the item has no download
            URL, or the content request fails. Nothing is written in those
            cases, so an error body never ends up in the target file.
    """
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}"
    meta_response = requests.get(url, headers=headers)
    if meta_response.status_code != 200:
        raise ValueError(f"Failed to fetch file metadata. Error: {meta_response.text}")

    download_url = meta_response.json().get("@microsoft.graph.downloadUrl")
    if not download_url:
        raise ValueError(f"No download URL available for item '{file_id}'.")

    # The download URL is requested without the auth headers, as before.
    response = requests.get(download_url)
    if response.status_code != 200:
        raise ValueError(f"Failed to download file. Error: {response.text}")

    with open(filename, 'wb') as f:
        f.write(response.content)
def update_and_upload_metadata_simplified(metadata_list, metadata_folder_id, metadata_with_fulltext_folder_id, headers):
    """Append new slide rows to both master metadata CSVs on OneDrive.

    Two files are maintained:
      * "Master_metadata.csv" in ``metadata_folder_id`` -- written without
        the "PPT_OCR_Text" column;
      * "Master_fulltext_metadata.csv" in
        ``metadata_with_fulltext_folder_id`` -- written with all columns.

    For each file the existing copy (if present in the folder) is downloaded,
    the new rows are appended, and the result is uploaded again. The working
    copy is written to the current working directory under the same file name.

    Args:
        metadata_list: Rows to append; each row has 18 values in the column
            order listed below.
        metadata_folder_id: OneDrive folder ID holding "Master_metadata.csv".
        metadata_with_fulltext_folder_id: OneDrive folder ID holding
            "Master_fulltext_metadata.csv".
        headers: HTTP headers passed to the OneDrive helpers.

    Returns:
        A completion message string.

    Raises:
        ValueError: When listing, downloading or uploading a CSV fails.
    """
    df_new = pd.DataFrame(metadata_list, columns=[
        "Unique_Slide_ID", "Slide_OCR_Text", "PPT_OCR_Text", "Slide_Embedding", "Short_Summary_Embedding",
        "PPT_Unique_ID", "Suitable_Title", "Slide_Category", "PPT_Owner", "Audience_Forum", "Short_Summary",
        "Slide_File_Path", "Slide_File_ID", "Full_PPT_File_Path", "Full_PPT_File_ID",
        "Thumbnail_File_Path", "Thumbnail_File_ID", "Upload_date"])

    targets = [
        ("Master_metadata.csv", metadata_folder_id, 'PPT_OCR_Text'),
        ("Master_fulltext_metadata.csv", metadata_with_fulltext_folder_id, None),
    ]
    for csv_file, folder_id, drop_column in targets:
        files = list_folder_files(folder_id, headers)
        file_item = next((item for item in files if item['name'] == csv_file), None)
        print('File items', file_item)

        if file_item:
            download_onedrive_file(file_item['id'], csv_file, headers)
            df_existing = pd.read_csv(csv_file)
            df_merged = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            df_merged = df_new

        if drop_column:
            df_merged = df_merged.drop(columns=[drop_column])

        df_merged.to_csv(csv_file, index=False)
        # upload_to_onedrive signals failure by returning None; without this
        # check a failed upload was still reported as a success.
        if upload_to_onedrive(csv_file, folder_id, headers) is None:
            raise ValueError(f"Failed to upload {csv_file} to OneDrive.")
        print(f"β Uploaded: {csv_file}")

    return "β PPT Processing and Metadata update complete!"
# Main processing function | |
def process_presentation(file):
    """Ingest an uploaded .pptx: store it, split it, describe it and index it.

    Steps:
      1. Validate the extension and derive a unique presentation ID.
      2. Resolve (or create) the OneDrive folders used by the app.
      3. Refuse the upload when a file of the same name is already stored.
      4. Upload the full presentation.
      5. Split it into single-slide decks and slide images, collecting text.
      6. Generate presentation-level metadata with the LLM agent.
      7. Upload every slide deck and image, embed the slide text, build rows.
      8. Append the rows to the master metadata CSVs on OneDrive.

    Module-level names used but not defined here: ``headers`` (the Graph
    request headers; the code that built them inside this function is
    disabled, so they must be provided at module level) and
    ``embedding_model``. The global ``ppt_unique_id`` is assigned here because
    ``split_and_convert_ppt`` reads it.

    Args:
        file: A path string, or an object whose ``name`` attribute is the path
            of the uploaded file.

    Returns:
        A status message string: the completion message on success, a
        duplicate-file notice, or "An error occurred: ..." on any exception.
    """
    global ppt_unique_id
    temp_root = None
    try:
        # Step 0: Validate file format
        file_path = file.name if hasattr(file, "name") else file
        file_extension = os.path.splitext(file_path)[-1].lower()
        if file_extension not in ['.pptx']:
            raise ValueError("Unsupported file format. Please upload .pptx")

        # Base name used in slide IDs; must match split_and_convert_ppt.
        file_name = os.path.basename(file_path).split('.')[0]
        print('File Name ', file_name)

        # Step 1: Generate unique PPT ID
        ppt_unique_id = generate_unique_ppt_id()
        upload_date = datetime.now().strftime('%Y-%m-%d')
        print('PPT_unique id', ppt_unique_id)

        # Step 2: Resolve the OneDrive folders
        gr.Info('Connecting to OneDrive..')
        ppt_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/ppt_repo", headers)
        slides_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slides_repo", headers)
        slide_image_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slide_image_repo", headers)
        metadata_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_file', headers)
        metadata_with_fulltext_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_with_fulltext', headers)
        print('ppt_repo_folder_id', ppt_repo_folder_id)
        print('slides_repo_folder_id', slides_repo_folder_id)
        print('slide_image_repo_folder_id', slide_image_repo_folder_id)
        print('metadata_folder_id', metadata_folder_id)

        # All five folders are needed later, including the full-text one.
        required_folder_ids = (
            ppt_repo_folder_id,
            slides_repo_folder_id,
            slide_image_repo_folder_id,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
        )
        if not all(required_folder_ids):
            # gr.Error only has an effect when raised; since this function
            # reports problems through its return value, show a toast instead.
            gr.Warning('Could not find or create required folders in OneDrive.')
            raise ValueError("Could not find or create required folders in OneDrive.")

        # Step 3: Check if the file already exists in ppt_repo
        ppt_file_name = os.path.basename(file_path)
        existing_files = list_folder_files(ppt_repo_folder_id, headers)
        if any(item['name'] == ppt_file_name for item in existing_files):
            duplicate_message = f"β οΈ A file named '{ppt_file_name}' already exists in the PPT repository. Please rename your file or delete the existing one before re-uploading."
            gr.Warning(duplicate_message)
            return duplicate_message

        # Step 4: Upload the full PPT file
        full_ppt_file_id = upload_to_onedrive(file_path, ppt_repo_folder_id, headers)
        if full_ppt_file_id is None:
            raise ValueError("Failed to upload the presentation to OneDrive.")
        gr.Info('PPT uploaded to OneDrive..')
        full_ppt_file_path = f"/Projects Apps/PPT Maker/ppt_repo/{ppt_file_name}"

        # Step 5: Split PPT into individual slides and convert to images.
        # A per-call temp directory keeps concurrent uploads from deleting
        # each other's files and does not need a writable filesystem root.
        gr.Info('Processing the PPT and indexing ..it may take a while ')
        temp_root = tempfile.mkdtemp(prefix="ppt_upload_")
        temp_output_folder_slides = os.path.join(temp_root, "temp_slides")
        temp_output_folder_images = os.path.join(temp_root, "temp_images")
        slide_texts = split_and_convert_ppt(file_path, temp_output_folder_slides, temp_output_folder_images)
        print('PPT splitted and converted successfully')

        # Step 6: Generate presentation-level metadata
        full_text = "\n".join(slide_texts)
        gr.Info('AI agent processing the data .')
        metadata = generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base")
        if metadata is None:
            # The retry helper returns None after exhausting its attempts.
            raise ValueError("Metadata generation failed after all retries.")

        # The summary is the same for every slide, so embed it once.
        short_summary_embedding = str(embedding_model.encode(metadata.Short_Summary).tolist())

        # Step 7: Upload each slide and image, and build the metadata rows
        metadata_list = []
        gr.Info('Uploading the individual slides and images into repo ')
        for i, slide_text in enumerate(slide_texts):
            unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{i + 1}"
            slide_file_path = os.path.join(temp_output_folder_slides, f"{unique_slide_id}.pptx")
            slide_image_path = os.path.join(temp_output_folder_images, f"{unique_slide_id}_slide_1.png")

            # Upload individual slide (.pptx) to slides_repo
            slide_file_id = upload_to_onedrive(slide_file_path, slides_repo_folder_id, headers)
            slide_file_path_onedrive = f"/Projects Apps/PPT Maker/slides_repo/{unique_slide_id}.pptx"
            print(f'Slide{i} uploaded into Onedrive')

            # Upload slide image (.png) to slide_image_repo. The file is
            # stored under its local base name, so record that exact name.
            thumbnail_file_id = upload_to_onedrive(slide_image_path, slide_image_repo_folder_id, headers)
            thumbnail_file_path_onedrive = f"/Projects Apps/PPT Maker/slide_image_repo/{os.path.basename(slide_image_path)}"
            print(f'Image{i} uploaded into Onedrive')

            slide_embedding = embedding_model.encode(slide_text).tolist()

            metadata_list.append([
                unique_slide_id,               # Unique Slide ID
                slide_text,                    # Slide OCR Text
                full_text,                     # PPT OCR Text
                str(slide_embedding),          # Slide Embedding
                short_summary_embedding,       # Short Summary Embedding
                ppt_unique_id,                 # PPT Unique ID
                metadata.Suitable_Title,       # Suitable Title
                metadata.Slide_Category,       # Slide Category
                metadata.PPT_Owner,            # PPT Owner
                metadata.Audience_Forum,       # Audience Forum
                metadata.Short_Summary,        # Short Summary
                slide_file_path_onedrive,      # Slide File Path (.pptx)
                slide_file_id,                 # Slide File ID (.pptx)
                full_ppt_file_path,            # Full PPT File Path
                full_ppt_file_id,              # Full PPT File ID
                thumbnail_file_path_onedrive,  # Thumbnail File Path (.png)
                thumbnail_file_id,             # Thumbnail File ID (.png)
                upload_date,                   # Upload date
            ])

        # Step 8: Append to the master metadata CSVs
        gr.Info('Vectorising the meta data and uploading in Onedrive..')
        return update_and_upload_metadata_simplified(
            metadata_list,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
            headers
        )
    except Exception as e:
        return f"An error occurred: {str(e)}"
    finally:
        # Runs on success and on failure, so temp files never accumulate.
        if temp_root is not None:
            shutil.rmtree(temp_root, ignore_errors=True)
            print('Temp folders cleared')