import json
import os
import random
import re
import warnings

import torch
import torch.amp as amp
from sklearn.metrics import f1_score
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers import BertConfig, BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup
from transformers import logging
|
| |
|
| | warnings.filterwarnings("ignore")
|
| | logging.set_verbosity_error()
|
| |
|
| |
|
class IntentDataset(Dataset):
    """Torch dataset that tokenizes utterances for BERT intent classification.

    In training mode each item carries a ``labels`` tensor looked up through
    ``intent_to_label``.  In inference mode (``is_inference=True``) labels are
    omitted entirely, so ``intents`` and ``intent_to_label`` may be ``None``.
    """

    def __init__(self, questions, intents, tokenizer, max_len, intent_to_label, is_inference=False):
        self.questions = questions
        self.intents = intents
        self.tokenizer = tokenizer
        self.max_len = max_len  # fixed sequence length (pad/truncate target)
        self.intent_to_label = intent_to_label
        self.is_inference = is_inference

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, item):
        question = str(self.questions[item])

        encoding = self.tokenizer.encode_plus(
            question,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )

        sample = {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
        }

        # Only touch the label side in training mode.  The original code read
        # self.intents[item] unconditionally, which crashed when an inference
        # dataset was built with intents=None even though is_inference=True
        # never uses the value.
        if not self.is_inference:
            intent = self.intents[item]
            sample['labels'] = torch.tensor(self.intent_to_label[intent], dtype=torch.long)
        return sample
|
| |
|
| |
|
def _extract_pairs(raw_items):
    """Normalize raw JSON records into (utterance, intent) tuples.

    Accepts either dicts (keys utterance/text/question and
    intent/label/class, first non-empty wins) or two-element
    [utterance, intent] lists; anything else is silently skipped,
    matching the original parsing behavior.
    """
    pairs = []
    for item in raw_items:
        if isinstance(item, dict):
            utterance = item.get('utterance') or item.get('text') or item.get('question')
            intent = item.get('intent') or item.get('label') or item.get('class')
            if utterance and intent:
                pairs.append((utterance, intent))
        elif isinstance(item, list) and len(item) == 2:
            pairs.append(tuple(item))
    return pairs


def load_data(test_file, val_file):
    """Load intent data and split the first file 70/30 into train/test.

    Args:
        test_file: path to a JSON file whose records are shuffled and split
            into training (70%) and held-out test (30%) sets.
        val_file: path to a JSON file used as-is for validation.

    Returns:
        (train_pairs, test_pairs, val_pairs, intent_labels).  intent_labels
        is sorted so the label->index mapping is stable across runs: the
        original ``list(set(...))`` order depended on hash randomization,
        which silently invalidated a previously saved intent_mapping.json
        whenever the model was retrained.
    """
    print(f"Reading test data from {test_file}")
    with open(test_file, 'r') as f:
        test_data = json.load(f)

    print(f"Reading validation data from {val_file}")
    with open(val_file, 'r') as f:
        val_data = json.load(f)

    all_data = _extract_pairs(test_data)

    # Shuffle before the split so train/test draw from the same distribution.
    random.shuffle(all_data)
    split_point = int(len(all_data) * 0.7)
    train_processed = all_data[:split_point]
    test_processed = all_data[split_point:]

    val_processed = _extract_pairs(val_data)

    # Sorted for a deterministic label order (see docstring).
    intent_labels = sorted({intent for _, intent in train_processed + test_processed + val_processed})
    print(
        f"Loaded {len(train_processed)} training examples, {len(test_processed)} test examples, and {len(val_processed)} validation examples")
    return train_processed, test_processed, val_processed, intent_labels
