# NOTE: the original file began with "Spaces: / Running / Running" — residue
# from a Hugging Face Spaces page scrape, not Python source. Preserved here as
# a comment so the file parses.
# APP.PY | |
from msal import PublicClientApplication | |
import requests | |
import gradio as gr | |
import pandas as pd | |
import tiktoken | |
import tempfile | |
from PyPDF2 import PdfReader | |
from tqdm import tqdm | |
from pydantic import BaseModel, Field | |
from phi.agent import Agent, RunResponse | |
from phi.model.groq import Groq | |
from sentence_transformers import SentenceTransformer | |
from sentence_transformers import CrossEncoder | |
#from gradio_client import Client, handle_file | |
import os | |
from pptx import Presentation | |
from pptx2img import PPTXConverter # For splitting slides | |
import uuid | |
import shutil | |
from PIL import Image | |
import pandas as pd | |
import requests | |
import gradio as gr | |
from pydantic import BaseModel, Field | |
from typing import List | |
import tiktoken | |
from datetime import datetime | |
import zipfile | |
from PIL import Image | |
import gradio as gr | |
import threading | |
import time | |
# Importing functions from files | |
# from upload_function import process_presentation,get_folder_id | |
# from view_ppt import search_ppts | |
# from stats_dashboard import get_dashboard_stats ,update_dashboard | |
# from search_slides import search_slides,combine_slides_as_zip | |
# Configure Microsoft Authentication | |
# Access secrets securely | |
# --- Configuration & shared module state -------------------------------------
# Secrets are read from the environment (e.g. HF Spaces secrets).
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
CLIENT_ID = os.getenv("CLIENT_ID")
TENANT_ID = os.getenv("TENANT_ID")
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD")

AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPES = ["Files.ReadWrite.All", "User.Read"]

# Only export the key when it is actually set: os.environ values must be
# strings, so assigning None (missing secret) raised TypeError at import time.
if GROQ_API_KEY is not None:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY

# Embedding model for semantic search (trust_remote_code required by nomic).
embedding_model = SentenceTransformer('nomic-ai/nomic-embed-text-v1', trust_remote_code=True)
from sentence_transformers import CrossEncoder
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')  # For reranking

# Mutable holders shared between the UI thread and the background login thread.
access_token_state = {"token": None}
flow_state = {"flow": None}

# NOTE(review): `global` at module level is a no-op; these names are plain
# module globals that functions rebind via their own `global` statements.
global headers
global df
global search_results

from config import temp_file_path  # Import the global variable

# Authorization is filled in after device-flow login (see background_login).
headers = {
    "Authorization": None,
    "Content-Type": "application/json"
}

# Local cache directory for downloaded files
LOCAL_CACHE_DIR = "local_cache"
os.makedirs(LOCAL_CACHE_DIR, exist_ok=True)

app = PublicClientApplication(client_id=CLIENT_ID, authority=AUTHORITY)
# Define Metadata Schema
class PPTMetadata(BaseModel):
    """Structured metadata the LLM agent must return for one presentation.

    Used as the `response_model` of the phi Agent in generate_metadata(); the
    Field descriptions double as guidance to the model for each field.
    """

    PPT_Unique_ID: str = Field(description="A unique identifier for the presentation (e.g., filename or hash).")
    Suitable_Title: str = Field(description="A concise and meaningful title for the presentation.")
    Slide_Category: str = Field(description="The category or theme of the slides (e.g., Risk management, Data Analytics, Technology etc ).")
    PPT_Owner:str = Field(description="The owner of the presentation ie who makes the presentation (eg: NCTC,DG Systems, Directorate of Logistics etc ,Not available if not found )")
    Audience_Forum: str = Field(description="The intended audience or forum for the presentation/to whom the presentaiton is made (e.g., NACIN, WCO, Presentation before Member (CBIC),Not available if not found).")
    Short_Summary: str = Field(description="A brief summary of the presentation's content with all keywords in 10 sentences covering all keywords.")
# Function to download metadata file from OneDrive | |
def download_metadata_file(metadata_folder_id, headers):
    """Download Master_metadata.csv from a OneDrive folder to a temp file.

    Args:
        metadata_folder_id: Graph item id of the folder holding the CSV.
        headers: dict carrying a valid "Authorization: Bearer ..." entry.

    Returns:
        str: path of a NamedTemporaryFile (delete=False) holding the CSV
        bytes; the caller is responsible for removing it.

    Raises:
        ValueError: listing failed, no download URL, or download failed.
        FileNotFoundError: the CSV is not present in the folder.
    """
    metadata_file_name = "Master_metadata.csv"
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{metadata_folder_id}/children"
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code != 200:
        raise ValueError(f"Failed to list folder contents. Error: {response.text}")

    items = response.json().get("value", [])
    file_item = next((item for item in items if item['name'] == metadata_file_name), None)
    if not file_item:
        raise FileNotFoundError(f"{metadata_file_name} not found in OneDrive folder.")

    # Pre-authenticated, short-lived URL; no auth header needed for this GET.
    # .get() instead of [] — the key can be absent, which previously KeyError'd.
    download_url = file_item.get("@microsoft.graph.downloadUrl")
    if not download_url:
        raise ValueError(f"No download URL available for {metadata_file_name}.")
    response = requests.get(download_url, timeout=60)
    if response.status_code != 200:
        raise ValueError(f"Failed to download {metadata_file_name}. Error: {response.text}")

    # Temp file so concurrent sessions don't clobber each other's copy.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
        temp_file.write(response.content)
        temp_file_path = temp_file.name
    print(f"β Downloaded: {metadata_file_name} to temporary file: {temp_file_path}")
    return temp_file_path
##################################################### STATS DASHBOARD ################################################################## | |
def update_dashboard():
    """Refresh the stats dashboard: a visibility toggle plus four value updates."""
    total_ppts, total_slides, chart_data, latest_html = get_dashboard_stats()
    ppt_count_html = f"<div><h3>Total PPTs: {total_ppts}</h3></div>"
    slide_count_html = f"<div><h3>Total Slides: {total_slides}</h3></div>"
    return (
        gr.update(visible=True),
        gr.update(value=ppt_count_html),
        gr.update(value=slide_count_html),
        gr.update(value=chart_data),
        gr.update(value=latest_html),
    )
