import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import joblib
import pandas as pd
import spacy
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
# --- Constants ---
MODEL_DIR = Path("saved_models")
MODEL_PATH = MODEL_DIR / "email_classifier_pipeline.pkl"
MODEL_DIR.mkdir(parents=True, exist_ok=True)

# --- FastAPI App ---
app = FastAPI()
# --- Pydantic Models for Request/Response ---
class EmailInput(BaseModel):
    email_body: str


class MaskedEntity(BaseModel):
    position: List[int]
    classification: str
    entity: str


class ClassificationOutput(BaseModel):
    input_email_body: str
    list_of_masked_entities: List[MaskedEntity]
    masked_email: str
    category_of_the_email: str
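# Illustrative request/response shapes for the /classify/ endpoint defined below.
# Field values are made up for the example, and the masked entity assumes the loaded
# NER model produces an EMAIL entity (the stock spaCy English models do not):
#   Request:  {"email_body": "Cannot login. My email is jane@example.com."}
#   Response: {"input_email_body": "Cannot login. My email is jane@example.com.",
#              "list_of_masked_entities": [{"position": [26, 42],
#                                           "classification": "email",
#                                           "entity": "jane@example.com"}],
#              "masked_email": "Cannot login. My email is [email].",
#              "category_of_the_email": "Technical Support"}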
# --- Model Loading ---
def load_model_pipeline() -> Optional[Pipeline]:
    """Loads the trained model pipeline."""
    pipeline = None
    if MODEL_PATH.exists():
        try:
            pipeline = joblib.load(MODEL_PATH)
            print(f"Model pipeline loaded successfully from {MODEL_PATH}")
        except Exception as e:
            print(f"Error loading model pipeline from {MODEL_PATH}: {e}")
    else:
        print(f"Model pipeline not found at {MODEL_PATH}.")
        print("Please train and save the model pipeline first.")
    return pipeline


# --- Load Models at Startup ---
# Load the classification pipeline once when the application starts.
model_pipeline: Optional[Pipeline] = load_model_pipeline()

# Load the spaCy model used for PII detection once at startup.
# Assumes the small English model (en_core_web_sm) is installed.
try:
    nlp: Optional[spacy.language.Language] = spacy.load("en_core_web_sm")
except OSError as e:
    print(f"Error loading spaCy model: {e}")
    nlp = None
# --- Text Cleaning Function ---
def clean_text_for_classification(text: str) -> str:
    """Basic text cleaning."""
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)         # Remove HTML tags
    text = re.sub(r'[^a-z\s]', '', text)      # Remove non-alpha and non-whitespace characters
    text = re.sub(r'\s+', ' ', text).strip()  # Normalize whitespace
    return text
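# Illustrative behaviour of the cleaning step (a hand-worked sketch, not captured output):
#   clean_text_for_classification("<p>Hello WORLD 123!</p>")
#   -> "hello world"
# (lowercased, HTML tags stripped, digits and punctuation removed, whitespace collapsed)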
# --- Mask PII Function ---
def mask_pii(text: str, nlp_model: spacy.language.Language) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Finds and masks PII entities in text using the provided spaCy model.

    Args:
        text: The input email body.
        nlp_model: The loaded spaCy language model.

    Returns:
        A tuple containing:
        - The email body with PII entities replaced by placeholders (e.g., "[person]").
        - A list of dictionaries, where each dictionary describes a masked entity
          (e.g., {"position": [start, end], "classification": "entity_type", "entity": "original_text"}).
    """
    print(f"Executing mask_pii for text: '{text[:50]}...'")
    masked_text = text
    entities = []
    doc = nlp_model(text)
    for ent in doc.ents:
        if ent.label_ in ["PERSON", "EMAIL"]:
            entity_info = {
                "position": [ent.start_char, ent.end_char],
                "classification": ent.label_.lower(),
                "entity": ent.text,
            }
            entities.append(entity_info)
            masked_text = masked_text.replace(ent.text, f"[{ent.label_.lower()}]")
    print(f"mask_pii result - entities: {entities}")
    return masked_text, entities
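# Illustrative result (assuming the spaCy model tags "John Smith" as PERSON):
#   mask_pii("Contact John Smith about the refund.", nlp)
#   -> ("Contact [person] about the refund.",
#       [{"position": [8, 18], "classification": "person", "entity": "John Smith"}])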
# --- Prediction Function ---
def predict_category(text: str, pipeline: Pipeline) -> str:
    """
    Predicts the category of the text using the loaded classification pipeline.
    Applies cleaning before prediction.
    """
    print(f"Executing predict_category for text: '{text[:50]}...'")
    try:
        # Clean the text first using the function defined in this file
        cleaned_text = clean_text_for_classification(text)
        print(f"Cleaned text for prediction: '{cleaned_text[:50]}...'")
        # The pipeline exposes a .predict() method (TfidfVectorizer + MultinomialNB)
        prediction = pipeline.predict([cleaned_text])  # Predict on the cleaned text
        category = str(prediction[0]) if len(prediction) > 0 else "Prediction failed"
    except Exception as e:
        print(f"Error during prediction: {e}")
        category = "Prediction Error"
    print(f"predict_category result: {category}")
    return category
# --- Training Function ---
def train_model(data_path: Path, model_save_path: Path):
    """Loads data, trains the model pipeline, and saves it."""
    if not data_path.exists():
        print(f"Error: Dataset not found at {data_path}")
        print("Please make sure the CSV file is uploaded to your Codespace.")
        return

    print(f"Loading dataset from {data_path}...")
    try:
        df = pd.read_csv(data_path)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    # --- Data Validation ---
    email_body_column = 'body'    # Column name for email text in your CSV
    category_column = 'category'  # Column name for the category label in your CSV
    if email_body_column not in df.columns:
        print(f"Error: Email body column '{email_body_column}' not found in the dataset.")
        print(f"Available columns: {df.columns.tolist()}")
        return
    if category_column not in df.columns:
        print(f"Error: Category column '{category_column}' not found in the dataset.")
        print(f"Available columns: {df.columns.tolist()}")
        return

    # Handle potential missing values
    df.dropna(subset=[email_body_column, category_column], inplace=True)
    if df.empty:
        print("Error: No valid data remaining after handling missing values.")
        return

    print("Applying text cleaning...")
    try:
        df['cleaned_text'] = df[email_body_column].astype(str).apply(clean_text_for_classification)
    except Exception as e:
        print(f"Error during text cleaning: {e}")
        return

    print("Splitting data...")
    X = df['cleaned_text']
    y = df[category_column]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y  # Stratify for balanced splits
    )

    # --- Model Pipeline ---
    pipeline = Pipeline([
        ('tfidf', TfidfVectorizer(stop_words='english', max_df=0.95, min_df=2)),
        ('clf', MultinomialNB())  # Naive Bayes as a simple starting point
    ])

    print("Training model...")
    try:
        pipeline.fit(X_train, y_train)
        print("Training complete.")
    except Exception as e:
        print(f"Error during model training: {e}")
        return

    # --- Evaluation ---
    try:
        accuracy = pipeline.score(X_test, y_test)
        print(f"Model Accuracy on Test Set: {accuracy:.4f}")
    except Exception as e:
        print(f"Error during model evaluation: {e}")

    # --- Save Model ---
    print(f"Saving model pipeline to {model_save_path}...")
    model_save_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure directory exists
    try:
        joblib.dump(pipeline, model_save_path)
        print("Model pipeline saved successfully.")
    except Exception as e:
        print(f"Error saving model pipeline: {e}")
# --- API Endpoints ---
@app.get("/")
def read_root():
    return {"message": "Email Classification API is running. Use the /classify/ endpoint."}


@app.post("/classify/", response_model=ClassificationOutput)
async def classify_email(email_input: EmailInput):
    if model_pipeline is None:
        raise HTTPException(status_code=503, detail="Model not loaded. API is not ready.")
    if nlp is None:
        raise HTTPException(status_code=503, detail="spaCy model not loaded. API is not ready.")
    input_email = email_input.email_body

    # 1. Mask PII using the spaCy model loaded at startup
    masked_text, masked_entities_list = mask_pii(input_email, nlp)
    # Convert the entity dicts into MaskedEntity objects so they match the response model
    formatted_entities = [MaskedEntity(**entity) for entity in masked_entities_list]

    # 2. Predict the category using the masked text
    predicted_category = predict_category(masked_text, model_pipeline)

    # 3. Construct and return the response
    response = ClassificationOutput(
        input_email_body=input_email,
        list_of_masked_entities=formatted_entities,
        masked_email=masked_text,
        category_of_the_email=predicted_category,
    )
    return response
# Example Usage (if you run this file directly for testing/training)
if __name__ == "__main__":
    print("Running models.py directly...")
    dummy_emails = [
        "Subject: Billing Issue My account [full_name] was charged twice for order [order_id]. Please refund.",
        "Subject: Help needed Cannot login. My email is [email]. Reset password link broken.",
        "Subject: Account Management Request to close my account [account_num]. User [full_name]."
    ]
    dummy_labels = ["Billing Issues", "Technical Support", "Account Management"]

    print("Attempting to load model and predict...")
    model_pipeline = load_model_pipeline()
    if model_pipeline:
        test_email = "my login is not working help required email [email]"
        category = predict_category(test_email, model_pipeline)
        print(f"Test Email: '{test_email}'")
        print(f"Predicted Category: {category}")
    else:
        print("Cannot perform prediction as model pipeline failed to load.")
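# To serve the API (a sketch; assumes this file is saved as models.py and uvicorn is installed):
#   uvicorn models:app --host 0.0.0.0 --port 8000
# Example request to the /classify/ endpoint (the email text is made up for illustration):
#   curl -X POST http://localhost:8000/classify/ \
#        -H "Content-Type: application/json" \
#        -d '{"email_body": "Cannot login. My email is jane@example.com."}'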