|
| |
|
| |
|
class IntentClassifier:
    """Wraps a fine-tuned BERT model for multi-intent classification.

    Utterances are split into segments, each segment is classified
    independently (top-3 labels), and the per-segment predictions are
    re-ranked with hand-tuned priorities before returning one primary
    intent plus up to two secondary intents.
    """

    def __init__(self, model_path, device):
        """Load model, tokenizer and label mapping from *model_path*."""
        self.device = device
        self.model = BertForSequenceClassification.from_pretrained(model_path).to(device)
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model.eval()

        # Load the LABEL_i -> intent-name mapping ONCE.  The mapping is saved
        # next to the model weights (see main / train_model), so look there
        # first and fall back to the CWD for backward compatibility.  The
        # original code re-opened 'intent_mapping.json' from the CWD on every
        # get_intent_label() call, which failed whenever the script ran from a
        # different directory and re-parsed the file per prediction.
        self.intent_mapping = {}
        for mapping_path in (os.path.join(model_path, 'intent_mapping.json'),
                             'intent_mapping.json'):
            try:
                with open(mapping_path, 'r') as f:
                    self.intent_mapping = json.load(f)
                break
            except Exception:
                continue
        if not self.intent_mapping:
            print("Error loading intent mapping: no readable intent_mapping.json found")

        # Hand-tuned priors: higher-priority intents win when raw segment
        # confidences are close (requests > statements > social chatter).
        self.intent_priorities = {
            'RecommendationRequest': 1.0,
            'Request': 1.0,
            'ComparisonRequest': 0.95,
            'ClarificationRequest': 0.95,

            'Fact': 0.9,
            'ActionReport': 0.85,
            'Preference': 0.85,

            'Opinion': 0.7,
            'SystemRecommendation': 0.7,
            'Answer': 0.7,

            'Sentiment': 0.5,
            'Feedback': 0.5,
            'ReferenceToPriorConversation': 0.5,

            'Greetings': 0.3,
            'Farewell': 0.3,
            'AgreementWithSystem': 0.3,
            'DisagreementWithSystem': 0.3,

            'IrrelevantUtterance': 0.1,
            'Unclear': 0.1
        }

        # When one of these intents is primary, the listed intents are
        # preferred as secondary intents (they usually co-occur).
        self.intent_relationships = {
            'RecommendationRequest': ['Fact', 'Preference', 'ActionReport'],
            'Request': ['Fact', 'Preference', 'ActionReport'],
            'ComparisonRequest': ['Fact', 'Preference'],
            'ClarificationRequest': ['Fact', 'ReferenceToPriorConversation']
        }

    def segment_text(self, text):
        """Split *text* into intent-sized segments.

        Conjunctions and comma-before-capital boundaries are promoted to
        sentence breaks, then each sentence is further split on reason /
        desire markers ('because', 'since', ...).  Always returns at least
        one segment (the stripped input).
        """
        text = re.sub(r' and ', '. ', text)
        text = re.sub(r', (?=[A-Z])', '. ', text)

        segments = re.split('[.!?]', text)
        segments = [s.strip() for s in segments if s.strip()]

        refined_segments = []
        for segment in segments:
            # NOTE(review): the substring check matches 'as' inside words like
            # 'was'; harmless because the \b-anchored split below then returns
            # the segment unchanged.
            if any(marker in segment.lower() for marker in ['because', 'since', 'as', 'would like', 'want to']):
                parts = re.split(r'\b(because|since|as|would like|want to)\b', segment, flags=re.IGNORECASE)
                refined_segments.extend([p.strip() for p in parts if p.strip()])
            else:
                refined_segments.append(segment)

        if not refined_segments:
            refined_segments = [text.strip()]

        return refined_segments

    def classify_segment(self, text):
        """Classify one segment; return (top-3 label indices, confidences)."""
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=128,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True
        )

        input_ids = encoding['input_ids'].to(self.device)
        attention_mask = encoding['attention_mask'].to(self.device)

        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask=attention_mask)
            probabilities = torch.softmax(outputs.logits, dim=1)[0]
            confidence_values, pred_indices = torch.topk(probabilities, k=3)

        return (pred_indices.cpu().tolist(),
                confidence_values.cpu().tolist())

    def get_intent_label(self, index):
        """Translate a class index into its intent name ('Unknown' if absent)."""
        return self.intent_mapping.get(f"LABEL_{index}", "Unknown")

    def classify_text(self, text):
        """Enhanced text classification with context awareness.

        Returns a list [primary, *secondary] of result dicts (keys: segment,
        intent, confidence, weighted_confidence, base_priority), or None when
        no segment could be classified.
        """
        segments = self.segment_text(text)
        all_results = []

        for segment in segments:
            try:
                pred_indices, confidence_values = self.classify_segment(segment)

                segment_results = []
                for pred_idx, conf in zip(pred_indices, confidence_values):
                    intent = self.get_intent_label(pred_idx)
                    base_priority = self.intent_priorities.get(intent, 0.5)

                    # The opening segment usually carries the main intent.
                    position_boost = 1.0 if segment == segments[0] else 0.9

                    # Boost explicit request/desire phrasing.
                    content_boost = 1.2 if any(phrase in segment.lower() for phrase in
                                               ['need', 'want', 'help', 'recommend', 'advice', 'suggest']) else 1.0

                    weighted_confidence = float(conf) * base_priority * position_boost * content_boost

                    segment_results.append({
                        'segment': segment,
                        'intent': intent,
                        'confidence': float(conf),
                        'weighted_confidence': weighted_confidence,
                        'base_priority': base_priority
                    })

                all_results.extend(segment_results)
            except Exception as e:
                # Best-effort: a bad segment should not sink the whole query.
                print(f"Error processing segment '{segment}': {e}")
                continue

        if not all_results:
            return None

        all_results.sort(key=lambda x: x['weighted_confidence'], reverse=True)

        primary_intent = all_results[0]
        secondary_intents = []

        # First prefer intents known to co-occur with the primary one...
        if primary_intent['intent'] in self.intent_relationships:
            related_intents = self.intent_relationships[primary_intent['intent']]
            for result in all_results[1:]:
                if (result['intent'] in related_intents and
                        result['confidence'] > 0.4 and
                        len(secondary_intents) < 2):
                    secondary_intents.append(result)

        # ...then fill remaining slots with any confident leftover.
        if len(secondary_intents) < 2:
            for result in all_results[1:]:
                if (result['weighted_confidence'] > 0.4 and
                        result not in secondary_intents and
                        len(secondary_intents) < 2):
                    secondary_intents.append(result)

        return [primary_intent] + secondary_intents