import pandas as pd | |
import gradio as gr | |
import os | |
def get_dashboard_stats():
    """Load the metadata CSV and compute dashboard figures.

    Reads the CSV at the module-global ``temp_file_path`` into the module
    global ``df`` and returns ``(total_ppts, total_slides, chart_data,
    latest_html)`` where chart_data is a DataFrame for gr.BarPlot and
    latest_html lists the five newest presentations.
    """
    global temp_file_path
    global df

    print('Reading CSV...', temp_file_path)
    df = pd.read_csv(temp_file_path)

    # Coerce unparseable dates to NaT instead of raising.
    df["Upload_date"] = pd.to_datetime(df["Upload_date"], errors="coerce")
    print(df)

    total_ppts = df["PPT_Unique_ID"].nunique()
    total_slides = len(df)

    # Unique PPT uploads per month, for the bar chart.
    df["month_year"] = df["Upload_date"].dt.to_period("M").astype(str)
    monthly_stats = df.groupby("month_year")["PPT_Unique_ID"].nunique().reset_index()
    monthly_stats.columns = ["Month", "PPT Uploads"]
    chart_data = monthly_stats  # Gradio BarPlot expects a DataFrame

    # Five most recently uploaded distinct presentations.
    latest_df = df.drop_duplicates(subset="PPT_Unique_ID").sort_values("Upload_date", ascending=False)
    latest_5 = latest_df[["Suitable_Title", "Slide_Category", "Upload_date"]].head(5)

    items_html = []
    for _, row in latest_5.iterrows():
        if pd.notnull(row["Upload_date"]):
            date_str = row["Upload_date"].strftime("%Y-%m-%d")
        else:
            date_str = "Unknown Date"
        items_html.append(
            f"<li><b>{row['Suitable_Title']}</b> <br><i>{row['Slide_Category']}</i> β <span style='color:gray;'>{date_str}</span></li>"
        )
    latest_html = (
        "<h4 style='margin-bottom: 8px;'>π Top 5 Latest Uploaded PPTs</h4><ul style='line-height:1.6em;'>"
        + "".join(items_html)
        + "</ul>"
    )
    return total_ppts, total_slides, chart_data, latest_html
