import os
from typing import List, Optional, Tuple

import gdown
import pandas as pd
import torch
from torch import Tensor, nn
from transformers import RobertaModel

from faknow.data.process.text_process import TokenizerFromPreTrained
from faknow.model.layers.layer import TextCNNLayer
from faknow.model.model import AbstractModel
|
|
class _MLP(nn.Module):
    def __init__(self,
                 input_dim: int,
                 embed_dims: List[int],
                 dropout_rate: float,
                 output_layer: bool = True):
        super().__init__()
        layers = []
        for embed_dim in embed_dims:
            layers.append(nn.Linear(input_dim, embed_dim))
            layers.append(nn.BatchNorm1d(embed_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(p=dropout_rate))
            input_dim = embed_dim
        if output_layer:
            layers.append(nn.Linear(input_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x (Tensor): shared feature from domain and text, shape=(batch_size, embed_dim)

        Returns:
            Tensor: output, shape=(batch_size, 1) if output_layer is True, otherwise (batch_size, embed_dims[-1])
        """
        return self.mlp(x)
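
# Illustrative shape walk-through for _MLP (comment sketch, not part of the original code),
# using the defaults taken by MDFEND below (input_dim=320, embed_dims=[384]):
#   x: (batch_size, 320)
#   -> Linear(320, 384) -> BatchNorm1d(384) -> ReLU -> Dropout
#   -> Linear(384, 1)              # appended because output_layer=True
#   output: (batch_size, 1)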
|
|
class _MaskAttentionLayer(nn.Module):
    """
    Mask attention layer: pools token embeddings into a single vector with
    attention weights from a linear layer, ignoring masked (padded) positions.
    """
    def __init__(self, input_size: int):
        super().__init__()
        self.attention_layer = nn.Linear(input_size, 1)

    def forward(self,
                inputs: Tensor,
                mask: Optional[Tensor] = None) -> Tuple[Tensor, Tensor]:
        weights = self.attention_layer(inputs).view(-1, inputs.size(1))
        if mask is not None:
            weights = weights.masked_fill(mask == 0, float("-inf"))
        weights = torch.softmax(weights, dim=-1).unsqueeze(1)
        outputs = torch.matmul(weights, inputs).squeeze(1)
        return outputs, weights
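
# Illustrative shapes for _MaskAttentionLayer (comment sketch, not part of the original code):
#   inputs: (batch_size, max_len, embedding_size), mask: (batch_size, max_len)
#   weights = Linear(embedding_size, 1)(inputs)  -> (batch_size, max_len)
#   positions with mask == 0 are set to -inf, so softmax assigns them zero weight
#   outputs = weights @ inputs                   -> (batch_size, embedding_size)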
|
|
class MDFEND(AbstractModel):
    r"""
    MDFEND: Multi-domain Fake News Detection, CIKM 2021
    paper: https://dl.acm.org/doi/10.1145/3459637.3482139
    code: https://github.com/kennqiang/MDFEND-Weibo21
    """
    def __init__(self,
                 pre_trained_bert_name: str,
                 domain_num: int,
                 mlp_dims: Optional[List[int]] = None,
                 dropout_rate: float = 0.2,
                 expert_num: int = 5):
        """
        Args:
            pre_trained_bert_name (str): name or local path of the pre-trained RoBERTa model
            domain_num (int): total number of domains
            mlp_dims (List[int]): dimensions of the MLP hidden layers; [384] is used if None, default=None
            dropout_rate (float): dropout rate, default=0.2
            expert_num (int): number of experts (TextCNNLayer instances), default=5
        """
        super().__init__()
        self.domain_num = domain_num
        self.expert_num = expert_num
        # the text encoder is loaded frozen; only the experts, gate, attention,
        # domain embedding and classifier are trained
        self.bert = RobertaModel.from_pretrained(
            pre_trained_bert_name).requires_grad_(False)
        self.embedding_size = self.bert.config.hidden_size
        self.loss_func = nn.BCELoss()
        if mlp_dims is None:
            mlp_dims = [384]

        filter_num = 64
        filter_sizes = [1, 2, 3, 5, 10]
        experts = [
            TextCNNLayer(self.embedding_size, filter_num, filter_sizes)
            for _ in range(self.expert_num)
        ]
        self.experts = nn.ModuleList(experts)

        self.gate = nn.Sequential(
            nn.Linear(self.embedding_size * 2, mlp_dims[-1]), nn.ReLU(),
            nn.Linear(mlp_dims[-1], self.expert_num), nn.Softmax(dim=1))

        self.attention = _MaskAttentionLayer(self.embedding_size)

        self.domain_embedder = nn.Embedding(num_embeddings=self.domain_num,
                                            embedding_dim=self.embedding_size)
        # each expert outputs filter_num * len(filter_sizes) = 320 features
        self.classifier = _MLP(filter_num * len(filter_sizes), mlp_dims,
                               dropout_rate)
|
    def forward(self, token_id: Tensor, mask: Tensor,
                domain: Tensor) -> Tensor:
        """
        Args:
            token_id (Tensor): token ids from bert tokenizer, shape=(batch_size, max_len)
            mask (Tensor): mask from bert tokenizer, shape=(batch_size, max_len)
            domain (Tensor): domain id, shape=(batch_size,)

        Returns:
            FloatTensor: the prediction of being fake, shape=(batch_size,)
        """
        text_embedding = self.bert(token_id,
                                   attention_mask=mask).last_hidden_state
        attention_feature, _ = self.attention(text_embedding, mask)

        domain_embedding = self.domain_embedder(domain.view(-1, 1)).squeeze(1)

        gate_input = torch.cat([domain_embedding, attention_feature], dim=-1)
        gate_output = self.gate(gate_input)

        # gate-weighted sum of the expert (TextCNN) features
        shared_feature = 0
        for i in range(self.expert_num):
            expert_feature = self.experts[i](text_embedding)
            shared_feature += (expert_feature * gate_output[:, i].unsqueeze(1))

        label_pred = self.classifier(shared_feature)

        return torch.sigmoid(label_pred.squeeze(1))
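
    # Illustrative shape flow through forward (comment sketch, not part of the original code):
    #   text_embedding:    (batch_size, max_len, hidden_size)   frozen RoBERTa output
    #   attention_feature: (batch_size, hidden_size)             masked attention pooling
    #   domain_embedding:  (batch_size, hidden_size)             lookup of the domain id
    #   gate_output:       (batch_size, expert_num)              softmax mixture weights
    #   expert_feature:    (batch_size, 320)                     320 = filter_num * len(filter_sizes)
    #   shared_feature:    gate-weighted sum of expert features, fed to the _MLP classifier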
|
    def calculate_loss(self, data) -> Tensor:
        """
        calculate loss via BCELoss

        Args:
            data (dict): batch data dict

        Returns:
            loss (Tensor): loss value
        """
        token_ids = data['text']['token_id']
        masks = data['text']['mask']
        domains = data['domain']
        labels = data['label']
        output = self.forward(token_ids, masks, domains)
        return self.loss_func(output, labels.float())
|
    def predict(self, data_without_label) -> Tensor:
        """
        predict whether the news is fake

        Args:
            data_without_label (Dict[str, Any]): batch data dict

        Returns:
            Tensor: one-hot encoded prediction, shape=(batch_size, 2)
        """
        token_ids = data_without_label['text']['token_id']
        masks = data_without_label['text']['mask']
        domains = data_without_label['domain']

        # round the sigmoid output to a hard 0/1 label, then one-hot encode it
        round_pred = torch.round(self.forward(token_ids, masks,
                                              domains)).long()
        one_hot_pred = torch.nn.functional.one_hot(round_pred, num_classes=2)
        return one_hot_pred
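
    # Expected batch layout (inferred from the key accesses in calculate_loss / predict above):
    #   {
    #       'text':   {'token_id': LongTensor(batch_size, max_len),
    #                  'mask':     LongTensor(batch_size, max_len)},
    #       'domain': LongTensor(batch_size,),
    #       'label':  LongTensor(batch_size,)   # used only by calculate_loss
    #   }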
|
|
def download_from_gdrive(file_id, output_path):
    """download the file with the given Google Drive id to output_path, skipping if it already exists"""
    if not os.path.exists(output_path):
        gdown.download(id=file_id, output=output_path, quiet=False)

    return output_path
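
# Example use (hypothetical id and path; the real values are set in loading_model_and_tokenizer below):
#   checkpoint_path = download_from_gdrive("<gdrive-file-id>", "/tmp/mdfend_checkpoint.pth")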
|
|
def loading_model_and_tokenizer():
    """download the fine-tuned MDFEND checkpoint and build the matching tokenizer and model"""
    max_len, bert = 160, 'FacebookAI/xlm-roberta-base'

    # Google Drive file id and local target path of the fine-tuned checkpoint
    file_id = "1--6GB3Ff81sILwtuvVTuAW3shGW_5VWC"
    model_path = '/content/drive/MyDrive/models/last-epoch-model-2024-03-17-01_00_32_1.pth'
    model_save_path = download_from_gdrive(file_id, model_path)
    domain_num = 4

    tokenizer = TokenizerFromPreTrained(max_len, bert)

    # hyperparameters must match those used when the checkpoint was trained
    model = MDFEND(bert, domain_num, expert_num=12,
                   mlp_dims=[3010, 2024, 1012, 606, 400])
    model.load_state_dict(
        torch.load(f=model_save_path, map_location=torch.device('cpu')))
    model.requires_grad_(False)

    return tokenizer, model
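

# Minimal usage sketch (illustrative, not part of the original module). It assumes the
# checkpoint above is reachable via gdown and that TokenizerFromPreTrained returns a dict
# with the 'token_id' and 'mask' keys expected by calculate_loss/predict; the sample text
# and domain id are placeholders.
if __name__ == "__main__":
    tokenizer, model = loading_model_and_tokenizer()
    model.eval()  # required for BatchNorm1d with a batch of one

    batch = {
        'text': tokenizer(["An example news headline to classify."]),
        'domain': torch.tensor([0]),  # placeholder domain id in [0, domain_num)
    }
    with torch.no_grad():
        print(model.predict(batch))  # one-hot prediction, shape (1, 2)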