|
| |
|
| |
|
def train_model(train_data, val_data, intent_labels, device, output_dir='./fine_tuned_bert'):
    """Fine-tune bert-base-uncased for intent classification.

    Args:
        train_data: list of (utterance, intent) training pairs.
        val_data: list of (utterance, intent) validation pairs.
        intent_labels: ordered list of intent names; list index == label id.
        device: torch device to train on.
        output_dir: directory receiving the best checkpoint, the tokenizer
            and the label mapping (default matches the original hard-coded
            path, so existing callers are unaffected).

    Returns:
        (model, tokenizer) holding the LAST epoch's weights; the checkpoint
        with the best validation F1 is the one written to *output_dir*.
    """
    train_questions, train_intents = zip(*train_data)
    val_questions, val_intents = zip(*val_data)
    intent_to_label = {intent: i for i, intent in enumerate(intent_labels)}

    config = BertConfig.from_pretrained('bert-base-uncased',
                                        num_labels=len(intent_labels))
    model = BertForSequenceClassification.from_pretrained('bert-base-uncased', config=config)
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = model.to(device)

    train_dataset = IntentDataset(train_questions, train_intents, tokenizer, max_len=128,
                                  intent_to_label=intent_to_label)
    val_dataset = IntentDataset(val_questions, val_intents, tokenizer, max_len=128,
                                intent_to_label=intent_to_label)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16)

    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    num_epochs = 25
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0,
                                                num_training_steps=len(train_loader) * num_epochs)

    use_cuda = torch.cuda.is_available()
    # GradScaler only applies to CUDA mixed precision; constructing it with an
    # explicit enabled flag keeps GPU behavior identical while avoiding the
    # "CUDA is not available, disabling" warning the bare GradScaler() emitted
    # on CPU-only machines.
    scaler = amp.GradScaler(enabled=use_cuda)
    best_val_f1 = 0

    # Create the output directory up front: the original relied on
    # save_pretrained() to create it, so the final mapping dump below raised
    # FileNotFoundError if no epoch ever improved on best_val_f1.
    os.makedirs(output_dir, exist_ok=True)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            with amp.autocast(device_type='cuda' if use_cuda else 'cpu'):
                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss

            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()

            total_loss += loss.item()

        # Validation pass: greedy argmax predictions, weighted F1.
        model.eval()
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = model(input_ids, attention_mask=attention_mask)
                _, preds = torch.max(outputs.logits, dim=1)

                val_preds.extend(preds.cpu().tolist())
                val_true.extend(labels.cpu().tolist())

        val_f1 = f1_score(val_true, val_preds, average='weighted')
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(train_loader):.4f}, Val F1: {val_f1:.4f}")

        # Checkpoint only on improvement so output_dir keeps the best model.
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            model.save_pretrained(output_dir)
            tokenizer.save_pretrained(output_dir)
            print(f"New best model saved with validation F1: {best_val_f1:.4f}")

    # Persist the label-id -> intent-name mapping alongside the checkpoint so
    # IntentClassifier can translate predictions back to names.
    intent_mapping = {f"LABEL_{i}": intent for i, intent in enumerate(intent_labels)}
    with open(os.path.join(output_dir, 'intent_mapping.json'), 'w') as f:
        json.dump(intent_mapping, f, indent=2)

    return model, tokenizer
