# Hugging Face Spaces page header (scrape residue) - Space status: Sleeping
from typing import List | |
import pandas as pd | |
from transformers import AutoTokenizer, AutoModel | |
import torch | |
from langchain_community.document_loaders import PyPDFLoader | |
from IPython.display import display | |
import os | |
# NOTE(review): this shell command runs as a side effect every time the script
# is executed. Its return code is ignored and no `-y` flag is passed, so the
# install may not complete in a non-interactive environment. Confirm whether
# poppler-utils is still required by anything in this app.
os.system('apt-get install poppler-utils')
from sklearn.metrics.pairwise import cosine_similarity | |
import numpy as np | |
import streamlit as st | |
class PDFProcessor:
    """
    Reads PDF documents and returns their textual content.
    """

    def extract_text_from_pdfs(self, file_paths: List[str]) -> List[str]:
        """
        Collect the text of every chunk produced by ``PyPDFLoader`` for each file.

        The result is a single flat list: chunks from all files are appended in
        the order the files are given. A file that raises while being processed
        is reported on stdout and skipped; chunks already collected from it are
        kept.

        Args:
            file_paths (List[str]): Paths of the PDF documents to read.

        Returns:
            List[str]: Text of every chunk that could be extracted.
        """
        collected = []
        for pdf_path in file_paths:
            try:
                for chunk in PyPDFLoader(pdf_path).load_and_split():
                    content = chunk.page_content
                    if isinstance(content, str):
                        collected.append(content)
                    elif isinstance(content, bytes):
                        # Undecodable bytes are dropped rather than raising.
                        collected.append(content.decode('utf-8', errors='ignore'))
                    else:
                        print(f"Unexpected type: {type(content)}")
            except Exception as e:
                # Best effort: one unreadable file must not abort the batch.
                print(f"Failed to process {pdf_path}: {e}")
        return collected
