Spaces:
Sleeping
Sleeping
import datetime | |
import numpy as np | |
import pandas as pd | |
import re | |
import json | |
import os | |
import glob | |
import torch | |
import torch.nn.functional as F | |
from torch.optim import Adam | |
from tqdm import tqdm | |
from torch import nn | |
from transformers import BertModel | |
from transformers import AutoTokenizer | |
import argparse | |
def split_essay_to_sentence(origin_essay):
    """Split an essay into trimmed, non-empty sentence fragments.

    The text is cut at every newline and at every '.' character; each piece
    is stripped of surrounding whitespace and empty pieces are discarded.
    Note that '.' is treated purely as a separator, so "3.14" becomes
    ['3', '14'].

    Args:
        origin_essay: The raw essay text.

    Returns:
        A list of fragments in their original order.
    """
    # A single pass over lines and pieces; avoids the quadratic copying of
    # flattening with sum(list_of_lists, []).
    pieces = (
        piece.strip()
        for line in origin_essay.split('\n')
        for piece in line.split('.')
    )
    return [piece for piece in pieces if piece]
def get_first_extraction(text_sentence):
    """Ask the question-answering pipeline for the feeling in each sentence.

    Every sentence is used as the context for the fixed question
    'what is the feeling?'. Relies on the module-level `question_answerer`.

    Args:
        text_sentence: Iterable of sentence strings.

    Returns:
        A dict mapping each sentence to the answer returned by
        `question_answerer`; a repeated sentence keeps its latest answer.
    """
    feeling_question = 'what is the feeling?'
    return {
        sentence: question_answerer(question=feeling_question, context=sentence)
        for sentence in tqdm(text_sentence)
    }
def get_sent_labeldata(label_path='./rawdata/sentimental_label.csv'):
    """Build the emotion <-> class-index lookup tables from the label CSV.

    The CSV is read as cp949 without a header row. Column 1 is reduced to
    its first run of Hangul syllables, then every 10th row (index 0, 10,
    20, ...) is kept and enumerated in file order.

    Args:
        label_path: Path of the label CSV. Defaults to the project location.

    Returns:
        A tuple ``(emo2idx, idx2emo)`` where ``emo2idx`` maps
        ``(column-0 value, emotion word)`` tuples to their position and
        ``idx2emo`` maps that position back to the emotion word.

    Raises:
        IndexError: If a value in column 1 contains no Hangul syllables.
    """
    label = pd.read_csv(label_path, encoding='cp949', header=None)

    # U+AC00..U+D7A3 is the full precomposed Hangul syllable range.
    hangul_run = re.compile(r'[가-힣]+')
    label[1] = label[1].apply(lambda x: hangul_run.findall(x)[0])

    label_dict = label[label.index % 10 == 0].set_index(0).to_dict()[1]
    emo2idx = {pair: idx for idx, pair in enumerate(label_dict.items())}
    idx2emo = {idx: pair[1] for pair, idx in emo2idx.items()}
    return emo2idx, idx2emo
