# Fine-tunes a BERT encoder with a linear head to classify app reviews as
# negative / neutral / positive, then keeps the best checkpoint by
# validation accuracy.
import subprocess
import sys
import time
from collections import defaultdict
from textwrap import wrap


def _run_like_notebook(command):
    """Run an external command the way a notebook ``!`` escape would.

    Failures are reported on stdout but never raised, because a notebook
    shell escape does not stop execution when the command fails.

    Args:
        command: Program and arguments as a list of strings.
    """
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        print(f'Could not run {" ".join(command)}: {err}')
        return
    if completed.returncode != 0:
        print(f'{" ".join(command)} exited with status {completed.returncode}')


# The original notebook used `!pip install ...`, which is not valid Python.
# The installs must happen before the third-party imports below, which is
# why those imports are not at the very top of the file.
_run_like_notebook([sys.executable, "-m", "pip", "install", "-q", "-U", "watermark"])
_run_like_notebook([sys.executable, "-m", "pip", "install", "-qq", "transformers"])

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
import transformers
from matplotlib import rc
from pylab import rcParams
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
# `pipeline` (lowercase) is the factory function called below; the original
# imported only the `Pipeline` class, so the call raised NameError.
from transformers import (
    AdamW,
    BertModel,
    BertTokenizer,
    Pipeline,
    get_linear_schedule_with_warmup,
    pipeline,
)

bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.save_pretrained('C:/Users/Marie-Ange/Downloads')

pipe = pipeline(model="Group209/Sentiment_Analysis")
pipe("Sentiment_analysis_with_bert.py")

# ---------------------------------------------------------------- plotting
sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = [
    "#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF",
]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = 12, 8

# ------------------------------------------------------- reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# -------------------------------------------------------------------- data
_run_like_notebook(["gdown", "--id", "1S6qMioqPJjyBLpLVz4gmRTnJHnjitnuV"])
_run_like_notebook(["gdown", "--id", "1zdmewp7ayS4js4VtrJEHzAheSW-5NBZv"])

df = pd.read_csv("reviews.csv")
sns.countplot(x='score', data=df)
plt.xlabel('review score')


def to_sentiment(rating):
    """Map a review score to a sentiment class index.

    Args:
        rating: Review score; anything accepted by ``int()``.

    Returns:
        0 for scores <= 2, 1 for a score of exactly 3, 2 otherwise.
    """
    rating = int(rating)
    if rating <= 2:
        return 0
    elif rating == 3:
        return 1
    else:
        return 2


df['sentiment'] = df.score.apply(to_sentiment)
class_names = ['negative', 'neutral', 'positive']
print(df.sentiment)

ax = sns.countplot(x='sentiment', data=df)
plt.xlabel('review sentiment')
ax.set_xticklabels(class_names)

# --------------------------------------------------- tokenizer exploration
PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'
tokens = tokenizer.tokenize(sample_txt)
token_ids = tokenizer.convert_tokens_to_ids(tokens)
print(f' Sentence: {sample_txt}')
print(f' Tokens: {tokens}')
print(f'Token IDs: {token_ids}')

# Bare expressions below are notebook-style inspections; they have no effect
# when the file runs as a plain script.
tokenizer.sep_token, tokenizer.sep_token_id
tokenizer.cls_token, tokenizer.cls_token_id
tokenizer.pad_token, tokenizer.pad_token_id
tokenizer.unk_token, tokenizer.unk_token_id

encoding = tokenizer.encode_plus(
    sample_txt,
    max_length=32,
    add_special_tokens=True,  # Add '[CLS]' and '[SEP]'
    return_token_type_ids=False,
    # `pad_to_max_length=True` is deprecated; request padding and truncation
    # explicitly so every encoding is exactly `max_length` long.
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',  # Return PyTorch tensors
)
encoding.keys()
print(len(encoding['input_ids'][0]))
encoding['input_ids'][0]
print(len(encoding['attention_mask'][0]))
encoding['attention_mask']
tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])

