"""functions.py: utility functions for the RESUME_RANKER app."""
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import nltk
import zipfile
from bs4 import BeautifulSoup
import re
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from transformers import BartForConditionalGeneration, BartTokenizer
import torch
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, Distance, Record, Filter
from random import uniform
import PyPDF2
import streamlit as st
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')
def setup_nltk_resources():
"""
Sets up the custom NLTK data path and downloads necessary resources.
Downloads 'wordnet' for lemmatization, 'stopwords' for stopwords removal,
and 'punkt' for sentence tokenization.
"""
nltk_data_path = "/kaggle/working/nltk_data"
nltk.data.path.append(nltk_data_path)
nltk.download('wordnet', download_dir=nltk_data_path)
nltk.download('stopwords', download_dir=nltk_data_path)
nltk.download('punkt', download_dir=nltk_data_path)
def unzip_nltk_resource(zip_path, extract_to):
"""
Unzips an NLTK resource file to a specified directory.
Args:
zip_path (str): The path to the zipped NLTK resource file.
extract_to (str): The directory where the contents of the zip file will be extracted.
"""
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_to)
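# Illustrative usage sketch (not called in this module). NLTK ships some corpora
# as zip archives; the exact wordnet.zip path below is an assumption based on the
# download directory used in setup_nltk_resources().
def _example_setup_nltk():
    setup_nltk_resources()
    unzip_nltk_resource("/kaggle/working/nltk_data/corpora/wordnet.zip",
                        "/kaggle/working/nltk_data/corpora/")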
def preprocess_text(text):
"""
Preprocesses a given text string for NLP tasks. This includes cleaning the text,
tokenizing, removing stopwords, and lemmatizing the words.
Args:
text (str): The text string to preprocess.
Returns:
str: The preprocessed text.
"""
if not text:
return ""
text = re.sub(r'[\r\n\t]+', ' ', text)
text = re.sub(r'[^a-zA-Z\s]', '', text)
text = text.lower()
tokens = word_tokenize(text)
stop_words = set(stopwords.words('english'))
filtered_tokens = [word for word in tokens if word not in stop_words]
lemmatizer = WordNetLemmatizer()
lemmatized_text = [lemmatizer.lemmatize(word) for word in filtered_tokens]
return ' '.join(lemmatized_text)
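# Illustrative usage sketch (not called in this module); the sample sentence is a
# placeholder, not project data.
def _example_preprocess_text():
    raw = "Senior Python developer with 5+ years of NLP experience."
    # Cleaning, tokenization, stopword removal and lemmatization give roughly:
    # "senior python developer year nlp experience"
    return preprocess_text(raw)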
def drop_duplicates(df, column_name):
"""
Drops duplicates based on a specified column from the DataFrame.
Args:
df (pd.DataFrame): The DataFrame from which to remove duplicates.
column_name (str): The name of the column based on which duplicates will be identified.
Returns:
pd.DataFrame: DataFrame with duplicates removed based on the specified column.
"""
if column_name not in df.columns:
raise ValueError(f"Column '{column_name}' not found in DataFrame")
original_size = df.shape[0]
df_cleaned = df.drop_duplicates(subset=[column_name])
new_size = df_cleaned.shape[0]
print(f"Dropped {original_size - new_size} duplicates from '{column_name}'. New dataset size: {new_size}")
return df_cleaned
def add_token_count_column(df, column_name):
"""
Adds a new column to the DataFrame with the token count for each entry in the specified column.
This function creates a copy of the DataFrame to avoid 'SettingWithCopyWarning'.
Args:
df (pd.DataFrame): The DataFrame to process.
column_name (str): The name of the column for which to count tokens.
Returns:
pd.DataFrame: DataFrame with an additional column 'token_count'.
"""
if column_name not in df.columns:
raise ValueError(f"Column '{column_name}' not found in DataFrame")
# Creating a copy of the DataFrame to avoid modifying a slice
df_copy = df.copy()
# Tokenize each entry in the specified column and count the number of tokens
df_copy['token_count'] = df_copy[column_name].apply(lambda x: len(word_tokenize(x)) if pd.notnull(x) else 0)
return df_copy
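# Illustrative usage sketch for the two DataFrame helpers above (not called in
# this module); the toy DataFrame is a placeholder.
def _example_dataframe_helpers():
    toy = pd.DataFrame({"Resume": ["python developer", "python developer", "data analyst"]})
    deduped = drop_duplicates(toy, "Resume")          # keeps the 2 unique rows
    return add_token_count_column(deduped, "Resume")  # adds a 'token_count' column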
class TextSummarizer:
"""
A text summarization class that uses a fine-tuned BART model to summarize text.
Attributes:
device (str): Device to run the model on, either 'cuda' or 'cpu'.
model (BartForConditionalGeneration): The loaded BART model.
tokenizer (BartTokenizer): The tokenizer for the BART model.
"""
def __init__(self, model_name):
"""
Initializes the TextSummarizer with a specified BART model.
Args:
model_name (str): The name or path of the fine-tuned BART model.
"""
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = BartForConditionalGeneration.from_pretrained(model_name).to(self.device)
self.tokenizer = BartTokenizer.from_pretrained(model_name)
def summarize(self, text, max_input_length=1024, max_output_length=150, min_output_length=40):
"""
Summarizes the given text using the fine-tuned BART model.
Args:
text (str): The text to be summarized.
max_input_length (int): The maximum length of the input text in tokens.
max_output_length (int): The maximum length of the summary text in tokens.
min_output_length (int): The minimum length of the summary text in tokens.
Returns:
str: The summarized text.
"""
inputs = self.tokenizer([text], max_length=max_input_length, return_tensors='pt', truncation=True)
summary_ids = self.model.generate(
inputs['input_ids'].to(self.device),
max_length=max_output_length,
min_length=min_output_length,
length_penalty=2.0,
num_beams=4,
early_stopping=True
)
return self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
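# Illustrative usage sketch (not called in this module). "facebook/bart-large-cnn"
# is a stand-in; the fine-tuned checkpoint actually used by the project is not
# named here.
def _example_text_summarizer():
    summarizer = TextSummarizer("facebook/bart-large-cnn")  # downloads the model on first use
    return summarizer.summarize("Long resume or job description text goes here ...")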
def batch_summarize(df, text_col, summarizer, batch_size=10, output_col=None):
"""
Summarizes text in batches.
Args:
df (pd.DataFrame): The DataFrame containing text to summarize.
text_col (str): The column in the DataFrame with text to summarize.
summarizer: The summarizer object or function.
batch_size (int): The size of each batch for summarization.
output_col (str, optional): The name of the output column for summarized text.
If None, defaults to text_col.
Returns:
        pd.DataFrame: A new single-column DataFrame with the summarized text in the specified output column.
"""
summarized_texts = []
# Use the text_col as output_col if not specified
if output_col is None:
output_col = text_col
# Iterate through the DataFrame in batches
for start_idx in tqdm(range(0, len(df), batch_size), desc="Summarizing"):
end_idx = start_idx + batch_size
batch = df[text_col][start_idx:end_idx]
# Summarize each batch
summarized_batch = [summarizer.summarize(text) for text in batch]
summarized_texts.extend(summarized_batch)
# Create a new DataFrame with the summarized text
return pd.DataFrame({output_col: summarized_texts})
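# Illustrative usage sketch (not called in this module); assumes a DataFrame with
# a 'Resume' column and a TextSummarizer as in the example above.
def _example_batch_summarize(df, summarizer):
    # Returns a new single-column DataFrame of summaries, processed batch_size rows at a time.
    return batch_summarize(df, text_col="Resume", summarizer=summarizer,
                           batch_size=10, output_col="Resume_summary")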
class SentenceTransformerEncoder:
"""
A class to handle sentence encoding using Sentence Transformers, directly working with pandas DataFrames.
This class encodes text data in a specified DataFrame column into vector representations.
Attributes:
model (SentenceTransformer): The Sentence Transformer model used for encoding.
"""
def __init__(self, model_name='all-MiniLM-L6-v2'):
"""
Initializes the SentenceTransformerEncoder with a specified Sentence Transformer model.
Args:
model_name (str): The name of the Sentence Transformer model.
"""
self.model = SentenceTransformer(model_name)
def encode_column(self, df, column, batch_size=32, encoded_column_suffix='_encoded'):
"""
Encodes a specific column in a DataFrame and adds a new column with encoded vectors.
Args:
df (pd.DataFrame): The DataFrame containing the texts to encode.
column (str): The name of the column to encode.
batch_size (int): The size of each batch for processing.
encoded_column_suffix (str): Suffix for the new column containing encoded vectors.
Returns:
pd.DataFrame: The original DataFrame with an additional column containing encoded vectors.
Raises:
ValueError: If the specified column is not found in the DataFrame.
"""
if column not in df.columns:
raise ValueError(f"Column '{column}' not found in DataFrame")
# Encoding the text data in batches
encoded_vectors = []
for start_idx in range(0, len(df), batch_size):
end_idx = min(start_idx + batch_size, len(df))
batch_texts = df[column][start_idx:end_idx].tolist()
batch_encoded = self.model.encode(batch_texts, show_progress_bar=True)
encoded_vectors.extend(batch_encoded)
# Adding the encoded vectors as a new column in the DataFrame
df[column + encoded_column_suffix] = encoded_vectors
return df
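# Illustrative usage sketch (not called in this module); 'all-MiniLM-L6-v2' is the
# class default and produces 384-dimensional vectors, which the Qdrant example
# further below assumes as well.
def _example_encode_column(df):
    encoder = SentenceTransformerEncoder(model_name='all-MiniLM-L6-v2')
    # Adds a 'Resume_encoded' column of embedding vectors next to 'Resume'.
    return encoder.encode_column(df, column="Resume", batch_size=32)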
class QdrantInterface:
"""
A class for interfacing with the Qdrant vector database.
Attributes:
client (QdrantClient): Client instance for interacting with Qdrant.
vector_dimension (int): Dimension of the vectors used in the collection.
"""
"""
A class for interfacing with the Qdrant vector database.
...
"""
def __init__(self, url, api_key, vector_dimension):
"""
Initializes the QdrantInterface with the specified Qdrant URL, API key, and vector dimension.
Args:
url (str): Full URL of the Qdrant server.
api_key (str): API key for Qdrant.
vector_dimension (int): Dimension of vectors to be stored in Qdrant.
"""
self.client = QdrantClient(url=url, api_key=api_key)
self.vector_dimension = vector_dimension
def create_collection(self, collection_name, distance_metric=Distance.COSINE):
"""
Creates or recreates a collection in Qdrant.
Args:
collection_name (str): Name of the collection.
distance_metric (Distance): Distance metric for vector comparisons.
"""
self.client.recreate_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=self.vector_dimension, distance=distance_metric)
)
def save_to_qdrant(self, df, collection_name, vector_col, payload_cols, batch_size=100):
"""
Saves a DataFrame to Qdrant in batches.
Args:
df (pd.DataFrame): DataFrame containing data to save.
collection_name (str): Name of the collection in Qdrant.
vector_col (str): Name of the column containing vectors.
payload_cols (list[str]): List of column names to include as payload.
batch_size (int): Number of records to process in each batch.
"""
for start_idx in range(0, len(df), batch_size):
end_idx = min(start_idx + batch_size, len(df))
batch = df.iloc[start_idx:end_idx]
records = []
for idx, row in batch.iterrows():
# Debug print
print(f"Index: {idx}, Vector Type: {type(row[vector_col])}, First 10 Elements: {row[vector_col][:10]}")
                record = Record(
                    id=idx,
                    # Convert to a plain list of floats in case the embedding column holds numpy arrays
                    vector=[float(v) for v in row[vector_col]],
                    payload={col: row[col] for col in payload_cols}
                )
records.append(record)
self.client.upload_records(collection_name=collection_name, records=records)
def retrieve_specific_records(self, collection_name, ids):
"""
Retrieves specific records by their IDs from a Qdrant collection.
Args:
collection_name (str): The name of the collection.
ids (list): List of record IDs to retrieve.
Returns:
List of specific records from the collection.
"""
return self.client.retrieve(collection_name=collection_name, ids=ids)
def view_sample_records(self, collection_name, vector_dimension, limit=10):
"""
Retrieves a sample of records from a Qdrant collection using a dummy search.
Args:
collection_name (str): The name of the collection.
vector_dimension (int): Dimension of vectors in the collection.
limit (int): The number of records to retrieve.
Returns:
List of sample records from the collection.
"""
# Generate a random vector
random_vector = [uniform(-1, 1) for _ in range(vector_dimension)]
# Perform a dummy search
return self.client.search(
collection_name=collection_name,
query_vector=random_vector,
limit=limit
)
def match_resumes_to_jobs(self, resume_vector, top_k=10):
"""
Matches a given resume vector to job postings.
Args:
resume_vector (list): The vector representation of a resume.
top_k (int): Number of top similar matches to return.
Returns:
List of matched job postings with similarity scores.
"""
hits = self.client.search(
collection_name="jobs",
query_vector=resume_vector,
limit=top_k,
with_payload=True
)
return [(hit.payload, hit.score) for hit in hits]
def match_jobs_to_resumes(self, job_vector, top_k=10):
"""
Matches a given job vector to resumes.
Args:
job_vector (list): The vector representation of a job posting.
top_k (int): Number of top similar matches to return.
Returns:
List of tuples containing matched resumes and their similarity scores.
"""
hits = self.client.search(
collection_name="resumes",
query_vector=job_vector,
limit=top_k,
with_payload=True
)
return [(hit.payload, hit.score) for hit in hits]
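# Illustrative end-to-end usage sketch for QdrantInterface (not called in this
# module). The URL and API key are placeholders; the "resumes" collection name and
# the 384-dimensional vectors match the matching methods and encoder defaults above.
def _example_qdrant_roundtrip(resumes_df, job_vector):
    qdrant = QdrantInterface(url="https://YOUR-CLUSTER.qdrant.io",
                             api_key="YOUR_API_KEY",
                             vector_dimension=384)
    qdrant.create_collection("resumes")
    qdrant.save_to_qdrant(resumes_df, collection_name="resumes",
                          vector_col="Resume_encoded", payload_cols=["Resume"])
    # Rank the stored resumes against one encoded job description.
    return qdrant.match_jobs_to_resumes(job_vector, top_k=10)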
def extract_text_from_pdf(file):
    """
    Extracts text from a PDF file using the PyPDF2 library.
    Args:
        file: A binary file-like object (e.g. a Streamlit upload) containing the PDF.
    Returns:
        str: The concatenated text of all pages, or an empty string if extraction fails.
    """
    text = ""
    try:
        pdf_reader = PyPDF2.PdfReader(file)
        for page in pdf_reader.pages:
            # Guard against pages that yield no extractable text
            text += page.extract_text() or ""
except Exception as e:
st.error(f"Error extracting text from PDF: {e}")
return text
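# Illustrative usage sketch (not called in this module); the function accepts any
# binary file handle, so a local path works as well as a Streamlit upload.
def _example_extract_text_from_pdf(path="sample_resume.pdf"):
    with open(path, "rb") as pdf_file:
        return extract_text_from_pdf(pdf_file)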
def resume_pdf():
    """
    Streamlit helper: lets the user upload one or more resume PDFs and returns a
    DataFrame with one row per file ('File Name', 'Resume').
    Returns:
        pd.DataFrame or None: Extracted resume texts, or None if nothing was uploaded.
    """
    st.title("UPLOAD RESUMES")
    # Allow the user to upload multiple PDF files
    uploaded_files = st.file_uploader("Upload PDF files", accept_multiple_files=True, type="pdf")
    if uploaded_files:
        st.write("## Extracted Text from PDFs")
        df_rows = []
        # Extract the text of each uploaded PDF into one row
        for idx, uploaded_file in enumerate(uploaded_files):
            text = extract_text_from_pdf(uploaded_file)
            df_rows.append({"File Name": f"File_{idx+1}", "Resume": text})
        return pd.DataFrame(df_rows)
    return None
def job_desc_pdf():
    """
    Streamlit helper: lets the user upload a single job-description PDF and returns
    a one-row DataFrame ('File Name', 'description').
    Returns:
        pd.DataFrame or None: Extracted job description, or None if nothing was uploaded.
    """
    st.title("UPLOAD JOB DESCRIPTION")
    uploaded_file = st.file_uploader("Upload PDF file", type="pdf")
    if uploaded_file:
        st.write("## Extracted Text from PDF")
        text = extract_text_from_pdf(uploaded_file)
        df_rows = [{"File Name": "Job_Desc", "description": text}]
        return pd.DataFrame(df_rows)
    return None
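# Illustrative end-to-end sketch tying the helpers together (not called in this
# module). The Qdrant URL/API key are placeholders and the overall flow is an
# assumption about how the app wires these functions up.
def _example_pipeline():
    resumes_df = resume_pdf()       # Streamlit upload -> 'File Name' / 'Resume'
    job_df = job_desc_pdf()         # Streamlit upload -> 'File Name' / 'description'
    if resumes_df is None or job_df is None:
        return None
    resumes_df["Resume"] = resumes_df["Resume"].apply(preprocess_text)
    job_df["description"] = job_df["description"].apply(preprocess_text)
    encoder = SentenceTransformerEncoder()
    resumes_df = encoder.encode_column(resumes_df, "Resume")      # -> 'Resume_encoded'
    job_df = encoder.encode_column(job_df, "description")         # -> 'description_encoded'
    qdrant = QdrantInterface("https://YOUR-CLUSTER.qdrant.io", "YOUR_API_KEY", 384)
    qdrant.create_collection("resumes")
    qdrant.save_to_qdrant(resumes_df, "resumes", "Resume_encoded", ["File Name", "Resume"])
    job_vec = [float(v) for v in job_df["description_encoded"].iloc[0]]
    return qdrant.match_jobs_to_resumes(job_vec, top_k=5)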