# def load_model(): | |
# class BertClassifier(nn.Module): | |
# def __init__(self, dropout = 0.3): | |
# super(BertClassifier, self).__init__() | |
# self.bert= BertModel.from_pretrained('bert-base-multilingual-cased') | |
# self.dropout = nn.Dropout(dropout) | |
# self.linear = nn.Linear(768, 6) | |
# self.relu = nn.ReLU() | |
# def forward(self, input_id, mask): | |
# _, pooled_output = self.bert(input_ids = input_id, attention_mask = mask, return_dict = False) | |
# dropout_output = self.dropout(pooled_output) | |
# linear_output = self.linear(dropout_output) | |
# final_layer= self.relu(linear_output) | |
# return final_layer | |
# tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased') | |
# device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
# cls_model = BertClassifier() | |
# criterion = nn.CrossEntropyLoss() | |
# model_name = 'bert-base-multilingual-cased' | |
# PATH = './model' + '/' + model_name + '_' + '2023102410' | |
# print(PATH) | |
# cls_model = torch.load(PATH) | |
# #cls_model.load_state_dict(torch.load(PATH)) | |
# return tokenizer, cls_model | |
class myDataset_for_infer(torch.utils.data.Dataset):
    """Dataset that tokenizes one text per item for inference.

    Args:
        X: Indexable collection of input texts.
        tokenizer: Optional callable used to encode a text. When omitted,
            the module-level `tokenizer` is looked up at item-access time,
            which matches the previous behaviour.
        max_length: Length every encoded item is padded/truncated to.
    """

    def __init__(self, X, tokenizer=None, max_length=96):
        self.X = X
        self._tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of texts."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the tokenizer output for the text at position `idx`."""
        # Resolve lazily so a later rebinding of the global is still honoured.
        encode = self._tokenizer if self._tokenizer is not None else tokenizer
        return encode(
            self.X[idx],
            return_tensors='pt',
            padding='max_length',
            max_length=self.max_length,
            truncation=True,
        )
def infer_data(model, main_feeling_keyword):
    """Run the classifier over the given texts and return predicted labels.

    Texts are tokenized through `myDataset_for_infer`, batched 16 at a time
    and fed to `model` on the GPU when one is available, otherwise on CPU.

    Args:
        model: Classifier called as ``model(input_ids, attention_mask)``
            whose output exposes a ``logits`` tensor.
        main_feeling_keyword: Indexable collection of input texts.

    Returns:
        A list with one predicted class index (int) per input text.
    """
    dataset = myDataset_for_infer(main_feeling_keyword)
    infer_dataloader = torch.utils.data.DataLoader(dataset, batch_size=16)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model.to(device)
    model.eval()  # make sure dropout is disabled while predicting

    result_list = []
    with torch.no_grad():
        for infer_input in tqdm(infer_dataloader):
            # Each item was encoded with return_tensors='pt', so batching
            # adds an extra axis at position 1; drop it from both tensors.
            input_id = infer_input['input_ids'].squeeze(1).to(device)
            mask = infer_input['attention_mask'].squeeze(1).to(device)
            output = model(input_id, mask)
            # argmax on the tensor itself, then move to CPU before converting.
            result_list.extend(output.logits.argmax(dim=1).cpu().tolist())
    return result_list
def get_word_emotion_pair(cls_model, origin_essay_sentence, idx2emo):
    """Label each sentence with an emotion and extract its nouns/adjectives/verbs.

    Sentences are classified with `infer_data`, tagged with the module-level
    token-classification `pipeline`, and the resulting table is written to
    ``./result/<timestamp>/essay_result.csv``.

    Args:
        cls_model: Classifier passed through to `infer_data`.
        origin_essay_sentence: List of sentence strings.
        idx2emo: Mapping from predicted class index to emotion name.

    Returns:
        A tuple ``(final_result, file_made_dt_str)``: the DataFrame with
        columns text, label, emotion, noun_list, adj_list, verb_list and
        title, and the timestamp string naming the output directory.
    """
    # Korean object particles that are stripped out of extracted words.
    particle_pattern = re.compile(r'[을를]+')
    pos_columns = {'NOUN': 'noun_list', 'ADJ': 'adj_list', 'VERB': 'verb_list'}

    def words_by_pos(sent):
        """Tag `sent` once and bucket words longer than one char by POS."""
        grouped = {tag: [] for tag in pos_columns}
        for token in pipeline(sent):
            vocab = sent[token["start"]:token["end"]]
            tag = token["entity_group"]
            if len(vocab) > 1 and tag in grouped:
                grouped[tag].append(particle_pattern.sub('', vocab))
        return grouped

    result_list = infer_data(cls_model, origin_essay_sentence)
    final_result = pd.DataFrame(data={'text': origin_essay_sentence, 'label': result_list})
    final_result['emotion'] = final_result['label'].map(idx2emo)

    # Run the tagger once per sentence and fan the result out to all columns.
    tagged = final_result['text'].map(words_by_pos)
    for tag, column in pos_columns.items():
        final_result[column] = tagged.map(lambda words, tag=tag: words[tag])
    final_result['title'] = 'none'

    # Second-level resolution so runs within the same minute do not collide.
    file_made_dt_str = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    os.makedirs(f'./result/{file_made_dt_str}/', exist_ok=True)
    final_result.to_csv(f"./result/{file_made_dt_str}/essay_result.csv", index=False)
    return final_result, file_made_dt_str
