import re import joblib import pickle import numpy as np import pandas as pd import tensorflow as tf from typing import Optional, Union, Tuple from gensim.models import Word2Vec from transformers import BertTokenizer from transformers import BertForSequenceClassification, Trainer, TrainingArguments, BertModel from transformers.modeling_outputs import SequenceClassifierOutput from torch.nn import MSELoss, CrossEntropyLoss, BCEWithLogitsLoss from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, classification_report import torch.nn.functional as F import torch import time from torch import nn from transformers import Trainer from transformers import AutoModel, AutoTokenizer from sklearn.metrics import classification_report, confusion_matrix, accuracy_score NUM_CLASSES = 3 # 분류 클래스 수 DROP_OUT = 0.3 # 원하는 dropout 확률 class SentimentDataset(torch.utils.data.Dataset): def __init__(self, encodings, labels=None): self.encodings = encodings self.labels = labels def __getitem__(self, idx): item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()} if self.labels: item['labels'] = torch.tensor(self.labels[idx]) return item def __len__(self): return len(self.encodings["input_ids"]) class CustomBertForSequenceClassification(BertForSequenceClassification): def __init__(self, config): super().__init__(config) self.num_labels = config.num_labels self.config = config self.bert = BertModel(config) classifier_dropout = ( config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob ) self.dropout = nn.Dropout(classifier_dropout) # 하기 방식으로 대체한다. #self.classifier = nn.Linear(config.hidden_size, config.num_labels) # https://github.com/KisuYang/EmotionX-KU/blob/master/models.py self.linear_h = nn.Linear(config.hidden_size, 384) self.linear_o = nn.Linear(384, config.num_labels) self.selu = nn.SELU() print("hidden_size:", config.hidden_size, "num_lables:", config.num_labels) # Initialize weights and apply final processing self.post_init() def forward( self, input_ids: Optional[torch.Tensor] = None, attention_mask: Optional[torch.Tensor] = None, token_type_ids: Optional[torch.Tensor] = None, position_ids: Optional[torch.Tensor] = None, head_mask: Optional[torch.Tensor] = None, inputs_embeds: Optional[torch.Tensor] = None, labels: Optional[torch.Tensor] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]: r""" labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*): Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If `config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.bert( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) # outputs[0]: batch_size(16), feature_size(38), hidden_size(768) # outputs[1]: batch_size(16), hidden_size(768) # BertModel 의 출력중 Pooled Output 출력을 취한다. pooled_output = outputs[1] # Dropout 전에 https://github.com/KisuYang/EmotionX-KU/blob/master/models.py 방식으로 레이어를 추가한다. pooled_output = self.selu(self.linear_h(pooled_output)) # Dropout 적용 pooled_output = self.dropout(pooled_output) # Linear layer를 통과시켜 num_labels 에 해당하는 출력을 생성한다. #logits = self.classifier(pooled_output) logits = self.linear_o(pooled_output) loss = None if labels is not None: if self.config.problem_type is None: if self.num_labels == 1: self.config.problem_type = "regression" elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() if self.num_labels == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output return SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) def train_model(model_name, X_train, X_test, y_train, y_test, epochs=2, train_batch_size=8, eval_batch_size=16, use_emotion_x=False): tokenizer = BertTokenizer.from_pretrained(model_name) train_encodings = tokenizer(X_train, truncation=True, padding=True) train_dataset = SentimentDataset(train_encodings, y_train) test_encodings = tokenizer(X_test, truncation=True, padding=True) test_dataset = SentimentDataset(test_encodings, y_test) print(train_dataset[1]['input_ids'].shape) print(train_dataset[1]['attention_mask'].shape) training_args = TrainingArguments( output_dir='./results', # output 저장 directory num_train_epochs=epochs, # total number of training epochs per_device_train_batch_size=train_batch_size, # batch size per device during training per_device_eval_batch_size=eval_batch_size, # batch size per device during evaluation warmup_steps = 500, # number of warmup steps for learning rate scheduler weight_decay = 0.01, # weight decay 강도 logging_dir='./logs', # log 저장 directory logging_steps=10, do_eval=True ) if use_emotion_x == True: model = CustomBertForSequenceClassification.from_pretrained(model_name, num_labels=NUM_CLASSES).to('cuda') else: model = BertForSequenceClassification.from_pretrained(model_name, num_labels=NUM_CLASSES).to('cuda') trainer = Trainer( model = model, args = training_args, train_dataset = train_dataset, eval_dataset = test_dataset ) s = time.time() trainer.train() trainer.evaluate(test_dataset) prediction = trainer.predict(test_dataset) y_logit = torch.tensor(prediction[0]) y_pred = F.softmax(y_logit, dim=-1).argmax(axis=1).numpy() print(classification_report(y_test, y_pred)) print(confusion_matrix(y_test, y_pred)) print(accuracy_score(y_test, y_pred)) return trainer , tokenizer def test_trainer(trainer, tokenizer): POSITIVE = 0 NEGATIVE = 1 NEUTRAL = 2 idx_to_label = {POSITIVE:'positive', NEGATIVE:'negative', NEUTRAL:'neutral'} test_dict = { '오늘 짜증 지대로네': NEGATIVE, '통장이 텅텅 비었음': NEGATIVE, '경제 사정이 좀 나아져서 좋네요': POSITIVE, '국가간 관계가 악화되고 있어요': NEGATIVE, '한국과 일본은 사이가 안좋아요.': NEGATIVE, '실패는 성공의 어머니이다.': POSITIVE, '날씨가 따뜻해서 마음이 편안해요.': POSITIVE, '주머니 사정이 파산 직전임' : NEGATIVE, '너무 걱정말고 힘내!' : POSITIVE, '아 진짜! 짜증나게 굴지말고 저리가!' : NEGATIVE, '인생이 피곤하다.' : NEGATIVE, '따뜻한 말씀 감사합니다.' :POSITIVE, '바보같은 놈들 한심하네' :NEGATIVE, '그 말이 저를 너무 힘들게 하네요' : NEGATIVE, '울지말고 힘내':POSITIVE, '눈물이 멈추질 않아요':NEGATIVE, '새로운 사장님은 진취적인 분이라 기대가 된다':POSITIVE, '오늘 할일이 태산이네':NEUTRAL, '할일이 너무 많지만 꾸역꾸역 하고 있어':NEUTRAL, '배가 고프네요':NEUTRAL, '집에 가고 싶네요':NEUTRAL, '코코아 한잔 하실래요?':NEUTRAL, '컴퓨터 바꿔주세요.':NEUTRAL, '한대 맞을래?': NEGATIVE, '신나는 여행을 생각하니 기분이 좋습니다':POSITIVE, '배고픈데 밥이 없어요.':NEGATIVE, '국가 경제가 파탄 나는 중이다.':NEGATIVE, '너때문에 내가 너무 힘들어':NEGATIVE, '그래도 니가 있어서 다행이야':POSITIVE, '암울한 경제 사정에도 열심히 해줘서 고마워요':POSITIVE, '오늘 기분 짱이에요':POSITIVE, '너는 대체 할 줄 아는게 뭐니?':NEGATIVE, '숙제가 너무 어려워 미치겠다':NEGATIVE, '우리 팀원들 열심히 해줘서 자랑스럽습니다':POSITIVE, 'Wow! 영화 진짜 재미있네':POSITIVE, 'ㅠㅠ 힘들어 죽을거 같아요':NEGATIVE, '이번 여행코스는 정말 환상적이네요':POSITIVE, '답답한 상황이지만 넌 이겨낼 수 있을꺼야':POSITIVE, '답답한 상황이지만 넌 잘 해낼 수 있을꺼야':POSITIVE, '언제나 곁에 있어줘서 힘이 됩니다.':POSITIVE, '몸이 너무 아파서 일이 손에 안잡혀요':NEGATIVE, '너 정말 잘한다 리스펙!':POSITIVE, '슬프지만 괜챦아':POSITIVE, '개빡치네 진짜':NEGATIVE, '비가 너무 많이 와서 집이 떠내려갔어요':NEGATIVE, '햇빛이 쨍쨍해서 옷이 잘 마르네요':POSITIVE, 'AI공부는 어렵지만 재미있어요':POSITIVE, '널 어쩌면 좋냐? 한숨밖에 안나온다':NEGATIVE, '도대체 무슨 생각으로 이런 짓을 한거야?':NEGATIVE, '미워도 다시 한번':POSITIVE, '도움 말씀 감사합니다':POSITIVE, '말도 안되는 소리 그만하고 저리가':NEGATIVE, '오늘 커피챗 분위기 굿':POSITIVE, '기분 나빠서 너랑 얘기하기 싫어':NEGATIVE, '이 그림 너무 마음에 든다':POSITIVE, '어이가 없어서 할 말이 없어':NEGATIVE, '동료 직원이 퇴사 인사를 했는데 씁쓸한 마음이 드네':NEGATIVE, '팀원이 아이디어 검토를 요청했는데 너무 좋은 아이디어 같아. 의견을 물어봐줘서 고마워':POSITIVE, '성격이 좋은 팀원들과 함께 할 수 있어서 다행이야':POSITIVE, '금요일만 되면 기분이 좋아져':POSITIVE, '벌써 일요일이라니 출근할 생각하니 급 다운된다.':NEGATIVE, '짜증나니까 얘기하지마!':NEGATIVE, '너무 심심해.':NEUTRAL, '똑똑한 사람이랑 대화하는건 즐거워요':POSITIVE, '당신은 항상 웃는 얼굴이어서 만나면 기분이 좋아져요':POSITIVE, '연설이 너무 따분해서 하품이 나와요':NEGATIVE, '맛있는 식당에 갈 생각을 하니 신나요':POSITIVE, '이런 훌륭한 강의를 듣게 되서 영광입니다.':POSITIVE, '만나뵙게 되서 반갑습니다.':POSITIVE, '그 사람만 만나면 짜증이 나서 보기가 싫어':NEGATIVE, '아이들이 활기차게 뛰어노는 모습이 보기 좋아요':POSITIVE, '한심한 소리좀 그만할 수 없어요?':NEGATIVE, '웃기고 자빠졌네!':NEGATIVE, '휴! 십년 감수했네!':NEUTRAL, '말같지도 않은 소리하고 있어! ㅅㅂ':NEGATIVE, '입에서 욕이 자동으로 나온다...':NEGATIVE, '입만 열면 거짓말이 자동으로 나와!':NEGATIVE, '저거 바보 아냐?':NEGATIVE, '힘들때 곁에 있어줘서 고마워':POSITIVE, '아이큐가 소숫점 이하 같아':NEGATIVE, '저런 모지리 같으니라고':NEGATIVE, '지지리 못난 놈':NEGATIVE, '저 인간 때문에 내가 제 명에 못살것 같아':NEGATIVE, '저 새끼 죽여':NEGATIVE, '넌 정말 천사같아':POSITIVE, '당신이 좋아요 항상 곁에 있어주세요':POSITIVE, '꼴도 보기 싫으니 썩 꺼져':NEGATIVE, '아 진짜 돌아버리겠네':NEGATIVE, '역겨운 놈들':NEGATIVE, '저런 미인을 보니 안구가 정화되는 느낌이야':POSITIVE, '아오 ㅅㅂ 눈 썩는다':NEGATIVE, '깝치지마 뒤질래?':NEGATIVE, '언제든 환영이에요':POSITIVE, '줘 패버리고 싶네 진짜':NEGATIVE, '애기만 보면 웃음이 나와':POSITIVE, '하는 짓 보면 저능아 같아':NEGATIVE, '칭챙춍':NEGATIVE, '왓더뻑':NEGATIVE, '이 빡대가리야':NEGATIVE, '돌대가리 자식':NEGATIVE, '너도 자식이라고 낳은 니 엄마가 불쌍':NEGATIVE, '예쁜 공주님이에요 축하해요':POSITIVE, '씩씩한 왕자님이에요. 좋으시겠어요.':POSITIVE, '얼씨구 좋다':POSITIVE, '하늘이 무너지는 기분이야':NEGATIVE, '하늘을 나는 기분이야':POSITIVE, '아자아자 화이팅!':POSITIVE, '개새끼':NEGATIVE, '아주 나이스':POSITIVE, '답도 없는 인간들':NEGATIVE, '정말 여긴 저능아 집단 같아':NEGATIVE, '만나서 반가워요. 정말 미인이시네요':POSITIVE, '당신이 그리워요. 보고싶어요.':POSITIVE, '바라만 봐도 웃음이 나와요':POSITIVE, '개 열받네':NEGATIVE, '레알 대박 ㅋㅋㅋ':POSITIVE, '너무 보고싶었어요. 이렇게 만나게되서 반갑습니다.':POSITIVE, '친구야 사랑해':POSITIVE, '이 바보 자식아':NEGATIVE, '오늘은 날씨가 참 좋네요. 기분이 상쾌해요.':POSITIVE, '속상해서 밥이 안넘어간다.': NEGATIVE, '마음이 울적해서 길을 나섰네':NEGATIVE, '오늘은 인생 최고의 날': POSITIVE, '이 훌륭한 일에 동참하게 되서 영광입니다.':POSITIVE, '나 집에서 자는 중':NEUTRAL, } hit_cnt = 0 tot_cnt = len(test_dict) for x, y in test_dict.items(): tokenized = tokenizer([x], truncation=True, padding=True) pred = trainer.predict(SentimentDataset(tokenized)) logit = torch.tensor(pred[0]) result = F.softmax(logit, dim=-1).argmax(1).numpy() if result[0] != y: print(f"ERROR: {x} expected:{idx_to_label[y]} result:{idx_to_label[result[0]]}") else: hit_cnt += 1 print() print(f"hit/total: {hit_cnt}/{tot_cnt}, rate: {hit_cnt/tot_cnt}")