############################################################# UPLOAD PPT ####################################################################### | |
import requests | |
def get_access_token():
    """Acquire a Microsoft Graph token interactively via the MSAL device flow.

    Prints the verification URL and user code, blocks until the user
    completes sign-in, and returns the bearer token string.

    Raises:
        RuntimeError: the device flow could not be initiated or sign-in
        failed. (Previously this called exit(), which killed the whole
        server process instead of letting the caller handle the failure.)
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    if "user_code" not in flow:
        # initiate_device_flow reports failure inside the returned dict.
        raise RuntimeError(f"Could not start device flow: {flow.get('error_description')}")
    print("Go to", flow["verification_uri"])
    print("Enter the code:", flow["user_code"])
    result = app.acquire_token_by_device_flow(flow)
    if "access_token" not in result:
        print("β Could not acquire token:", result.get("error_description"))
        raise RuntimeError(result.get("error_description") or "Login failed")
    return result["access_token"]
# Function to generate a unique PPT ID
def generate_unique_ppt_id():
    """Return a random 8-character hex identifier derived from a UUID4."""
    # The first 8 chars of a canonical UUID4 string are its first 8 hex digits.
    return uuid.uuid4().hex[:8]
def truncate_text_to_tokens(text, max_tokens, model_name="cl100k_base"):
    """Trim `text` so it encodes to at most `max_tokens` tokens under `model_name`."""
    encoding = tiktoken.get_encoding(model_name)
    token_ids = encoding.encode(text)
    return encoding.decode(token_ids[:max_tokens])
def split_and_convert_ppt(file_path, output_folder_slides, output_folder_images):
    """Split a .pptx into single-slide decks plus PNG renders; return slide texts.

    For slide i, writes
    ``{output_folder_slides}/{file_name}_{ppt_unique_id}_slide_{i+1}.pptx``
    and a matching PNG under ``output_folder_images`` (naming produced by
    pptx2img: "{pptx_name}_slide_{idx + 1}.png").

    Returns:
        list[str]: stripped text extracted from each slide's shapes.

    NOTE(review): reads the module-global ``ppt_unique_id`` set by
    process_presentation(); calling this function standalone raises NameError.
    NOTE(review): slide deletion uses python-pptx's private ``_sldIdLst`` API,
    which is version-sensitive — re-verify on python-pptx upgrades.
    """
    os.makedirs(output_folder_slides, exist_ok=True)
    os.makedirs(output_folder_images, exist_ok=True)
    presentation = Presentation(file_path)
    slide_texts = []
    # NOTE(review): split('.')[0] truncates at the FIRST dot, so "v1.2 deck.pptx"
    # becomes "v1". process_presentation computes the same value, so the two
    # must stay in sync; os.path.splitext would be safer in both places.
    file_name = os.path.basename(file_path).split('.')[0]
    print('File Name ',file_name)
    print('File Path ',file_path)
    for i in range(len(presentation.slides)):
        unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{i + 1}"
        slide_file_path = os.path.join(output_folder_slides, f"{unique_slide_id}.pptx")
        print('Slide_file_path',slide_file_path)
        # pptx2img names its output "{pptx_name}_slide_{idx + 1}.png".
        image_path = os.path.join(output_folder_images, f"{unique_slide_id}_slide_1.png")
        print('Image file path',image_path)
        # Step 1: reopen the full deck and delete every slide except slide i,
        # leaving a single-slide presentation.
        new_presentation = Presentation(file_path)
        slide_indexes_to_remove = [j for j in range(len(new_presentation.slides)) if j != i]
        # Delete from the end so earlier indexes stay valid.
        for idx in sorted(slide_indexes_to_remove, reverse=True):
            r_id = new_presentation.slides._sldIdLst[idx].rId
            new_presentation.part.drop_rel(r_id)
            del new_presentation.slides._sldIdLst[idx]
        new_presentation.save(slide_file_path)
        del new_presentation
        # Step 2: render the single-slide deck to a PNG.
        converter = PPTXConverter()
        converter.convert_pptx_to_images(slide_file_path, output_folder_images)
        print(f"Slide {i+1} converted to image: {image_path}")
        # Step 3: extract editable text with python-pptx (OCR deliberately
        # switched off — pptx text quality beats tesseract here).
        slide = presentation.slides[i]
        pptx_text = ""
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                pptx_text += shape.text.strip() + "\n"
        print(f"π‘ PPTX Text Extractedfrom slide {i + 1}:\n", pptx_text.strip())
        slide_texts.append(pptx_text.strip())
    return slide_texts
def generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base"):
    """Call generate_metadata(), shrinking the input after each failure.

    Makes up to ``retries + 1`` attempts. Each attempt truncates `full_text`
    to `max_tokens` tokens; after a failure the budget drops by `decrement`.

    Returns:
        The metadata object on success, or None when every attempt fails.
    """
    last_attempt = retries + 1
    for attempt in range(1, last_attempt + 1):
        try:
            truncated_text = truncate_text_to_tokens(full_text, max_tokens, model_name)
            print(f"π Attempt {attempt}: Generating metadata with ~{count_tokens(truncated_text)} tokens...")
            metadata = generate_metadata(truncated_text)
            print("π Metadata generated successfully.")
            return metadata
        except Exception as e:
            print(f"β Error on attempt {attempt}: {str(e)}")
            if attempt == last_attempt:
                print("π¨ Max retries reached. Metadata generation failed.")
                return None
            # Shrink the token budget and try again.
            max_tokens -= decrement
            print(f"π Retrying with {max_tokens} tokens...")
# Function to generate metadata using phidata agent
def generate_metadata(ocr_text):
    """Ask a Groq-backed phi Agent to produce PPTMetadata for `ocr_text`.

    Returns:
        PPTMetadata: structured response (enforced via response_model).

    NOTE(review): several instruction strings below lack a trailing comma, so
    Python concatenates them with the following string into ONE list item
    (Rule 4 merges into Rule 5, and the last summary line merges into the
    general guidelines). The model still receives the text, but confirm the
    merging is intentional.
    """
    # Initialize the Agent with detailed instructions
    metadata_agent = Agent(
        name="Metadata Generator",
        role="Generates structured metadata for presentations based on their content.",
        instructions=[
            "Your task is to analyze the provided text and generate structured metadata for the presentation.",
            "Carefully evaluate the content to determine the most appropriate values for each metadata field.",
            # Rule 1: PPT Unique ID
            "For the 'PPT_Unique_ID', use the first 8 characters of the MD5 hash of the input text. "
            "This ensures uniqueness across presentations.",
            # Rule 2: Suitable Title
            "For the 'Suitable_Title', create a concise and meaningful title that captures the essence of the presentation. "
            "Focus on first slide where title of presentation is given along with key themes, topics, or keywords mentioned in the text.",
            # Rule 3: Slide Category
            "For the 'Slide_Category', classify the presentation into one of the following categories: "
            "The category or theme of the slides (e.g., Risk management , Data Analytics , Technology etc)"
            "Base your decision on the overall theme or subject matter of the content.",
            # Rule 4: PPT owner — NOTE(review): no comma after the next two
            # strings, so Rules 4 and 5 become a single list item.
            "Find The owner of the presentation ie who makes the presentation (eg: Done by name and designation ie Mr. baswaraj ,Princpial ADG , Additional Director ,or organisations like NCTC,DG Systems, Directorate of Logistics etc)"
            "Dont Asssume if u could not found ,mention Not Available"
            # Rule 5: Audience/Forum
            "For the 'Audience_Forum', identify the target audience or forum for the presentation. "
            "(e.g.,NACIN , WCO, Presentation before Member (CBIC)etc )."
            "Dont Asssume if could not found ,mention Not Available"
            "Consider the tone, language, and purpose of the content.",
            # Rule 6: Short Summary
            "For the 'Short_Summary', provide a brief summary of the presentation's content with all keywords in 10 sentences. "
            "Highlight the keywords ,topics, main points or objectives of the presentation.",
            # NOTE(review): missing comma again — this line merges with the
            # general guideline that follows it.
            "Mention the title also in the short summary ,owner and audience of the presentation"
            # General Guidelines
            "Ensure all fields are filled and meaningful. If unsure about a field, make an educated guess based on the context.",
        ],
        model=Groq(id="deepseek-r1-distill-llama-70b"),  # Replace with actual model ID
        response_model=PPTMetadata,
        markdown=True,
        debug_mode=True,
        show_tool_calls=True,
        monitoring=True)
    # Run the agent to generate metadata
    response = metadata_agent.run(
        f"Generate data fields for the following presentation content: {ocr_text}")
    return response.content
# Function to get folder ID in OneDrive
def get_folder_id(folder_path, headers):
    """Resolve (creating when absent) a nested OneDrive folder path.

    Walks ``folder_path`` (e.g. "A/B/C") segment by segment from the drive
    root, creating any missing folder along the way.

    Returns:
        The Graph item id of the final folder, or None when a request fails.
    """
    print("creating folder id for ", folder_path)
    parent_id = None
    for folder_name in folder_path.split("/"):
        # Children endpoint of the drive root, or of the folder found so far.
        if parent_id:
            children_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{parent_id}/children"
        else:
            children_url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
        response = requests.get(children_url, headers=headers)
        if response.status_code != 200:
            print(f"Failed to retrieve folder '{folder_name}'. Error: {response.text}")
            return None
        items = response.json().get("value", [])
        folder_item = next(
            (item for item in items if item["name"] == folder_name and "folder" in item),
            None,
        )
        if folder_item is None:
            # Folder missing: create it under the current parent.
            create_response = requests.post(children_url, headers=headers, json={
                "name": folder_name,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "rename"
            })
            if create_response.status_code not in [200, 201]:
                print(f"Failed to create folder '{folder_name}'. Error: {create_response.text}")
                return None
            folder_item = create_response.json()
        parent_id = folder_item["id"]
    return parent_id
# Function to upload file to OneDrive
def upload_to_onedrive(file_path, folder_id, headers):
    """Upload a local file into a OneDrive folder.

    Returns:
        The new item's Graph id on success, or None on failure.
    """
    file_name = os.path.basename(file_path)
    upload_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}:/{file_name}:/content"
    with open(file_path, "rb") as fh:
        payload = fh.read()
    response = requests.put(upload_url, headers=headers, data=payload)
    if response.status_code not in (200, 201):
        print(f"Failed to upload {file_name}. Error: {response.text}")
        return None
    print(f"Uploaded {file_name} to OneDrive.")
    return response.json()["id"]
# Function to count tokens using tiktoken
def count_tokens(text, model_name="cl100k_base"):
    """Return how many tokens `text` encodes to under `model_name`."""
    return len(tiktoken.get_encoding(model_name).encode(text))
def list_folder_files(folder_id, headers):
    """Return the children of a OneDrive folder as a list of item dicts.

    Raises:
        ValueError: the Graph listing request failed.
    """
    response = requests.get(
        f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}/children",
        headers=headers,
    )
    if response.status_code == 200:
        return response.json().get("value", [])
    raise ValueError(f"Failed to list folder contents. Error: {response.text}")
def download_onedrive_file(file_id, filename, headers):
    """Download a OneDrive item to a local path.

    Args:
        file_id: Graph item id of the file.
        filename: local destination path (overwritten if present).
        headers: dict carrying a bearer Authorization header.

    Raises:
        ValueError: the metadata fetch failed or no download URL was present.
        requests.HTTPError: the content download itself failed.
    """
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}"
    meta = requests.get(url, headers=headers, timeout=30)
    if meta.status_code != 200:
        raise ValueError(f"Failed to fetch item {file_id}. Error: {meta.text}")
    download_url = meta.json().get("@microsoft.graph.downloadUrl")
    if not download_url:
        # Previously None was passed straight to requests.get, crashing with
        # an obscure MissingSchema error instead of a clear message.
        raise ValueError(f"No download URL for item {file_id}.")
    response = requests.get(download_url, timeout=60)
    response.raise_for_status()
    with open(filename, 'wb') as f:
        f.write(response.content)
def update_and_upload_metadata_simplified(metadata_list, metadata_folder_id, metadata_with_fulltext_folder_id, headers):
    """Append new slide rows to both master CSVs and re-upload them.

    Maintains two files on OneDrive: Master_metadata.csv (with the bulky
    PPT_OCR_Text column dropped) and Master_fulltext_metadata.csv (complete).
    Each is downloaded if it already exists, extended with the new rows, and
    uploaded back to its folder.

    Returns:
        str: a completion message for the UI.
    """
    columns = [
        "Unique_Slide_ID", "Slide_OCR_Text", "PPT_OCR_Text", "Slide_Embedding", "Short_Summary_Embedding",
        "PPT_Unique_ID", "Suitable_Title", "Slide_Category", "PPT_Owner", "Audience_Forum", "Short_Summary",
        "Slide_File_Path", "Slide_File_ID", "Full_PPT_File_Path", "Full_PPT_File_ID",
        "Thumbnail_File_Path", "Thumbnail_File_ID", "Upload_date",
    ]
    df_new = pd.DataFrame(metadata_list, columns=columns)

    targets = [
        ("Master_metadata.csv", metadata_folder_id, 'PPT_OCR_Text'),
        ("Master_fulltext_metadata.csv", metadata_with_fulltext_folder_id, None),
    ]
    for csv_file, folder_id, drop_column in targets:
        files = list_folder_files(folder_id, headers)
        file_item = next((item for item in files if item['name'] == csv_file), None)
        print('File items', file_item)
        if file_item is None:
            # First upload ever: start from the new rows alone.
            df_merged = df_new
        else:
            download_onedrive_file(file_item['id'], csv_file, headers)
            df_merged = pd.concat([pd.read_csv(csv_file), df_new], ignore_index=True)
        if drop_column:
            df_merged = df_merged.drop(columns=[drop_column])
        df_merged.to_csv(csv_file, index=False)
        upload_to_onedrive(csv_file, folder_id, headers)
        print(f"β Uploaded: {csv_file}")
    return "β PPT Processing and Metadata update complete!"
# Main processing function
def process_presentation(file):
    """End-to-end ingest of one uploaded .pptx file.

    Pipeline: validate extension -> upload the full deck to OneDrive ->
    split into per-slide decks and PNG thumbnails -> generate LLM metadata
    and embeddings -> upload every slide artifact -> append rows to the two
    master metadata CSVs.

    Returns:
        str: a status or error message for the Gradio UI; all exceptions are
        caught and returned as text rather than raised.
    """
    try:
        # Step 0: Validate file format (Gradio may hand us a file object or a plain path).
        file_path = file.name if hasattr(file, "name") else file
        file_extension = os.path.splitext(file_path)[-1].lower()
        # NOTE(review): gr.Info() with no message looks accidental — confirm intent.
        gr.Info()
        if file_extension not in ['.pptx']:
            raise ValueError("Unsupported file format. Please upload .pptx")
        # Base name without extension; split at the FIRST dot — must stay in
        # sync with the identical computation in split_and_convert_ppt.
        file_name = os.path.basename(file_path).split('.')[0]
        print('File Name ',file_name)
        # Step 1: Generate unique PPT ID.
        # Module global: split_and_convert_ppt reads ppt_unique_id implicitly.
        global ppt_unique_id
        ppt_unique_id = generate_unique_ppt_id()
        upload_date = datetime.now().strftime('%Y-%m-%d')
        print('PPT_unique id',ppt_unique_id)
        # Step 2: Resolve (or create) the OneDrive folder layout. `headers` is
        # the module-level dict populated after device-flow login.
        gr.Info('Connecting to OneDrive..')
        ppt_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/ppt_repo", headers)
        slides_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slides_repo", headers)
        slide_image_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slide_image_repo", headers)
        metadata_folder_id=get_folder_id('Projects Apps/PPT Maker/Metadata_file',headers)
        metadata_with_fulltext_folder_id=get_folder_id('Projects Apps/PPT Maker/Metadata_with_fulltext',headers)
        print('ppt_repo_folder_id',ppt_repo_folder_id)
        print('slides_repo_folder_id',slides_repo_folder_id)
        print('slide_image_repo_folder_id',slide_image_repo_folder_id)
        print('metadata_folder_id',metadata_folder_id)
        if not (ppt_repo_folder_id and slides_repo_folder_id and slide_image_repo_folder_id and metadata_folder_id) :
            gr.Error('Could not find or create required folders in OneDrive.')
            raise ValueError("Could not find or create required folders in OneDrive.")
        # Step 3: Refuse duplicate uploads (same file name already in ppt_repo).
        existing_files = list_folder_files(ppt_repo_folder_id, headers)
        ppt_file_name = os.path.basename(file_path)
        if any(item['name'] == ppt_file_name for item in existing_files):
            gr.Error('β οΈ A file named ' + ppt_file_name + ' already exists in the PPT repository. Please rename your file or delete the existing one before re-uploading.')
            return f"β οΈ A file named '{ppt_file_name}' already exists in the PPT repository. Please rename your file or delete the existing one before re-uploading."
        # Step 4: Upload the full deck.
        full_ppt_file_id = upload_to_onedrive(file_path, ppt_repo_folder_id,headers)
        gr.Info('PPT uploaded to OneDrive..')
        full_ppt_file_name = os.path.basename(file_path)
        full_ppt_file_path = f"/Projects Apps/PPT Maker/ppt_repo/{full_ppt_file_name}"
        # Step 5: Split into single-slide decks + thumbnail images.
        gr.Info('Processing the PPT and indexing ..it may take a while ')
        temp_output_folder_slides = "temp_slides"
        temp_output_folder_images = "temp_images"
        slide_texts = split_and_convert_ppt(file_path, temp_output_folder_slides, temp_output_folder_images)
        print('PPT splitted and converted successfully')
        # Step 6: Deck-level metadata from the concatenated slide text.
        full_text = "\n".join(slide_texts)
        gr.Info('AI agent processing the data .')
        metadata = generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base")
        # NOTE(review): metadata is None when every retry fails; the attribute
        # accesses below then raise AttributeError, caught by the outer except.
        # Step 7: Upload each slide + image and collect one metadata row per slide.
        metadata_list = []
        gr.Info('Uploading the individual slides and images into repo ')
        for i, slide_text in enumerate(slide_texts):
            unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{i + 1}"
            slide_file_path = f"{temp_output_folder_slides}/{unique_slide_id}.pptx"
            slide_image_path = f"{temp_output_folder_images}/{unique_slide_id}_slide_1.png"
            # Upload individual slide (.pptx) to slides_repo.
            slide_file_id = upload_to_onedrive(slide_file_path, slides_repo_folder_id,headers)
            slide_file_path_onedrive = f"/Projects Apps/PPT Maker/slides_repo/{unique_slide_id}.pptx"
            print(f'Slide{i} uploaded into Onedrive')
            # Upload slide image (.png) to slide_image_repo.
            thumbnail_file_id = upload_to_onedrive(slide_image_path, slide_image_repo_folder_id,headers)
            # NOTE(review): this recorded path omits the "_slide_1" suffix the
            # actual PNG carries; downloads still work because only the file ID
            # (not this path) is used to fetch content later.
            thumbnail_file_path_onedrive = f"/Projects Apps/PPT Maker/slide_image_repo/{unique_slide_id}.png"
            print(f'Image{i} uploaded into Onedrive')
            # Embeddings: per-slide text plus the deck's short summary.
            slide_embedding = embedding_model.encode(slide_text).tolist()
            short_summary_embedding = embedding_model.encode(metadata.Short_Summary).tolist()
            # One CSV row; order must match update_and_upload_metadata_simplified's columns.
            metadata_list.append([
                unique_slide_id,                # Unique Slide ID
                slide_text,                     # Slide OCR Text
                full_text,                      # PPT OCR Text
                str(slide_embedding),           # Embedding (stored as str(list))
                str(short_summary_embedding),
                ppt_unique_id,                  # PPT Unique ID
                metadata.Suitable_Title,        # Suitable Title
                metadata.Slide_Category,        # Slide Category
                metadata.PPT_Owner,             # PPT Owner
                metadata.Audience_Forum,        # Audience Forum
                metadata.Short_Summary,         # Short Summary
                slide_file_path_onedrive,       # Slide File Path (.pptx)
                slide_file_id,                  # Slide File ID (.pptx)
                full_ppt_file_path,             # Full PPT File Path
                full_ppt_file_id,               # Full PPT File ID
                thumbnail_file_path_onedrive,   # Thumbnail File Path (.png)
                thumbnail_file_id ,             # Thumbnail File ID (.png)
                upload_date                     # upload date
            ])
            # Clean up this slide's temp files.
            os.remove(slide_file_path)
            os.remove(slide_image_path)
            print('Slides cleared from temp')
        # Clean up temporary folders (forcefully deletes all contents inside).
        shutil.rmtree(temp_output_folder_slides, ignore_errors=True)
        shutil.rmtree(temp_output_folder_images, ignore_errors=True)
        print('Temp folders cleared')
        # Step 8: Append the rows to both master CSVs and re-upload them.
        gr.Info('Vectorising the meta data and uploading in Onedrive..')
        return update_and_upload_metadata_simplified(
            metadata_list,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
            headers
        )
    except Exception as e:
        # Surface any failure to the UI as a plain string.
        return f"An error occurred: {str(e)}"
############################################################################### SEARCH PPT ###################################### | |
import requests | |
from sentence_transformers import SentenceTransformer, CrossEncoder | |
from sklearn.metrics.pairwise import cosine_similarity | |
import os | |
import shutil | |
import gradio as gr | |
# Local cache directory for downloaded files | |
LOCAL_CACHE_DIR = "local_cache" | |
os.makedirs(LOCAL_CACHE_DIR, exist_ok=True) | |
# Function to download a file from OneDrive to the local cache
def download_file_from_onedrive(file_path, file_id, headers):
    """Fetch a OneDrive file into LOCAL_CACHE_DIR, skipping re-downloads.

    Args:
        file_path: OneDrive path; only its basename names the cached copy.
        file_id: Graph item id used for the /content request.
        headers: dict carrying a bearer Authorization header.

    Returns:
        str: local path of the cached file.

    Raises:
        ValueError: the download request failed.

    NOTE(review): cache key is the basename only — two OneDrive files with
    the same name in different folders would collide; confirm names are unique.
    """
    local_file_path = os.path.join(LOCAL_CACHE_DIR, os.path.basename(file_path))
    if not os.path.exists(local_file_path):  # Avoid re-downloading
        download_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}/content"
        # timeout added: the old call could hang a Gradio worker indefinitely.
        response = requests.get(download_url, headers=headers, timeout=60)
        if response.status_code != 200:
            raise ValueError(f"Failed to download file {file_path}. Error: {response.text}")
        with open(local_file_path, "wb") as f:
            f.write(response.content)
        print(f"β Downloaded: {file_path} -> {local_file_path}")
    return local_file_path
# Function to search PPTs
def search_ppts(query, num_results):
    """Semantic search over whole presentations (title slides only).

    Embeds `query`, ranks slide-1 rows of the global `df` by cosine
    similarity of the stored Short_Summary embedding, reranks the top 50
    with the cross-encoder, downloads thumbnails and decks for the top
    `num_results`, and returns a flat list of gr.update() objects for the
    20 fixed result rows (5 updates per row).
    """
    import ast  # local import: parse embeddings stored as str(list)
    global df
    gr.Info("Searching the relevant PPTs .")
    query_embedding = embedding_model.encode(query).tolist()

    # Only "slide_1" rows represent a whole PPT. .copy() fixes the
    # SettingWithCopyWarning the old code triggered when assigning below.
    df1 = df[df['Unique_Slide_ID'].str.endswith("slide_1", na=False)].copy()

    # ast.literal_eval replaces eval(): identical result for str(list)
    # embeddings, but it cannot execute code from a tampered CSV.
    df1['similarity'] = df1['Short_Summary_Embedding'].apply(
        lambda x: cosine_similarity([query_embedding], [ast.literal_eval(x)])[0][0]
    )
    df1 = df1.sort_values(by='similarity', ascending=False)

    # Rerank the best candidates with the cross-encoder.
    top_n = min(50, len(df1))
    top_results = df1.head(top_n)
    pairs = [(query, row['Short_Summary']) for _, row in top_results.iterrows()]
    gr.Info("Doing Semantic Reranking for most appropriate results ")
    rerank_scores = cross_encoder.predict(pairs)
    top_results = top_results.copy()  # Avoid SettingWithCopyWarning
    top_results['rerank_score'] = rerank_scores
    top_results = top_results.sort_values(by='rerank_score', ascending=False)
    print(top_results)

    results = []
    gr.Info('Downloading PPT images and ppt')
    print('Downloading PPT images and ppt')
    for _, row in top_results.head(num_results).iterrows():
        # Thumbnail image and full deck are cached locally.
        slide_image_path = download_file_from_onedrive(
            row['Thumbnail_File_Path'], row['Thumbnail_File_ID'], headers
        )
        ppt_download_link = download_file_from_onedrive(
            row['Full_PPT_File_Path'], row['Full_PPT_File_ID'], headers
        )
        results.append({
            "image": slide_image_path,
            "title": row['Suitable_Title'],
            "owner": row['PPT_Owner'],
            "category": row['Slide_Category'],
            "summary": row['Short_Summary'],
            "download_link": ppt_download_link
        })
    print("downloading complete ")

    # One row-visibility update plus 4 component updates per row, x20 rows.
    # (The old code initialised row_updates twice; once is enough.)
    row_updates = []
    for i in range(20):
        if i < len(results):
            result = results[i]
            row_updates.extend([
                gr.update(visible=True),
                gr.update(value=result["image"], visible=True),
                gr.update(value=f"<b>Title:</b> {result['title']}<br><b>Owner:</b> {result['owner']}<br><b>Category:</b> {result['category']}", visible=True),
                gr.update(value=result["summary"], visible=True),
                gr.update(value=result["download_link"], visible=True),
            ])
        else:
            row_updates.extend([gr.update(visible=False)] * 5)  # row + 4 components
    return row_updates
################################################################ SEARCH SLIDES ######################## | |
import requests | |
import gradio as gr | |
import pandas as pd | |
import tiktoken | |
import tempfile | |
from PyPDF2 import PdfReader | |
from tqdm import tqdm | |
from pydantic import BaseModel, Field | |
from phi.agent import Agent, RunResponse | |
from phi.model.groq import Groq | |
from sentence_transformers import SentenceTransformer | |
from sentence_transformers import CrossEncoder | |
#from gradio_client import Client, handle_file | |
import os | |
from pptx import Presentation | |
from pptx2img import PPTXConverter # For splitting slides | |
import uuid | |
import shutil | |
from PIL import Image | |
import pandas as pd | |
import requests | |
import gradio as gr | |
from pydantic import BaseModel, Field | |
from typing import List | |
import tiktoken | |
from datetime import datetime | |
import zipfile | |
from PIL import Image | |
import gradio as gr | |
import threading | |
import time | |
# Global variable to store search results | |
search_results = [] | |
def search_slides(query, num_results):
    """Semantic search over individual slides.

    Embeds `query`, ranks ALL rows of the global `df` by cosine similarity of
    their Slide_Embedding, reranks the top 50 with the cross-encoder, and
    downloads the thumbnail plus single-slide .pptx for the top `num_results`.
    Stores the hit list in the module-global `search_results` (consumed by
    combine_slides_as_zip) and returns gr.update() objects for the 20 fixed
    result rows (6 updates per row: row, image, info, path, checkbox).
    """
    import ast  # local import: parse embeddings stored as str(list)
    global search_results  # shared with combine_slides_as_zip
    global df
    gr.Info("Searching the relevant slides.")
    query_embedding = embedding_model.encode(query).tolist()

    # Work on a copy: the old code added a 'similarity' column to the global
    # df and re-sorted it in place, polluting every later search and any
    # other reader of df.
    ranked = df.copy()
    # ast.literal_eval replaces eval(): same result for str(list) embeddings,
    # without the arbitrary-code-execution risk of eval on CSV content.
    ranked['similarity'] = ranked['Slide_Embedding'].apply(
        lambda x: cosine_similarity([query_embedding], [ast.literal_eval(x)])[0][0]
    )
    ranked = ranked.sort_values(by='similarity', ascending=False)

    # Rerank the best candidates with the cross-encoder.
    top_n = min(50, len(ranked))
    top_results = ranked.head(top_n)
    pairs = [(query, row['Short_Summary']) for _, row in top_results.iterrows()]
    gr.Info("Doing Semantic Reranking for most appropriate results")
    rerank_scores = cross_encoder.predict(pairs)
    top_results = top_results.copy()  # Avoid SettingWithCopyWarning
    top_results['rerank_score'] = rerank_scores
    top_results = top_results.sort_values(by='rerank_score', ascending=False)

    results = []
    gr.Info('Downloading slide images')
    for _, row in top_results.head(num_results).iterrows():
        slide_image_path = download_file_from_onedrive(
            row['Thumbnail_File_Path'], row['Thumbnail_File_ID'], headers
        )
        slide_download_link = download_file_from_onedrive(
            row['Slide_File_Path'], row['Slide_File_ID'], headers
        )
        results.append({
            "image": slide_image_path,
            "title": row['Suitable_Title'],
            "owner": row['PPT_Owner'],
            "category": row['Slide_Category'],
            "summary": row['Short_Summary'],
            "slide_path": slide_download_link
        })

    search_results = results

    # One row-visibility update plus 5 component updates per row, x20 rows.
    visible_rows = min(len(results), num_results)
    row_updates = []
    for i in range(20):
        if i < visible_rows:
            result = results[i]
            row_updates.extend([
                gr.update(visible=True),  # Row visibility
                gr.update(value=result["image"], visible=True),
                gr.update(value=f"<b>Title:</b> {result['title']}<br><b>Owner:</b> {result['owner']}<br><b>Category:</b> {result['category']}", visible=True),
                gr.update(value=result["slide_path"], visible=True),  # Slide path for identification
                gr.update(visible=True)  # Checkbox visibility
            ])
        else:
            row_updates.extend([gr.update(visible=False)] * 6)  # Row + 5 components
    return row_updates
def combine_slides_as_zip(*checkbox_values):
    """Zip the slide files whose checkboxes are ticked and return the ZIP path.

    Args:
        *checkbox_values: One boolean per result row, positionally aligned
            with the module-level ``search_results`` list populated by the
            slide search handler.

    Returns:
        Path to the created ZIP archive, or ``None`` when no slide is
        selected. (The old code returned the string "No slides selected."
        to a ``gr.File`` output, which Gradio treats as a file path and
        fails on; ``None`` simply leaves the output empty.)
    """
    selected_files = [
        result["slide_path"]
        for result, selected in zip(search_results, checkbox_values)
        if selected
    ]
    if not selected_files:
        # Tell the user via a toast instead of handing a bogus path to gr.File.
        gr.Warning("No slides selected.")
        return None
    zip_filename = os.path.join(LOCAL_CACHE_DIR, "selected_slides.zip")
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        used_names = set()
        for file_path in selected_files:
            # Only the filename goes into the archive; de-duplicate because
            # slides from different folders can share a basename, and a
            # repeated arcname makes the earlier entry unreachable in most
            # unzip tools.
            arcname = os.path.basename(file_path)
            if arcname in used_names:
                stem, ext = os.path.splitext(arcname)
                arcname = f"{stem}_{len(used_names)}{ext}"
            used_names.add(arcname)
            zipf.write(file_path, arcname=arcname)
    return zip_filename
# Background thread to wait for login
def background_login(flow):
    """Block until the MSAL device-code flow completes, then publish the token.

    Runs in a daemon thread started by ``login_action``. On success it stores
    the access token in ``access_token_state`` and rebuilds the global
    ``headers`` dict used by the Graph API calls; on any failure it stores
    the sentinel "ERROR" so ``check_login_status`` can report it.
    """
    global headers
    try:
        # Polls the token endpoint until the user finishes or the flow expires.
        result = app.acquire_token_by_device_flow(flow)
    except Exception:
        # Treat an exception from MSAL the same as a failed login.
        access_token_state["token"] = "ERROR"
        return
    # BUGFIX: the old code read result["access_token"] *before* the membership
    # check, so a failed login raised KeyError in this thread and the "ERROR"
    # sentinel was never set (the UI then waited forever). Check first.
    if "access_token" in result:
        access_token = result["access_token"]
        access_token_state["token"] = access_token
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
    else:
        access_token_state["token"] = "ERROR"
def login_action():
    """Start the Microsoft device-code login and display the instructions.

    Shows the verification URL and user code in the ``auth_instructions``
    HTML box, and launches a background thread (``background_login``) that
    waits for the user to complete authentication.
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    # Per MSAL docs, a failed initiation returns a dict WITHOUT "user_code"
    # (it carries "error"/"error_description" instead). The old code raised
    # KeyError here; surface the error to the user instead.
    if "user_code" not in flow:
        error = flow.get("error_description", "Failed to start device login. Please try again.")
        return gr.update(value=f"<p style='text-align:center;'>{error}</p>", visible=True)
    flow_state["flow"] = flow
    login_url = flow["verification_uri"]
    login_code = flow["user_code"]
    instructions = f"""
    <p style='text-align:center; color:#1E3A8A;'>Please go to the following link to authenticate:</p>
    <p style='text-align:center;'><a href='{login_url}' target='_blank'>{login_url}</a></p>
    <p style='text-align:center;'>Enter the code: <strong>{login_code}</strong></p>
    """
    # Start background login thread; daemon so it never blocks shutdown.
    threading.Thread(target=background_login, args=(flow,), daemon=True).start()
    return gr.update(value=instructions, visible=True)