def get_essay_base_analysis(file_made_dt_str):
    """Aggregate word/emotion statistics for a previously saved essay result.

    Reads ``./result/<file_made_dt_str>/essay_result.csv``, counts how often
    each noun and adjective occurs under each sentence emotion, and writes
    three CSV reports next to the input file.

    Args:
        file_made_dt_str: Name of the run directory under ``./result``.

    Returns:
        A tuple ``(all_result, adj_result, noun_result, essay_summary,
        file_made_dt_str)``:
          * all_result / adj_result: counts per (type, word) row and emotion
            column plus a 'total' column, sorted by total descending
            (both cover nouns and adjectives);
          * noun_result: the same table restricted to nouns;
          * essay_summary: number of sentences per emotion for each title.
    """
    import ast

    result_dir = f'./result/{file_made_dt_str}/'
    essay1 = pd.read_csv(f"./result/{file_made_dt_str}/essay_result.csv")

    # List columns come back from CSV as their repr; parse them as literals
    # (literal_eval only accepts Python literals, unlike eval).
    for column in ('noun_list', 'adj_list'):
        essay1[column] = essay1[column].apply(ast.literal_eval)

    # Top emotion per sentence, counted per title.
    essay_summary = essay1.groupby(['title'])['emotion'].value_counts().unstack(level=1)

    # {(type, word): {emotion: count}}
    vocab_emo_cnt_dict = {}
    for pos, column in (('noun', 'noun_list'), ('adj', 'adj_list')):
        for emotion, vocab_list in essay1[['emotion', column]].values:
            for vocab in vocab_list:
                per_emotion = vocab_emo_cnt_dict.setdefault((pos, vocab), {})
                per_emotion[emotion] = per_emotion.get(emotion, 0) + 1

    vocab_emo_cnt_df = pd.DataFrame(vocab_emo_cnt_dict).T
    vocab_emo_cnt_df['total'] = vocab_emo_cnt_df.sum(axis=1)

    # Top emotion per word and emotion counts.
    all_result = vocab_emo_cnt_df.sort_values(by='total', ascending=False)
    # Top emotion per word and emotion counts, adjectives included.
    adj_result = vocab_emo_cnt_df.sort_values(by='total', ascending=False)
    # Nouns only.
    is_noun = vocab_emo_cnt_df.index.get_level_values(0) == 'noun'
    noun_result = vocab_emo_cnt_df[is_noun].sort_values(by='total', ascending=False)

    final_file_name = "essay_all_vocab_result.csv"
    adj_file_name = "essay_adj_vocab_result.csv"
    noun_file_name = "essay_noun_vocab_result.csv"

    os.makedirs(result_dir, exist_ok=True)
    # NOTE(review): index=False drops the (type, word) index, so the saved
    # files hold counts without their words -- confirm whether that is wanted.
    all_result.to_csv(f"{result_dir}{final_file_name}", index=False)
    adj_result.to_csv(f"{result_dir}{adj_file_name}", index=False)
    noun_result.to_csv(f"{result_dir}{noun_file_name}", index=False)

    return all_result, adj_result, noun_result, essay_summary, file_made_dt_str
