# Spaces:
# Sleeping
# Sleeping
# (Hugging Face Spaces status banner captured with the page; not part of the Python source.)
# Importing required packages
import pickle
import re

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Loading data
# NOTE(review): the 'assingment' typo is in the actual file name on disk — do not "fix" the path.
parquet_file = 'data/data_dump_ai_assingment.parquet'
df = pd.read_parquet(parquet_file, engine='pyarrow')

# Setting 3 random campaigns aside as testing examples for final models
campaign_ids = [8, 123, 256]
# Use the boolean mask directly; the previous '== True' comparison was redundant.
df_final_testing = df[df['campaign_id'].isin(campaign_ids)]
# Clean text | |
def clean_text(text):
    """Normalize free text: keep only alphanumerics, collapse whitespace, lowercase.

    Parameters
    ----------
    text : str
        Raw input string.

    Returns
    -------
    str
        Lower-cased string with every run of non-alphanumeric characters
        replaced by a single space and no leading/trailing spaces.
    """
    # Any run of characters outside [a-zA-Z0-9] becomes one space
    alnum_only = re.sub(r'[^a-zA-Z0-9]+', ' ', text)
    # split()/join collapses repeated spaces and trims the ends
    return ' '.join(alnum_only.split()).lower()
def combine_text(df_single_lead):
    """Build a cleaned 'combined_text' column from position, sector and company size.

    Parameters
    ----------
    df_single_lead : pandas.DataFrame
        Must contain 'current_position', 'industry_sector' and 'n_employees'.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with the three columns cast to str and a new
        normalized 'combined_text' column added.
    """
    # Work on a copy: the original mutated the caller's frame in place, which
    # raises SettingWithCopyWarning when a slice of a larger frame is passed
    # in (as classify() does with df_final_testing selections).
    df_single_lead = df_single_lead.copy()
    # Cast to str so NaN / numeric values concatenate safely
    df_single_lead['current_position'] = df_single_lead['current_position'].astype('str')
    df_single_lead['industry_sector'] = df_single_lead['industry_sector'].astype('str')
    df_single_lead['n_employees'] = df_single_lead['n_employees'].astype('str')
    # Combine text columns into one free-text description per lead
    df_single_lead['combined_text'] = (
        df_single_lead['current_position'] + ' '
        + df_single_lead['industry_sector'] + ' '
        + df_single_lead['n_employees'] + ' employees'
    )
    # Normalize: strip non-alphanumerics, collapse spaces, lowercase
    df_single_lead['combined_text'] = df_single_lead['combined_text'].apply(clean_text)
    return df_single_lead
# Function to test model performance | |
def model_predict(model, tokenizer, X_test, y_test, batch_size=32):
    """Run batched inference with a sequence-classification model.

    Parameters
    ----------
    model : callable
        Sequence-classification model invoked as
        model(input_ids, attention_mask=...); its result is read via .logits.
    tokenizer : callable
        Tokenizer returning a mapping with 'input_ids' and 'attention_mask'
        PyTorch tensors when called with return_tensors='pt'.
    X_test : pandas.Series
        Input texts.
    y_test : pandas.Series
        Labels. Unused by this function; the parameter is kept so existing
        callers keep working.
    batch_size : int
        Texts per forward pass; batching bounds peak memory allocation.

    Returns
    -------
    tuple[list[int], list[list[float]]]
        Predicted class indices and per-class softmax probabilities.
    """
    text_test = X_test.to_list()
    num_samples = len(text_test)
    # Ceiling division so a final partial batch is still processed
    num_batches = (num_samples + batch_size - 1) // batch_size
    predicted_labels_test = []
    predicted_proba_test = []
    # Inference only: disable autograd bookkeeping to cut memory use
    with torch.no_grad():
        for i in range(num_batches):
            start_idx = i * batch_size
            end_idx = min((i + 1) * batch_size, num_samples)
            batch_text = text_test[start_idx:end_idx]
            # Encode the batch (padded/truncated to a common length)
            encoded_data = tokenizer(batch_text, padding=True, truncation=True, return_tensors='pt')
            # Forward pass through the model
            logits = model(encoded_data['input_ids'], attention_mask=encoded_data['attention_mask']).logits
            # Predicted class = argmax over the class dimension
            predicted_labels_test.extend(logits.argmax(dim=1).tolist())
            # Softmax over the class dimension -> per-class probabilities
            predicted_proba_test.extend(F.softmax(logits, dim=-1).tolist())
    return predicted_labels_test, predicted_proba_test
# Calculate performance metrics | |
def compute_metrics(predictions, true_labels):
    """Compute rounded binary-classification metrics.

    Parameters
    ----------
    predictions : sequence of predicted labels.
    true_labels : sequence of ground-truth labels.

    Returns
    -------
    dict
        Metric name -> score rounded to 3 decimals.
    """
    # Recall/precision use zero_division=np.nan so an undefined ratio
    # surfaces as NaN instead of a silent 0.
    return {
        'F1 weighted': round(f1_score(true_labels, predictions, average='weighted'), 3),
        'F1': round(f1_score(true_labels, predictions), 3),
        'Accuracy': round(accuracy_score(true_labels, predictions), 3),
        'Recall': round(recall_score(true_labels, predictions, zero_division=np.nan), 3),
        'Precision': round(precision_score(true_labels, predictions, zero_division=np.nan), 3),
    }
