import pandas as pd
import numpy as np
# from konlpy.tag import Okt
from string import whitespace, punctuation
import re
import unicodedata
from sentence_transformers import SentenceTransformer, util
import gradio as gr
import pytorch_lightning as pl
import torch
from transformers import PreTrainedTokenizerFast, BartForConditionalGeneration
from transformers.optimization import get_cosine_schedule_with_warmup
from torch.utils.data import DataLoader, Dataset


# classification
def CleanEnd(text):
    # Strip e-mail addresses, URLs, reporter bylines / copyright tails, and
    # leading bracketed tags from article text.
    email = re.compile(
        r'[-_0-9a-z]+@[-_0-9a-z]+(?:\.[0-9a-z]+)+', flags=re.IGNORECASE)
    url = re.compile(
        r'(?:https?:\/\/)?[-_0-9a-z]+(?:\.[-_0-9a-z]+)+', flags=re.IGNORECASE)
    etc = re.compile(
        r'\.([^\.]*(?:기자|특파원|교수|작가|대표|논설|고문|주필|부문장|팀장|장관|원장|연구원|이사장|위원|실장|차장|부장|에세이|화백|사설|소장|단장|과장|기획자|큐레이터|저작권|평론가|©|©|ⓒ|\@|\/|=|▶|무단|전재|재배포|금지|\[|\]|\(\))[^\.]*)$')
    bracket = re.compile(r'^((?:\[.+\])|(?:【.+】)|(?:<.+>)|(?:◆.+◆)\s)')

    result = email.sub('', text)
    result = url.sub('', result)
    result = etc.sub('.', result)
    result = bracket.sub('', result).strip()
    return result


def TextFilter(text):
    # Keep only Hangul, spaces and '%'; collapse whitespace and punctuation runs.
    punct = ''.join([chr for chr in punctuation if chr != '%'])
    filtering = re.compile(f'[{whitespace}{punct}]+')
    onlyText = re.compile(r'[^\% ㄱ-ㅣ가-힣]+')

    result = filtering.sub(' ', text)
    result = onlyText.sub(' ', result).strip()
    result = filtering.sub(' ', result)
    return result


def is_clickbait(title, content, threshold=0.815):
    # Sentence-BERT model fine-tuned with an online contrastive objective.
    model = SentenceTransformer(
        './model/onlineContrastive')

    pattern_whitespace = re.compile(f'[{whitespace}]+')

    title = unicodedata.normalize('NFC', re.sub(
        pattern_whitespace, ' ', title)).strip()
    title = CleanEnd(title)
    title = TextFilter(title)

    content = unicodedata.normalize('NFC', re.sub(
        pattern_whitespace, ' ', content)).strip()
    content = CleanEnd(content)
    content = TextFilter(content)

    # Noun Extraction
    # okt = Okt()
    # title = ' '.join(okt.nouns(title))
    # content = ' '.join(okt.nouns(content))

    # Compute embeddings
    embeddings1 = model.encode(title, convert_to_tensor=True)
    embeddings2 = model.encode(content, convert_to_tensor=True)

    # Compute cosine similarity between headline and body
    cosine_score = util.cos_sim(embeddings1, embeddings2)
    similarity = cosine_score.numpy()[0][0]

    if similarity < threshold:
        return 0, similarity  # clickbait
    else:
        return 1, similarity  # non-clickbait


# Generation
df_train = pd.DataFrame()
df_train['input_text'] = ['1', '2']
df_train['target_text'] = ['1', '2']


def CleanEnd_g(text):
    email = re.compile(
        r'[-_0-9a-z]+@[-_0-9a-z]+(?:\.[0-9a-z]+)+', flags=re.IGNORECASE)
    # url = re.compile(r'(?:https?:\/\/)?[-_0-9a-z]+(?:\.[-_0-9a-z]+)+', flags=re.IGNORECASE)
    # etc = re.compile(r'\.([^\.]*(?:기자|특파원|교수|작가|대표|논설|고문|주필|부문장|팀장|장관|원장|연구원|이사장|위원|실장|차장|부장|에세이|화백|사설|소장|단장|과장|기획자|큐레이터|저작권|평론가|©|©|ⓒ|\@|\/|=|▶|무단|전재|재배포|금지|\[|\]|\(\))[^\.]*)$')
    # bracket = re.compile(r'^((?:\[.+\])|(?:【.+】)|(?:<.+>)|(?:◆.+◆)\s)')

    result = email.sub('', text)
    # result = url.sub('', result)
    # result = etc.sub('.', result)
    # result = bracket.sub('', result).strip()
    return result


