moogeulmoogeul / app.py
seriouspark's picture
Update app.py
9e83131
raw
history blame
11.7 kB
import datetime
import numpy as np
import pandas as pd
import re
import json
import os
import glob
import torch
import torch.nn.functional as F
from torch.optim import Adam
from tqdm import tqdm
from torch import nn
from transformers import BertModel
from transformers import AutoTokenizer
import argparse
def split_essay_to_sentence(origin_essay):
origin_essay_sentence = sum([[a.strip() for a in i.split('.')] for i in origin_essay.split('\n')], [])
essay_sent = [a for a in origin_essay_sentence if len(a) > 0]
return essay_sent
def get_first_extraction(text_sentence):
row_dict = {}
for row in tqdm(text_sentence):
question = 'what is the feeling?'
answer = question_answerer(question=question, context=row)
row_dict[row] = answer
return row_dict
def get_sent_labeldata():
label =pd.read_csv('./rawdata/sentimental_label.csv', encoding = 'cp949', header = None)
label[1] = label[1].apply(lambda x : re.findall(r'[๊ฐ€-ํžฃ]+', x)[0])
label_dict =label[label.index % 10 == 0].set_index(0).to_dict()[1]
emo2idx = {v : k for k, v in enumerate(label_dict.items())}
idx2emo = {v : k[1] for k, v in emo2idx.items()}
return emo2idx, idx2emo
# def load_model():
# class BertClassifier(nn.Module):
# def __init__(self, dropout = 0.3):
# super(BertClassifier, self).__init__()
# self.bert= BertModel.from_pretrained('bert-base-multilingual-cased')
# self.dropout = nn.Dropout(dropout)
# self.linear = nn.Linear(768, 6)
# self.relu = nn.ReLU()
# def forward(self, input_id, mask):
# _, pooled_output = self.bert(input_ids = input_id, attention_mask = mask, return_dict = False)
# dropout_output = self.dropout(pooled_output)
# linear_output = self.linear(dropout_output)
# final_layer= self.relu(linear_output)
# return final_layer
# tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased')
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
# cls_model = BertClassifier()
# criterion = nn.CrossEntropyLoss()
# model_name = 'bert-base-multilingual-cased'
# PATH = './model' + '/' + model_name + '_' + '2023102410'
# print(PATH)
# cls_model = torch.load(PATH)
# #cls_model.load_state_dict(torch.load(PATH))
# return tokenizer, cls_model
class myDataset_for_infer(torch.utils.data.Dataset):
def __init__(self, X):
self.X = X
def __len__(self):
return len(self.X)
def __getitem__(self,idx):
sentences = tokenizer(self.X[idx], return_tensors = 'pt', padding = 'max_length', max_length = 96, truncation = True)
return sentences
def infer_data(model, main_feeling_keyword):
#ds = myDataset_for_infer()
df_infer = myDataset_for_infer(main_feeling_keyword)
infer_dataloader = torch.utils.data.DataLoader(df_infer, batch_size= 16)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cuda':
model = model.cuda()
result_list = []
with torch.no_grad():
for idx, infer_input in tqdm(enumerate(infer_dataloader)):
mask = infer_input['attention_mask'].to(device)
input_id = infer_input['input_ids'].squeeze(1).to(device)
output = clsmodel(input_id, mask)
result = np.argmax(output.logits, axis=1).numpy()
result_list.extend(result)
return result_list
def get_word_emotion_pair(cls_model, origin_essay_sentence, idx2emo):
import re
def get_noun(sent):
return [re.sub(r'[์„๋ฅผ]+', '', vocab) for (vocab, pos) in nlp(sent) if len(vocab) > 1 and pos == 'NOUN']
def get_adj(sent):
return [re.sub(r'[์„๋ฅผ]+', '', vocab) for (vocab, pos) in nlp(sent) if len(vocab) > 1 and pos == 'ADJ']
def get_verb(sent):
return [re.sub(r'[์„๋ฅผ]+', '', vocab) for (vocab, pos) in nlp(sent) if len(vocab) > 1 and pos == 'VERB']
result_list = infer_data(cls_model, origin_essay_sentence)
final_result = pd.DataFrame(data = {'text': origin_essay_sentence , 'label' : result_list})
final_result['emotion'] = final_result['label'].map(idx2emo)
nlp=lambda x:[(x[t["start"]:t["end"]],t["entity_group"]) for t in pipeline(x)]
#essay_sent_pos = [nlp(i) for i in tqdm(essay_sent)]
#final_result['text_pos'] = essay_sent_pos
final_result['noun_list'] = final_result['text'].map(get_noun)
final_result['adj_list'] = final_result['text'].map(get_adj)
final_result['verb_list'] = final_result['text'].map(get_verb)
final_result['title'] = 'none'
file_made_dt = datetime.datetime.now()
file_made_dt_str = datetime.datetime.strftime(file_made_dt, '%Y%m%d_%H%M%d')
os.makedirs(f'./result/{file_made_dt_str}/', exist_ok = True)
final_result.to_csv(f"./result/{file_made_dt_str}/essay_result.csv", index = False)
return final_result, file_made_dt_str
def get_essay_base_analysis(file_made_dt_str):
essay1 = pd.read_csv(f"./result/{file_made_dt_str}/essay_result.csv")
essay1['noun_list_len'] = essay1['noun_list'].apply(lambda x : len(x))
essay1['noun_list_uniqlen'] = essay1['noun_list'].apply(lambda x : len(set(x)))
essay1['adj_list_len'] = essay1['adj_list'].apply(lambda x : len(x))
essay1['adj_list_uniqlen'] = essay1['adj_list'].apply(lambda x : len(set(x)))
essay1['vocab_all'] = essay1[['noun_list','adj_list']].apply(lambda x : sum((eval(x[0]),eval(x[1])), []), axis=1)
essay1['vocab_cnt'] = essay1['vocab_all'].apply(lambda x : len(x))
essay1['vocab_unique_cnt'] = essay1['vocab_all'].apply(lambda x : len(set(x)))
essay1['noun_list'] = essay1['noun_list'].apply(lambda x : eval(x))
essay1['adj_list'] = essay1['adj_list'].apply(lambda x : eval(x))
d = essay1.groupby('title')[['noun_list','adj_list']].sum([]).reset_index()
d['noun_cnt'] = d['noun_list'].apply(lambda x : len(set(x)))
d['adj_cnt'] = d['adj_list'].apply(lambda x : len(set(x)))
# ๋ฌธ์žฅ ๊ธฐ์ค€ ์ตœ๊ณ  ๊ฐ์ •
essay_summary =essay1.groupby(['title'])['emotion'].value_counts().unstack(level =1)
emo_vocab_dict = {}
for k, v in essay1[['emotion','noun_list']].values:
for vocab in v:
if (k, 'noun', vocab) not in emo_vocab_dict:
emo_vocab_dict[(k, 'noun', vocab)] = 0
emo_vocab_dict[(k, 'noun', vocab)] += 1
for k, v in essay1[['emotion','adj_list']].values:
for vocab in v:
if (k, 'adj', vocab) not in emo_vocab_dict:
emo_vocab_dict[(k, 'adj', vocab)] = 0
emo_vocab_dict[(k, 'adj', vocab)] += 1
vocab_emo_cnt_dict = {}
for k, v in essay1[['emotion','noun_list']].values:
for vocab in v:
if (vocab, 'noun') not in vocab_emo_cnt_dict:
vocab_emo_cnt_dict[('noun', vocab)] = {}
if k not in vocab_emo_cnt_dict[( 'noun', vocab)]:
vocab_emo_cnt_dict[( 'noun', vocab)][k] = 0
vocab_emo_cnt_dict[('noun', vocab)][k] += 1
for k, v in essay1[['emotion','adj_list']].values:
for vocab in v:
if ('adj', vocab) not in vocab_emo_cnt_dict:
vocab_emo_cnt_dict[( 'adj', vocab)] = {}
if k not in vocab_emo_cnt_dict[( 'adj', vocab)]:
vocab_emo_cnt_dict[( 'adj', vocab)][k] = 0
vocab_emo_cnt_dict[('adj', vocab)][k] += 1
vocab_emo_cnt_df = pd.DataFrame(vocab_emo_cnt_dict).T
vocab_emo_cnt_df['total'] = vocab_emo_cnt_df.sum(axis=1)
# ๋‹จ์–ด๋ณ„ ์ตœ๊ณ  ๊ฐ์ • ๋ฐ ๊ฐ์ • ๊ฐœ์ˆ˜
all_result=vocab_emo_cnt_df.sort_values(by = 'total', ascending = False)
# ๋‹จ์–ด๋ณ„ ์ตœ๊ณ  ๊ฐ์ • ๋ฐ ๊ฐ์ • ๊ฐœ์ˆ˜ , ํ˜•์šฉ์‚ฌ ํฌํ•จ ์‹œ
adj_result=vocab_emo_cnt_df.sort_values(by = 'total', ascending = False)
# ๋ช…์‚ฌ๋งŒ ์‚ฌ์šฉ ์‹œ
noun_result=vocab_emo_cnt_df[vocab_emo_cnt_df.index.get_level_values(0) == 'noun'].sort_values(by = 'total', ascending = False)
final_file_name = f"essay_all_vocab_result.csv"
adj_file_name = f"essay_adj_vocab_result.csv"
noun_file_name = f"essay_noun_vocab_result.csv"
os.makedirs(f'./result/{file_made_dt_str}/', exist_ok = True)
final_result.to_csv(f"./result/{file_made_dt_str}/essay_all_vocab_result.csv", index = False)
adj_result.to_csv(f"./result/{file_made_dt_str}/essay_adj_vocab_result.csv", index = False)
noun_result.to_csv(f"./result/{file_made_dt_str}/essay_noun_vocab_result.csv", index = False)
return final_result, adj_result, noun_result, essay_summary, file_made_dt_str
from transformers import pipeline
#model_name = 'AlexKay/xlm-roberta-large-qa-multilingual-finedtuned-ru'
model_name = 'monologg/koelectra-base-v2-finetuned-korquad'
question_answerer = pipeline("question-answering", model=model_name)
from transformers import AutoTokenizer,AutoModelForTokenClassification,TokenClassificationPipeline
tokenizer=AutoTokenizer.from_pretrained("KoichiYasuoka/roberta-large-korean-upos")
posmodel=AutoModelForTokenClassification.from_pretrained("KoichiYasuoka/roberta-large-korean-upos")
pipeline=TokenClassificationPipeline(tokenizer=tokenizer,model=posmodel,aggregation_strategy="simple")
nlp=lambda x:[(x[t["start"]:t["end"]],t["entity_group"]) for t in pipeline(x)]
from transformers import AutoModelForSequenceClassification
device = 'cuda' if torch.cuda.is_available() else 'cpu'
tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased')
def all_process(origin_essay):
essay_sent =split_essay_to_sentence(origin_essay)
row_dict = {}
for row in tqdm(essay_sent):
question = 'what is the feeling?'
answer = question_answerer(question=question, context=row)
row_dict[row] = answer
emo2idx, idx2emo = get_sent_labeldata()
tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased')
cls_model = AutoModelForSequenceClassification.from_pretrain ed('seriouspark/bert-base-multilingual-cased-finetuning-sentimental-6label')
#cls_model = AutoModelForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels = 6)
final_result, file_name_dt = get_word_emotion_pair(cls_model, essay_sent)
all_result, adj_\result, noun_result, essay_summary, file_made_dt_str = get_essay_base_analysis(file_name_dt)
summary_result = pd.concat([adj_result, noun_result]).fillna(0).sort_values(by = 'total', ascending = False).fillna(0).reset_index()[:30]
with open(f'./result/{file_name_dt}/summary.json','w') as f:
json.dump( essay_summary.to_json(),f)
with open(f'./result/{file_made_dt_str}/all_result.json','w') as f:
json.dump( all_result.to_json(),f)
with open(f'./result/{file_made_dt_str}/adj_result.json','w') as f:
json.dump( adj_result.to_json(),f)
with open(f'./result/{file_made_dt_str}/noun_result.json','w') as f:
json.dump( noun_result.to_json(),f)
return essay_summary
import gradio as gr
outputs = [gr.Dataframe(row_count = (6, "dynamic"),
col_count=(2, "dynamic"),
label="Essay Summary based on Words")
#headers=['type','word','์Šฌํ””', '๋ถ„๋…ธ', '๊ธฐ์จ', '๋ถˆ์•ˆ', '์ƒ์ฒ˜', '๋‹นํ™ฉ', 'total'])
]
#row_count = (10, "dynamic"),
#col_count=(9, "dynamic"),
#label="Results",
#headers=['type','word','์Šฌํ””', '๋ถ„๋…ธ', '๊ธฐ์จ', '๋ถˆ์•ˆ', '์ƒ์ฒ˜', '๋‹นํ™ฉ', 'total'])
#]
iface = gr.Interface(
fn=all_process,
inputs = gr.Textbox(lines=2, placeholder= '๋‹น์‹ ์˜ ๊ธ€์„ ๋„ฃ์–ด๋ณด์„ธ์š”'),
outputs = outputs,
)
iface.launch(share=True)