# --- Module-level model setup (runs at import time) -------------------------
# The statements below are order-dependent: two names are deliberately (or
# accidentally) rebound, and other functions in this file rely on the FINAL
# binding of each.
from transformers import pipeline
#model_name = 'AlexKay/xlm-roberta-large-qa-multilingual-finedtuned-ru'
model_name = 'monologg/koelectra-base-v2-finetuned-korquad'
# Question-answering pipeline; must be built before `pipeline` is rebound below.
question_answerer = pipeline("question-answering", model=model_name)
from transformers import AutoTokenizer,AutoModelForTokenClassification,TokenClassificationPipeline
# Tokenizer and model for the token-classification (POS tagging) pipeline.
tokenizer=AutoTokenizer.from_pretrained("KoichiYasuoka/roberta-large-korean-upos")
posmodel=AutoModelForTokenClassification.from_pretrained("KoichiYasuoka/roberta-large-korean-upos")
# NOTE: from here on `pipeline` is the tagging pipeline instance, no longer
# the transformers factory function imported above.
pipeline=TokenClassificationPipeline(tokenizer=tokenizer,model=posmodel,aggregation_strategy="simple")
# nlp(text) -> list of (substring, entity_group) pairs, one per aggregated token.
nlp=lambda x:[(x[t["start"]:t["end"]],t["entity_group"]) for t in pipeline(x)]
from transformers import AutoModelForSequenceClassification
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# NOTE: rebinds the global `tokenizer`; the tagging pipeline above keeps the
# tokenizer object it was constructed with, while code that reads the global
# `tokenizer` afterwards gets this multilingual BERT tokenizer.
tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased')
def all_process(origin_essay):
    """Run the full essay analysis and return the per-title emotion summary.

    Steps: split the essay into sentences, classify each sentence's emotion,
    extract nouns/adjectives/verbs, aggregate word-by-emotion counts, and
    write the resulting tables as JSON files into the run directory under
    ``./result``.

    Args:
        origin_essay: Raw essay text entered by the user.

    Returns:
        The `essay_summary` DataFrame produced by `get_essay_base_analysis`
        (sentence counts per emotion for each title), or an empty DataFrame
        when the essay contains no sentences.
    """
    essay_sent = split_essay_to_sentence(origin_essay or '')
    if not essay_sent:
        # Nothing to classify; skip model loading and file output.
        return pd.DataFrame()

    _, idx2emo = get_sent_labeldata()
    cls_model = AutoModelForSequenceClassification.from_pretrained(
        'seriouspark/bert-base-multilingual-cased-finetuning-sentimental-6label'
    )
    #cls_model = AutoModelForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels = 6)

    _, file_name_dt = get_word_emotion_pair(cls_model, essay_sent, idx2emo)
    all_result, adj_result, noun_result, essay_summary, file_made_dt_str = get_essay_base_analysis(file_name_dt)

    reports = {
        'summary': essay_summary,
        'all_result': all_result,
        'adj_result': adj_result,
        'noun_result': noun_result,
    }
    for stem, frame in reports.items():
        with open(f'./result/{file_made_dt_str}/{stem}.json', 'w') as f:
            # to_json() already returns JSON text; write it as-is rather
            # than JSON-encoding that string a second time.
            f.write(frame.to_json())

    return essay_summary
import gradio as gr

# Single output component: a table showing the summary returned by all_process.
outputs = [
    gr.Dataframe(
        row_count=(6, "dynamic"),
        col_count=(2, "dynamic"),
        label="Essay Summary based on Words",
    )
    #headers=['type','word','슬픔', '분노', '기쁨', '불안', '상처', '당황', 'total'])
]
# Alternative (disabled) word-level table configuration:
#row_count = (10, "dynamic"),
#col_count=(9, "dynamic"),
#label="Results",
#headers=['type','word','슬픔', '분노', '기쁨', '불안', '상처', '당황', 'total'])
#]

iface = gr.Interface(
    fn=all_process,
    # Placeholder text reads "Please enter your writing" in Korean.
    inputs=gr.Textbox(lines=2, placeholder='당신의 글을 넣어보세요'),
    outputs=outputs,
)
# Starts the web UI as soon as this module is executed.
iface.launch(share=True)