class DatasetFromDataframe(Dataset):
    def __init__(self, df, dataset_args):
        self.data = df
        self.max_length = dataset_args['max_length']
        self.tokenizer = dataset_args['tokenizer']
        self.start_token = ''
        self.end_token = ''

    def __len__(self):
        return len(self.data)

    def create_tokens(self, text):
        # Tokenize, then pad or truncate to max_length and build the attention mask.
        tokens = self.tokenizer.encode(
            self.start_token + text + self.end_token)
        tokenLength = len(tokens)
        remain = self.max_length - tokenLength

        if remain >= 0:
            tokens = tokens + [self.tokenizer.pad_token_id] * remain
            attention_mask = [1] * tokenLength + [0] * remain
        else:
            tokens = tokens[: self.max_length - 1] + \
                self.tokenizer.encode(self.end_token)
            attention_mask = [1] * self.max_length

        return tokens, attention_mask

    def __getitem__(self, index):
        record = self.data.iloc[index]
        question, answer = record['input_text'], record['target_text']

        input_id, input_mask = self.create_tokens(question)
        output_id, output_mask = self.create_tokens(answer)

        # Shift the decoder ids by one position for the labels; pad with -100
        # so the loss ignores padded positions.
        label = output_id[1:(self.max_length + 1)]
        label = label + (self.max_length - len(label)) * [-100]

        return {
            'input_ids': torch.LongTensor(input_id),
            'attention_mask': torch.LongTensor(input_mask),
            'decoder_input_ids': torch.LongTensor(output_id),
            'decoder_attention_mask': torch.LongTensor(output_mask),
            'labels': torch.LongTensor(label)
        }


class OneSourceDataModule(pl.LightningDataModule):
    def __init__(self, **kwargs):
        super().__init__()
        self.data = kwargs.get('data')
        self.dataset_args = kwargs.get("dataset_args")
        self.batch_size = kwargs.get("batch_size") or 32
        self.train_size = kwargs.get("train_size") or 0.9

    def setup(self, stage=""):
        # trainset, testset = train_test_split(df_train, train_size=self.train_size, shuffle=True)
        self.trainset = DatasetFromDataframe(df_train, self.dataset_args)
        self.testset = DatasetFromDataframe(df_train, self.dataset_args)

    def train_dataloader(self):
        train = DataLoader(
            self.trainset,
            batch_size=self.batch_size
        )
        return train

    def val_dataloader(self):
        val = DataLoader(
            self.testset,
            batch_size=self.batch_size
        )
        return val

    def test_dataloader(self):
        test = DataLoader(
            self.testset,
            batch_size=self.batch_size
        )
        return test


class KoBARTConditionalGeneration(pl.LightningModule):
    def __init__(self, hparams, **kwargs):
        super(KoBARTConditionalGeneration, self).__init__()
        self.hparams.update(hparams)
        self.model = kwargs['model']
        self.tokenizer = kwargs['tokenizer']
        self.model.train()

    def configure_optimizers(self):
        # Apply weight decay to all parameters except biases and LayerNorm weights.
        param_optimizer = list(self.model.named_parameters())
        no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
        optimizer_grouped_parameters = [{
            'params': [
                p for n, p in param_optimizer if not any(nd in n for nd in no_decay)
            ],
            'weight_decay': 0.01
        }, {
            'params': [
                p for n, p in param_optimizer if any(nd in n for nd in no_decay)
            ],
            'weight_decay': 0.0
        }]

        optimizer = torch.optim.AdamW(
            optimizer_grouped_parameters,
            lr=self.hparams.lr
        )

        # num_workers = gpus * num_nodes
        data_len = len(self.train_dataloader().dataset)
        print(f'학습 데이터 양: {data_len}')  # amount of training data
        num_train_steps = int(
            data_len / self.hparams.batch_size * self.hparams.max_epochs)
        print(f'Step 수: {num_train_steps}')  # number of training steps
        num_warmup_steps = int(num_train_steps * self.hparams.warmup_ratio)
        print(f'Warmup Step 수: {num_warmup_steps}')  # number of warmup steps

        scheduler = get_cosine_schedule_with_warmup(
            optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_train_steps
        )
        lr_scheduler = {
            'scheduler': scheduler,
            'monitor': 'loss',
            'interval': 'step',
            'frequency': 1
        }

        return [optimizer], [lr_scheduler]

    def forward(self, inputs):
        return self.model(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            decoder_input_ids=inputs['decoder_input_ids'],
            decoder_attention_mask=inputs['decoder_attention_mask'],
            labels=inputs['labels'],
            return_dict=True
        )

    def training_step(self, batch, batch_idx):
        loss = self(batch).loss
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self(batch).loss
        return loss

    def test(self, text):
        # Generate a headline for `text` with beam search.
        tokens = self.tokenizer.encode("" + text + "")
        tokenLength = len(tokens)
        remain = self.hparams.max_length - tokenLength

        if remain >= 0:
            tokens = tokens + [self.tokenizer.pad_token_id] * remain
            attention_mask = [1] * tokenLength + [0] * remain
        else:
            tokens = tokens[: self.hparams.max_length - 1] + \
                self.tokenizer.encode("")
            attention_mask = [1] * self.hparams.max_length

        tokens = torch.LongTensor([tokens])
        attention_mask = torch.LongTensor([attention_mask])

        result = self.model.generate(
            tokens,
            max_length=self.hparams.max_length,
            attention_mask=attention_mask,
            num_beams=10
        )[0]

        a = self.tokenizer.decode(result)
        return a