|
| |
|
| |
|
def interactive_classification():
    """Interactive loop: read questions from stdin and print detected intents."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Confidence tiers, checked from highest threshold down.
    tiers = ((0.9, "Confidence Level: Very High"),
             (0.7, "Confidence Level: High"),
             (0.5, "Confidence Level: Moderate"))

    try:
        classifier = IntentClassifier('./fine_tuned_bert', device)
        print("\nModel loaded successfully!")
        print("\nEnter your questions (type 'quit' to exit):")

        while True:
            question = input("\nEnter your question: ").strip()

            if question.lower() in ['quit', 'exit', 'q']:
                print("Exiting...")
                break

            if not question:
                print("Please enter a valid question.")
                continue

            try:
                results = classifier.classify_text(question)

                if not results:
                    print("Could not determine intent with sufficient confidence.")
                    continue

                print("\nResults:")
                for i, result in enumerate(results, 1):
                    print(f"\nIntent {i}:")
                    print(f"Detected Intent: {result['intent']}")
                    print(f"Confidence: {result['confidence']:.2%}")
                    print(f"Segment: {result['segment']}")

                    conf = result['confidence']
                    for threshold, message in tiers:
                        if conf >= threshold:
                            print(message)
                            break
                    else:
                        print("Confidence Level: Low")

            except Exception as e:
                print(f"Error processing question: {str(e)}")
                print("Please try another question.")

    except Exception as e:
        print(f"Error loading model: {str(e)}")
        print("Please ensure the model has been trained and saved correctly.")
|
| |
|
| |
|
def main():
    """Entry point: reuse a complete saved checkpoint, otherwise train one."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    model_dir = '/app/code/fine_tuned_bert'
    required_files = {
        'config.json',
        'intent_mapping.json',
        'tokenizer_config.json',
        'vocab.txt'
    }
    model_files = {'pytorch_model.bin', 'model.safetensors'}

    if os.path.exists(model_dir):
        existing_files = set(os.listdir(model_dir))

        # A usable checkpoint needs at least one weights file plus every
        # config/tokenizer/mapping file.
        if not model_files.isdisjoint(existing_files) and required_files <= existing_files:
            print("Found valid existing model. Loading...")
            classifier = IntentClassifier(model_dir, device)
            print("\nStarting interactive classification...")
            interactive_classification()
            return

    print("Model not found or incomplete. Starting training...")
    os.makedirs(model_dir, exist_ok=True)

    train_data, test_data, val_data, intent_labels = load_data(
        'training-22-intent.json',
        'validation-22-intent.json'
    )

    # Persist the label mapping before training so inference can decode ids.
    intent_mapping = {f"LABEL_{i}": intent for i, intent in enumerate(intent_labels)}
    with open(os.path.join(model_dir, 'intent_mapping.json'), 'w') as f:
        json.dump(intent_mapping, f, indent=2)

    model, tokenizer = train_model(train_data, val_data, intent_labels, device)
    model.save_pretrained(model_dir, safe_serialization=False)
    tokenizer.save_pretrained(model_dir)

    print("Training complete. Starting classification...")
    interactive_classification()


if __name__ == "__main__":
    main()