# Token-length distribution of the reviews, used to pick MAX_LEN.
token_lens = []
for txt in df.content:
    # `max_length` alone does not request truncation; ask for it explicitly.
    tokens = tokenizer.encode(txt, max_length=512, truncation=True)
    token_lens.append(len(tokens))

sns.distplot(token_lens)
plt.xlim([0, 256])
plt.xlabel('Token count')

MAX_LEN = 160


class GPReviewDataset(Dataset):
    """Dataset that tokenizes one review per item for BERT.

    Args:
        reviews: Indexable collection of review texts.
        targets: Indexable collection of class indices, aligned with
            ``reviews``.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Length every encoded review is padded or truncated to.
    """

    def __init__(self, reviews, targets, tokenizer, max_len):
        self.reviews = reviews
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.reviews)

    def __getitem__(self, item):
        """Return the text, token ids, attention mask and target for one row."""
        review = str(self.reviews[item])
        target = self.targets[item]

        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            # Fixed-length output is required so the default collate function
            # can batch the items.
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'review_text': review,
            # flatten() drops the leading dimension added by return_tensors.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }


# 90 % train; the remaining 10 % is halved into validation and test.
df_train, df_test = train_test_split(df, test_size=0.1, random_state=RANDOM_SEED)
df_val, df_test = train_test_split(df_test, test_size=0.5, random_state=RANDOM_SEED)
df_train.shape, df_val.shape, df_test.shape


def create_data_loader(df, tokenizer, max_len, batch_size):
    """Wrap the ``content`` and ``sentiment`` columns of ``df`` in a DataLoader.

    Args:
        df: DataFrame with ``content`` and ``sentiment`` columns.
        tokenizer: Tokenizer handed to ``GPReviewDataset``.
        max_len: Encoded sequence length.
        batch_size: Number of reviews per batch.

    Returns:
        A ``DataLoader`` over a ``GPReviewDataset`` using 4 worker processes.
    """
    ds = GPReviewDataset(
        reviews=df.content.to_numpy(),
        targets=df.sentiment.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
    )
    return DataLoader(ds, batch_size=batch_size, num_workers=4)


BATCH_SIZE = 16
train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)

data = next(iter(train_data_loader))
data.keys()
print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['targets'].shape)

# ------------------------------------------------------------------- model
bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
last_hidden_state, pooled_output = bert_model(
    input_ids=encoding['input_ids'],
    attention_mask=encoding['attention_mask'],
    return_dict=False,  # return a tuple so it can be unpacked directly
)
last_hidden_state.shape
bert_model.config.hidden_size
pooled_output.shape


class SentimentClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification layer.

    Args:
        n_classes: Number of output classes.
    """

    def __init__(self, n_classes):
        super(SentimentClassifier, self).__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return unnormalized class scores for a batch of encoded reviews."""
        returned = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        pooled_output = returned["pooler_output"]
        output = self.drop(pooled_output)
        return self.out(output)


model = SentimentClassifier(len(class_names))
model = model.to(device)

input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)
print(input_ids.shape)  # batch size x seq length
print(attention_mask.shape)  # batch size x seq length
F.softmax(model(input_ids, attention_mask), dim=1)

# ---------------------------------------------------------------- training
EPOCHS = 6

# NOTE(review): transformers.AdamW is deprecated upstream in favour of
# torch.optim.AdamW. It is kept here because the torch optimizer has no
# `correct_bias` switch, so swapping it would change training behaviour.
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)
loss_fn = nn.CrossEntropyLoss().to(device)