# Check token and control UI switch
def check_login_status():
    """Poll the shared login state and toggle the UI sections.

    Returns three ``gr.update`` objects, in order, for:
    (login_error, login_section, main_app_section).
    """
    token = access_token_state["token"]
    if token == "ERROR":
        # Login failed: show the error message, stay on the login screen.
        return (
            gr.update(visible=True, value="β Login failed.Click Login button again to Try again"),
            gr.update(visible=True),
            gr.update(visible=False),
        )
    if token:
        # Login succeeded: clear the message and reveal the main app.
        return (
            gr.update(value="", visible=False),
            gr.update(visible=False),
            gr.update(visible=True),
        )
    # Still pending: no message, keep the login screen visible.
    return (
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=False),
    )
def validate_admin_access(username, password):
    """Check admin credentials and toggle the upload panel.

    Returns ``gr.update`` objects, in order, for:
    (admin_access_section, admin_upload_ui, admin_login_msg).
    """
    authorized = username == ADMIN_USERNAME and password == ADMIN_PASSWORD
    if authorized:
        # Hide the login form, reveal the upload UI, clear any stale error.
        return (
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=False, value=""),
        )
    # Wrong credentials: keep the form visible and show the error message.
    return (
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=True, value="β Invalid credentials"),
    )
def load_and_store_metadata_df():
    """Download the master metadata file from OneDrive to a local temp path.

    Caches the downloaded path in the module-level ``temp_file_path`` so the
    search handlers can read it later. When the download did not produce a
    file, returns 20 hidden-row updates plus an error message; on success it
    returns None (this handler is wired with ``outputs=[]``).
    """
    global temp_file_path
    gr.Info("Downloading the master file ..We will be ready shortly")
    # Resolve the OneDrive folder, then fetch the metadata CSV from it.
    folder_id = get_folder_id("Projects Apps/PPT Maker/Metadata_file", headers)
    temp_file_path = download_metadata_file(folder_id, headers)
    if not os.path.exists(temp_file_path):
        # Hide every pre-built result row and report the failure.
        return [gr.update(visible=False) for _ in range(20)], "Metadata file not found."