def generation(szContent):
    tokenizer = PreTrainedTokenizerFast.from_pretrained(
        "gogamza/kobart-summarization")
    model1 = BartForConditionalGeneration.from_pretrained(
        "gogamza/kobart-summarization")

    # Summarize at most the first 500 characters of the article body.
    if len(szContent) > 500:
        input_ids = tokenizer.encode(szContent[:500], return_tensors="pt")
    else:
        input_ids = tokenizer.encode(szContent, return_tensors="pt")

    summary = model1.generate(
        input_ids=input_ids,
        bos_token_id=model1.config.bos_token_id,
        eos_token_id=model1.config.eos_token_id,
        length_penalty=.3,  # > 1 favors longer summaries, < 1 favors shorter ones
        max_length=35,
        min_length=25,
        num_beams=5)
    szSummary = tokenizer.decode(summary[0], skip_special_tokens=True)
    print(szSummary)

    # Fine-tuned headline-generation weights.
    KoBARTModel = BartForConditionalGeneration.from_pretrained(
        './model/final2.h5')

    BATCH_SIZE = 32
    MAX_LENGTH = 128
    EPOCHS = 0

    model2 = KoBARTConditionalGeneration({
        "lr": 5e-6,
        "warmup_ratio": 0.1,
        "batch_size": BATCH_SIZE,
        "max_length": MAX_LENGTH,
        "max_epochs": EPOCHS
    },
        tokenizer=tokenizer,
        model=KoBARTModel
    )

    dm = OneSourceDataModule(
        data=df_train,
        batch_size=BATCH_SIZE,
        train_size=0.9,
        dataset_args={
            "tokenizer": tokenizer,
            "max_length": MAX_LENGTH,
        }
    )

    # max_epochs=0: no additional fine-tuning happens here; fit() only wires up
    # the LightningModule, and the weights come from './model/final2.h5'.
    trainer = pl.Trainer(
        max_epochs=EPOCHS,
        gpus=0
    )
    trainer.fit(model2, dm)

    szTitle = model2.test(szSummary)

    df = pd.DataFrame()
    df['newTitle'] = [szTitle]
    df['content'] = [szContent]

    # Whitespace and punctuation removal
    pattern_whitespace = re.compile(f'[{whitespace}]+')
    df['newTitle'] = df.newTitle.fillna('').str.replace(pattern_whitespace, ' ', regex=True).map(
        lambda x: unicodedata.normalize('NFC', x)).str.strip()
    df['newTitle'] = df.newTitle.map(CleanEnd_g)
    df['newTitle'] = df.newTitle.map(TextFilter)

    return df.newTitle[0]


def new_headline(title, content):
    label = is_clickbait(title, content)
    if label[0] == 0:
        return generation(content)
    elif label[0] == 1:
        return '낚시성 기사가 아닙니다.'  # "This is not a clickbait article."


# gradio
with gr.Blocks() as demo1:
    gr.Markdown(
        """

# clickbait news classifier and new headline generator
        """)
    # Korean description below: "Enter a news headline and body; the app
    # classifies whether the article is clickbait and, if it is, generates a
    # new headline."
    gr.Markdown(
        """
뉴스 기사 제목과 본문을 입력하면 낚시성 기사인지 분류하고, 낚시성 기사이면 새로운 제목을 생성해주는 프로그램입니다.
        """)
    with gr.Row():
        with gr.Column():
            inputs = [gr.Textbox(placeholder="뉴스기사 제목을 입력해주세요", label='headline'),
                      gr.Textbox(
                          lines=10, placeholder="뉴스기사 본문을 입력해주세요", label='content')]
            with gr.Row():
                btn = gr.Button("결과 출력")
        with gr.Column():
            output = gr.Text(label='Result')

    btn.click(fn=new_headline, inputs=inputs, outputs=output)

if __name__ == "__main__":
    demo1.launch()