def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Module called with ``input_ids`` and ``attention_mask``.
        data_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``targets`` entries.
        loss_fn: Callable taking ``(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        n_examples: Denominator used for the accuracy.

    Returns:
        Tuple ``(accuracy, mean_loss)``; accuracy is a tensor, mean loss is
        the NumPy mean of the per-batch losses.
    """
    model = model.train()
    losses = []
    correct_predictions = 0

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)

        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        loss.backward()
        # Clip the gradient norm to 1.0 before the parameter update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    return correct_predictions.double() / n_examples, np.mean(losses)


def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Args:
        model: Module called with ``input_ids`` and ``attention_mask``.
        data_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``targets`` entries.
        loss_fn: Callable taking ``(outputs, targets)``.
        device: Device the batch tensors are moved to.
        n_examples: Denominator used for the accuracy.

    Returns:
        Tuple ``(accuracy, mean_loss)`` in the same form as ``train_epoch``.
    """
    model = model.eval()
    losses = []
    correct_predictions = 0

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
            )
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    return correct_predictions.double() / n_examples, np.mean(losses)


# The notebook timed this cell with `%%time`, which is not valid Python and
# is only accepted as the first line of a notebook cell.
training_started = time.perf_counter()

history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train),
    )
    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(
        model,
        val_data_loader,
        loss_fn,
        device,
        len(df_val),
    )
    print(f'Val loss {val_loss} accuracy {val_acc}')
    print()

    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_acc)
    history['val_loss'].append(val_loss)

    # Keep only the weights of the best epoch by validation accuracy.
    if val_acc > best_accuracy:
        torch.save(model.state_dict(), 'best_model_state.bin')
        best_accuracy = val_acc

print(f'Wall time: {time.perf_counter() - training_started:.1f} s')

print(history['train_acc'])
# Accuracies in `history` are tensors; move them to host memory as NumPy
# values before handing them to matplotlib.
list_of_train_accuracy = [acc.cpu().numpy() for acc in history['train_acc']]
list_of_train_accuracy

print(history['val_acc'])
list_of_val_accuracy = [acc.cpu().numpy() for acc in history['val_acc']]
list_of_val_accuracy

plt.plot(list_of_train_accuracy, label='train accuracy')
plt.plot(list_of_val_accuracy, label='validation accuracy')
plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.ylim([0, 1])

# Accuracy on the held-out test split; the loss is not needed here.
test_acc, _ = eval_model(model, test_data_loader, loss_fn, device, len(df_test))
print('\n')
print('Test Accuracy : ', test_acc.item())


def get_predictions(model, data_loader):
    """Run ``model`` over ``data_loader`` and gather its outputs.

    Args:
        model: Module called with ``input_ids`` and ``attention_mask``.
        data_loader: Iterable of batches with ``review_text``,
            ``input_ids``, ``attention_mask`` and ``targets`` entries.

    Returns:
        Tuple ``(review_texts, predictions, prediction_probs, real_values)``:
        the review texts as a list, followed by CPU tensors holding the
        predicted class indices, the softmax probabilities and the targets.
    """
    model = model.eval()

    all_texts = []
    pred_batches = []
    prob_batches = []
    target_batches = []

    with torch.no_grad():
        for batch in data_loader:
            logits = model(
                input_ids=batch["input_ids"].to(device),
                attention_mask=batch["attention_mask"].to(device),
            )

            all_texts.extend(batch["review_text"])
            pred_batches.append(torch.max(logits, dim=1).indices)
            prob_batches.append(F.softmax(logits, dim=1))
            target_batches.append(batch["targets"].to(device))

    # Concatenating whole batches along dim 0 yields one row per example.
    return (
        all_texts,
        torch.cat(pred_batches).cpu(),
        torch.cat(prob_batches).cpu(),
        torch.cat(target_batches).cpu(),
    )


y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(
    model, test_data_loader
)

print(classification_report(y_test, y_pred, target_names=class_names))