# CSS for checkboxes
# Injected into gr.Blocks(css=...) below: styles all buttons, the
# "checkbox-column" cards in the combine-slides tab, and checkbox labels.
css="""
.gr-button {
    background-color: #1E3A8A;
    color: white;
}
/* Style for checkbox column */
.checkbox-column {
    background-color: #EFF6FF;
    border-radius: 10px;
    padding: 10px;
    margin-top: 8px;
    margin-bottom: 8px;
    box-shadow: 0 1px 4px rgba(0,0,0,0.1);
    transition: box-shadow 0.3s ease;
}
.checkbox-column:hover {
    box-shadow: 0 2px 8px rgba(0,0,0,0.2);
}
/* Style the checkbox directly */
.gr-checkbox {
    font-weight: bold;
    color: #1D4ED8;
}
"""
# # # MAIN APP # # #
# UI layout: a login column shown first, and a main-app column that
# check_login_status reveals after a successful Microsoft device-code login.
with gr.Blocks(css=css) as demo:
    # ---------- Login screen ----------
    with gr.Column(visible=True) as login_section:
        gr.HTML("<h1 style='text-align:center; color:#1E3A8A;'>NCTC SlideFinder</h1>")
        # π Subheading
        gr.HTML("<h3 style='text-align:center; color:#0F766E;'>PPT Repo and Smart Search Powered by AI</h3>")
        gr.HTML("""
        <div style='text-align:center;'>
        <img src='/file=logo.jpg' width='200' height='200' style='margin-top:10px;' />
        </div>
        """)
        login_button = gr.Button("π Login")
        # Device-code instructions are injected here by login_action.
        auth_instructions = gr.HTML(visible=False)
        login_error = gr.Textbox(visible=False, interactive=False, label="", show_label=False)
        status_checker = gr.Button("β Check Login Status")
    # ---------- Main app (hidden until login succeeds) ----------
    with gr.Column(visible=False) as main_app_section:
        gr.Markdown("<h2 style='text-align:center; color:#0F766E;'>Welcome to NCTC PPT Repository</h2>")
        # Tab 1: usage statistics, populated by update_dashboard after login.
        with gr.Tab("π Stats Dashboard"):
            with gr.Column() as dashboard_section:
                gr.Markdown("### π Dashboard Overview")
                with gr.Row():
                    total_ppt_box = gr.HTML()
                    total_slides_box = gr.HTML()
                with gr.Row():
                    chart_output = gr.BarPlot(x="Month", y="PPT Uploads", label="Monthly PPT Uploads")
                    latest_ppts_output = gr.HTML()
        # Tab 2: PPT upload, gated behind the admin credential form.
        with gr.Tab("Upload PPT"):
            # file_input = gr.File(label="Upload PPT File")
            # output_text = gr.Textbox(label="Processing Status")
            # submit_button = gr.Button("Process")
            # submit_button.click(process_presentation, inputs=file_input, outputs=output_text)
            with gr.Column() as admin_access_section:
                gr.Markdown("### π Admin Access Required")
                username_input = gr.Textbox(label="Username", placeholder="Enter username")
                password_input = gr.Textbox(label="Password", type="password", placeholder="Enter password")
                admin_login_msg = gr.Textbox(visible=False, interactive=False, show_label=False)
                admin_login_button = gr.Button("π Proceed")
            # Revealed by validate_admin_access on correct credentials.
            with gr.Column(visible=False) as admin_upload_ui:
                file_input = gr.File(label="Upload PPT File")
                output_text = gr.Textbox(label="Processing Status")
                submit_button = gr.Button("Process")
                submit_button.click(process_presentation, inputs=file_input, outputs=output_text)
            admin_login_button.click(
                validate_admin_access,
                inputs=[username_input, password_input],
                outputs=[admin_access_section, admin_upload_ui, admin_login_msg]
            )
        # Tab 3: whole-PPT search. Gradio components must exist up front, so
        # 20 hidden result rows are pre-built and search_ppts toggles their
        # visibility/values.
        with gr.Tab("Search PPT"):
            query_input = gr.Textbox(label="Enter Search Query", placeholder="e.g., Risk Management")
            num_results_input = gr.Number(label="Number of Results", value=5, minimum=1, maximum=20)
            search_button = gr.Button("π Search")
            result_rows = []
            result_components = []
            for i in range(20):
                with gr.Row(visible=False) as row:
                    with gr.Column(scale=2):  # image small
                        image_output = gr.Image(label="Slide Image")
                    with gr.Column(scale=1):  # image small
                        info_output = gr.HTML(label="PPT Info")
                    with gr.Column(scale=2):  # image small
                        summary_output = gr.Textbox(label="Short Summary", lines=3)
                    with gr.Column(scale=1):  # image small
                        # download_button = gr.Button("Download PPT")
                        download_file = gr.File( label="π₯ Download PPT")
                result_rows.append(row)  # β Track rows
                # Order matters: search_ppts returns updates in this exact
                # per-row component order.
                result_components.extend([row, image_output, info_output, summary_output, download_file])
            search_button.click(
                search_ppts,
                inputs=[query_input, num_results_input],
                outputs=result_components
            )
        # Tab 4: slide-level search plus checkbox selection; selected slides
        # are zipped by combine_slides_as_zip.
        with gr.Tab("Search and Combine Slides"):
            query_input = gr.Textbox(label="Enter Search Query to search slides", placeholder="e.g., Risk Management")
            num_results_input = gr.Number(label="Number of Slides you need", value=5, minimum=1, maximum=20)
            search_button = gr.Button("π Search")
            result_rows = []
            result_components = []
            checkboxes = []
            for i in range(20):
                with gr.Row(visible=False) as row:
                    with gr.Column(scale=4):  # Image small
                        image_output = gr.Image(label="Slide Image")
                    with gr.Column(scale=2):  # Info small
                        info_output = gr.HTML(label="Slide Info")
                    # with gr.Column(scale=2):  # Summary small
                    #     summary_output = gr.Textbox(label="Short Summary", lines=3)
                    with gr.Column(scale=1):  # Slide ID small
                        download_file = gr.File( label="π₯ Download Slide")
                        # slide_id_output = gr.Textbox(label="Slide ID", interactive=False)
                    with gr.Column(scale=1, elem_classes=["checkbox-column"]):  # Checkbox small
                        checkbox = gr.Checkbox(label="Select to Combine")
                        checkboxes.append(checkbox)
                result_rows.append(row)  # Track rows
                # Order matters: search_slides returns updates in this exact
                # per-row component order.
                result_components.extend([row, image_output, info_output, download_file, checkbox])
            combine_button = gr.Button("Combine Selected Slides")
            combined_ppt_output = gr.File(label="Download Combined PPT")
            search_button.click(
                search_slides,
                inputs=[query_input, num_results_input],
                outputs=result_components
            )
            combine_button.click(
                combine_slides_as_zip,
                inputs=checkboxes,
                outputs=gr.File(label="Download ZIP")
            )
    # Login wiring: show device-code instructions, then on "Check Login
    # Status" swap the sections, download metadata, and refresh the dashboard.
    login_button.click(login_action, inputs=[], outputs=[auth_instructions])
    status_checker.click(
        check_login_status,
        inputs=[],
        outputs=[login_error, login_section, main_app_section]
    ).then(
        fn=load_and_store_metadata_df,
        inputs=[],
        outputs=[]
    ).then(
        fn=update_dashboard,
        inputs=[],
        outputs=[dashboard_section, total_ppt_box, total_slides_box, chart_output, latest_ppts_output]
    )

# allowed_paths lets Gradio serve files (slide images, ZIPs) from the cache dir.
demo.launch(debug=True, allowed_paths=[LOCAL_CACHE_DIR])