# Loading the tuned XGBoost classifier
# NOTE(review): pickle.load executes arbitrary code from the file — only ship
# model pickles from a trusted source.
with open('models/xgb_tuned_2/xgb_model_tuned_2.pkl', 'rb') as model_file:
    xgb_model_tuned_2 = pickle.load(model_file)
# Loading the fitted tf-idf vectorizer paired with the XGBoost model
with open('models/xgb_tuned_2/vectorizer.pkl', 'rb') as model_file:
    vectorizer = pickle.load(model_file)
# Loading the DistilBERT tokenizer (original comments had tokenizer/model swapped)
distil_bert_tokenizer_tuned_2 = AutoTokenizer.from_pretrained('models/distil_bert_tuned_2')
# Loading the fine-tuned DistilBERT sequence-classification model (2 labels: selected / not selected)
distil_bert_model_tuned_2 = AutoModelForSequenceClassification.from_pretrained(
    'models/distil_bert_tuned_2', num_labels=2)
# Classify single lead data | |
def classify(CAMPAIGN_ID, LEAD_ID, proba_cutoff=50, model_type='XGB', full_campaign=False):
    """Rank and classify held-out leads with the tuned XGB or BERT model.

    Parameters
    ----------
    CAMPAIGN_ID : campaign to select from df_final_testing.
    LEAD_ID : lead to select; ignored when full_campaign is True.
    proba_cutoff : int
        Percentage threshold; probability >= proba_cutoff/100 predicts class 1.
    model_type : 'XGB' or 'BERT'.
    full_campaign : bool
        When True, score every lead of the campaign instead of a single lead.

    Returns
    -------
    (df, df_123, df_performance_metrics)
        Ranked predictions, the top-3 ranked leads that pass the cutoff,
        and a metrics table.

    Raises
    ------
    ValueError
        If model_type is neither 'XGB' nor 'BERT' (previously an obscure
        NameError).
    """
    if full_campaign:
        # Select full campaign data
        df = df_final_testing[df_final_testing['campaign_id'] == CAMPAIGN_ID]
    else:
        # Selecting single lead data
        df = df_final_testing[(df_final_testing['campaign_id'] == CAMPAIGN_ID) & (df_final_testing['lead_id'] == LEAD_ID)]
    # True labels
    true_labels = df['employee_is_selected'].tolist()
    # Combining text columns
    df = combine_text(df)
    # Selecting model; the tf-idf transform now runs only on the XGB path
    # (it was previously computed — and discarded — for BERT as well).
    if model_type == 'XGB':
        # Vectorize text with the fitted tf-idf vectorizer
        tfidf_matrix = vectorizer.transform(df['combined_text'])
        # Prediction probabilities of being 1 (selected); the hard
        # model.predict() call was dead code and has been removed.
        predictions_proba_1 = xgb_model_tuned_2.predict_proba(tfidf_matrix)[:, 1].tolist()
    elif model_type == 'BERT':
        # Hard labels from model_predict are unused — cutoff labels are
        # derived from the probabilities below.
        _, predicted_test_proba = model_predict(model=distil_bert_model_tuned_2,
                                                tokenizer=distil_bert_tokenizer_tuned_2,
                                                X_test=df['combined_text'],
                                                y_test=df['employee_is_selected'])
        # Prediction probabilities of being 1 (selected)
        predictions_proba_1 = [proba[1] for proba in predicted_test_proba]
    else:
        raise ValueError(f"Unknown model_type: {model_type!r} (expected 'XGB' or 'BERT')")
    # Alter predictions based on the percentage cutoff value
    cutoff_predictions = [1 if probability >= (proba_cutoff / 100) else 0 for probability in predictions_proba_1]
    # Create dataframe columns and ranking (dead np.argsort call removed;
    # sort_values below does the ordering)
    df['cutoff_prediction'] = cutoff_predictions
    df['prediction_proba_1'] = predictions_proba_1
    df = df.sort_values(by='prediction_proba_1', ascending=False)
    df['ranking'] = list(range(1, len(df) + 1))
    df['prediction_proba_1'] = df['prediction_proba_1'].round(3)
    # Already sorted on the un-rounded probabilities above; the previous
    # second sort on rounded values could scramble tied rankings.
    df = df[['ranking', 'prediction_proba_1', 'current_position', 'industry_sector', 'employee_is_selected', 'cutoff_prediction']]
    # Top-3 ranked leads that also pass the probability cutoff
    df_123 = df[(df['ranking'].isin([1, 2, 3])) & (df['cutoff_prediction'] == 1)].sort_values(by='ranking')
    performance_metrics = compute_metrics(cutoff_predictions, true_labels)
    df_performance_metrics = pd.DataFrame.from_dict(performance_metrics, orient='index', columns=['Score'])
    df_performance_metrics.reset_index(inplace=True, names=['Metric'])
    return df, df_123, df_performance_metrics