class EmbeddingsProcessor:
    """
    Class for processing text to obtain embeddings using a transformer model.

    The model is placed on the GPU when one is available and on the CPU
    otherwise, so the class also works on CPU-only hosts.
    """

    def __init__(self, model_name: str):
        """
        Initialize the EmbeddingsProcessor with a pre-trained model.

        Args:
            model_name (str): The name of the pre-trained model to use for generating embeddings.
        """
        # Fall back to CPU instead of crashing when CUDA is not present.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.model.eval()

    def get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Generate one embedding per text by mean-pooling the last hidden state.

        Padding positions are excluded from the mean (using the tokenizer's
        attention mask) so that a text's embedding does not depend on the
        length of the other texts in the batch. Inputs longer than the
        tokenizer's maximum length are truncated (``truncation=True``).

        NOTE(review): some checkpoints recommend a different pooling strategy
        on their model card - confirm mean pooling is intended for the
        selected model.

        Args:
            texts (List[str]): A list of text strings for which to generate embeddings.

        Returns:
            np.ndarray: A 2-D NumPy array with one row per input text. For an
            empty input the array has zero rows.
        """
        if len(texts) == 0:
            # Nothing to tokenize; return an empty matrix rather than failing.
            config = getattr(self.model, "config", None)
            hidden_size = getattr(config, "hidden_size", 0) or 0
            return np.empty((0, hidden_size), dtype=np.float32)

        encoded_input = self.tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
        encoded_input = {k: v.to(self.device) for k, v in encoded_input.items()}

        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            model_output = self.model(**encoded_input)

        token_embeddings = model_output.last_hidden_state
        attention_mask = encoded_input.get("attention_mask")
        if attention_mask is None:
            pooled = token_embeddings.mean(dim=1)
        else:
            mask = attention_mask.unsqueeze(-1).to(token_embeddings.dtype)
            token_counts = mask.sum(dim=1).clamp(min=1e-9)  # avoid division by zero
            pooled = (token_embeddings * mask).sum(dim=1) / token_counts
        return pooled.cpu().numpy()
def compute_similarity(template_embeddings: np.ndarray, contract_embeddings: np.ndarray) -> np.ndarray:
    """
    Score every contract embedding against every template embedding.

    Args:
        template_embeddings (np.ndarray): Embeddings of the templates.
        contract_embeddings (np.ndarray): Embeddings of the contracts.

    Returns:
        np.ndarray: Cosine similarity scores; contracts are passed as the first
        argument to ``cosine_similarity`` and templates as the second.
    """
    scores = cosine_similarity(contract_embeddings, template_embeddings)
    return scores
def clear_folder(path):
    """
    Ensure ``path`` exists and remove the regular files directly inside it.

    Sub-directories are left untouched. A file that cannot be removed is
    reported on stdout and skipped.

    Args:
        path: Directory to create (if missing) and empty of files.
    """
    if not os.path.exists(path):
        os.makedirs(path)  # First use: start from a fresh, empty directory
    for entry_name in os.listdir(path):
        target = os.path.join(path, entry_name)
        try:
            if not os.path.isfile(target):
                continue
            os.remove(target)
        except Exception as e:
            print(f"Failed to delete {target}: {e}")
def save_uploaded_file(uploaded_file, path):
    """
    Write an uploaded file into the directory ``path``.

    Only the final component of ``uploaded_file.name`` is used as the file
    name, so a client-supplied name such as ``../x.pdf`` cannot place the file
    outside ``path``.

    Args:
        uploaded_file: Object exposing ``name`` and ``getbuffer()`` (as used by
            the Streamlit uploader in this file).
        path: Destination directory; it must already exist.

    Returns:
        bool: True when the file was written, False when the name is unusable
        or writing failed.
    """
    try:
        # Normalise Windows separators before stripping directory components.
        safe_name = os.path.basename(str(uploaded_file.name).replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            return False
        with open(os.path.join(path, safe_name), "wb") as f:
            f.write(uploaded_file.getbuffer())
        return True
    except Exception:
        # Best effort: report failure to the caller, but do not swallow
        # KeyboardInterrupt/SystemExit as a bare ``except`` would.
        return False
# Streamlit UI
st.title('PDF Similarity Checker')
col1, col2 = st.columns(2)

# The folders are emptied at the top of the script and refilled below from the
# files currently held by the two uploaders.
templates_folder = './templates'
contracts_folder = './contracts'
clear_folder(templates_folder)
clear_folder(contracts_folder)

with col1:
    st.header("Upload Templates")
    uploaded_files_templates = st.file_uploader("Choose PDF files", accept_multiple_files=True, type=['pdf'])
    for uploaded_file in uploaded_files_templates or []:
        if save_uploaded_file(uploaded_file, templates_folder):
            st.write(f"Saved: {uploaded_file.name}")
        else:
            st.error(f"Could not save: {uploaded_file.name}")

with col2:
    st.header("Upload Contracts")
    uploaded_files_contracts = st.file_uploader("Choose PDF files", key="contracts", accept_multiple_files=True, type=['pdf'])
    for uploaded_file in uploaded_files_contracts or []:
        if save_uploaded_file(uploaded_file, contracts_folder):
            st.write(f"Saved: {uploaded_file.name}")
        else:
            st.error(f"Could not save: {uploaded_file.name}")

model_name = st.selectbox("Select Model", ['sentence-transformers/multi-qa-mpnet-base-dot-v1'], index=0)


def _load_documents(processor, folder):
    """Return parallel lists (paths, texts) for the PDFs in ``folder`` that yielded text."""
    paths, texts = [], []
    # Sorted so that row/column order is stable between runs.
    for file_name in sorted(os.listdir(folder)):
        file_path = os.path.join(folder, file_name)
        # Parse each PDF once and keep all of its chunks, not just the first.
        # The embedding step still truncates to the model's maximum length.
        document_text = "\n".join(processor.extract_text_from_pdfs([file_path])).strip()
        if document_text:
            paths.append(file_path)
            texts.append(document_text)
    return paths, texts


if st.button("Compute Similarities"):
    pdf_processor = PDFProcessor()
    # Files and texts are collected together so that labels always line up
    # with the rows/columns of the similarity matrix, even when a PDF fails.
    template_files, template_texts = _load_documents(pdf_processor, templates_folder)
    contract_files, contract_texts = _load_documents(pdf_processor, contracts_folder)

    if not template_texts or not contract_texts:
        st.warning("Upload at least one readable template PDF and one readable contract PDF.")
        if "similarity_df" in st.session_state:
            del st.session_state["similarity_df"]
    else:
        embedding_processor = EmbeddingsProcessor(model_name)
        template_embeddings = embedding_processor.get_embeddings(template_texts)
        contract_embeddings = embedding_processor.get_embeddings(contract_texts)
        # Rows follow contracts, columns follow templates.
        similarities = compute_similarity(template_embeddings, contract_embeddings)

        similarity_data = [
            [i + 1, os.path.basename(contract_file)]  # SI No and contract file name
            + [f"{score * 100:.2f}%" for score in similarities[i]]  # Format as percentage
            for i, contract_file in enumerate(contract_files)
        ]
        columns = ["SI No", "Contract"] + [os.path.basename(f) for f in template_files]
        # Keep the table in session state: the button is only True on the run
        # in which it was clicked, and the checkbox and download button below
        # each trigger a new run.
        st.session_state["similarity_df"] = pd.DataFrame(similarity_data, columns=columns)

if "similarity_df" in st.session_state:
    similarity_df = st.session_state["similarity_df"]
    # Display maximize option
    if st.checkbox("Maximize Table View"):
        st.write("Similarity Scores Table (Maximized):")
        st.dataframe(similarity_df)  # Maximized view
    else:
        st.write("Similarity Scores Table:")
        st.table(similarity_df.style.hide(axis="index"))  # Normal view
    # Download option
    csv = similarity_df.to_csv(index=False).encode('utf-8')
    st.download_button(
        label="Download Similarity Table as CSV",
        data=csv,
        file_name='similarity_scores.csv',
        mime='text/csv',
    )