def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated heatmap.

    Args:
        confusion_matrix: Matrix of integer counts accepted by
            ``sns.heatmap``; note the name shadows the imported sklearn
            function inside this function only.
    """
    heatmap = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    y_axis, x_axis = heatmap.yaxis, heatmap.xaxis
    y_axis.set_ticklabels(y_axis.get_ticklabels(), rotation=0, ha='right')
    x_axis.set_ticklabels(x_axis.get_ticklabels(), rotation=30, ha='right')
    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment')


cm = confusion_matrix(y_test, y_pred)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

# Index of the test example inspected next.
idx = 2
# ------------------------------------------ inspect one test-set prediction
review_text = y_review_texts[idx]
true_sentiment = y_test[idx]
pred_df = pd.DataFrame({
    'class_names': class_names,
    'values': y_pred_probs[idx],
})

print("\n".join(wrap(review_text)))
print()
print(f'True sentiment: {class_names[true_sentiment]}')

sns.barplot(x='values', y='class_names', data=pred_df, orient='h')
plt.ylabel('sentiment')
plt.xlabel('probability')
plt.xlim([0, 1])

# ------------------------------------------------- predict on typed-in text
review_text = input("Enter a comment for sentiment analysis: ")

encoded_review = tokenizer.encode_plus(
    review_text,
    max_length=MAX_LEN,
    add_special_tokens=True,
    return_token_type_ids=False,
    # `pad_to_max_length=True` is deprecated; request padding and truncation
    # explicitly so the input is exactly MAX_LEN tokens long.
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

input_ids = encoded_review['input_ids'].to(device)
attention_mask = encoded_review['attention_mask'].to(device)

# Inference only: no gradients are needed.
with torch.no_grad():
    output = model(input_ids, attention_mask)
_, prediction = torch.max(output, dim=1)

print(f'Review text: {review_text}')
print(f'Sentiment : {class_names[prediction]}')


def _predict_class_index(review_text, model, tokenizer):
    """Return the index into ``class_names`` predicted for ``review_text``.

    The text is encoded to ``MAX_LEN`` tokens, run through ``model`` on
    ``device`` in evaluation mode, and the highest-scoring class is taken.
    """
    encoded_input = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    input_ids = encoded_input['input_ids'].to(device)
    attention_mask = encoded_input['attention_mask'].to(device)

    # Evaluation mode disables dropout so the prediction is repeatable.
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids, attention_mask)
    _, predicted = torch.max(outputs, dim=1)
    return predicted.item()


def suggest_improved_text(review_text, model, tokenizer):
    """Return a more positive variant of ``review_text`` when it is not positive.

    Args:
        review_text: Text to analyse.
        model: Classifier called with ``(input_ids, attention_mask)``.
        tokenizer: Tokenizer exposing ``encode_plus``.

    Returns:
        The result of ``generate_improved_text`` when the predicted sentiment
        is negative or neutral, otherwise ``review_text`` unchanged.
    """
    # One forward pass yields both the label and the class index; the
    # original classified the same text twice.
    predicted_sentiment = _predict_class_index(review_text, model, tokenizer)

    # Only negative or neutral text is rewritten.
    if class_names[predicted_sentiment] in ('negative', 'neutral'):
        # The original passed the undefined name `text` here (NameError).
        return generate_improved_text(review_text, predicted_sentiment)

    return review_text


def analyze_sentiment(review_text, model, tokenizer):
    """Return the sentiment label predicted for ``review_text``.

    Args:
        review_text: Text to classify.
        model: Classifier called with ``(input_ids, attention_mask)``.
        tokenizer: Tokenizer exposing ``encode_plus``.

    Returns:
        The matching entry of ``class_names``.
    """
    return class_names[_predict_class_index(review_text, model, tokenizer)]


def generate_improved_text(review_text, predicted_sentiment):
    """Append positive words to ``review_text`` when its class index is 0.

    Args:
        review_text: Original text.
        predicted_sentiment: Predicted class index; an int or a one-element
            tensor both work with the ``== 0`` comparison.

    Returns:
        ``review_text`` followed by a fixed list of positive words when
        ``predicted_sentiment`` equals 0; otherwise ``review_text``
        unchanged, which includes the neutral class.
    """
    positive_words = ["marvellous", "fantastic", "excellent", "admirable", "formidable"]

    if predicted_sentiment == 0:
        return review_text + " " + " ".join(positive_words)
    return review_text