# for deep learning models
import torch
from transformers import pipeline, AutoTokenizer, AutoFeatureExtractor, ViTModel

# for utility
import numpy as np
import joblib
from PIL import Image  # image loading (fixes missing `io.imread` import)

# for app demo
import gradio as gr

# some global variables — fix seeds for reproducible inference
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# Get cpu, gpu or mps device for inference.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)


class mm_inference:
    """Multimodal (text + image) sentiment inference for Spanish social-media posts.

    Text embeddings come from a BETO-based feature-extraction pipeline
    ('lzun/mcovmex-text'), image embeddings from a ViT model
    ('lzun/mcovmex-image'); both produce 768-dim CLS vectors. The vectors are
    fused by element-wise sum and classified with a pre-trained scikit-learn
    model loaded from 'sum_model.sav'.
    """

    def __init__(self, text, img1, img2, img3, max_images=3,
                 incl_text_flag=True, incl_image_flag=True,
                 incl_text_in_img_flag=True):
        """Load the fusion classifier and the text/image feature extractors.

        Parameters
        ----------
        text : str
            Post text to classify.
        img1, img2, img3 : str or None
            File paths to up to three images (None when absent).
        max_images : int
            Maximum number of images considered (default 3).
        incl_text_flag : bool
            Include the text embedding in the fused vector.
        incl_image_flag : bool
            Include the image embeddings in the fused vector.
        incl_text_in_img_flag : bool
            Reserved flag for text-in-image features (stored, currently unused).
        """
        path_to_model = 'sum_model.sav'
        # NOTE(review): joblib/pickle deserialization — only load trusted files.
        # joblib.load(path) manages the file handle itself (the previous
        # open(...) without close leaked it).
        self.model = joblib.load(path_to_model)
        self.text = text
        self.imgs = [img1, img2, img3]
        self.max_images = max_images
        self.incl_text_flag = incl_text_flag
        self.incl_image_flag = incl_image_flag
        self.incl_text_in_img_flag = incl_text_in_img_flag
        self.text_model_ckpt = 'dccuchile/bert-base-spanish-wwm-uncased'
        self.img_model_ckpt = 'microsoft/swin-base-patch4-window7-224'
        # tokenizer / image preprocessor
        self.tokenizer = AutoTokenizer.from_pretrained(self.text_model_ckpt)
        self.img_feature_extractor = AutoFeatureExtractor.from_pretrained(self.img_model_ckpt)
        # text and image models for feature extraction
        self.text_feature_extractor = pipeline(
            task='feature-extraction',
            model='lzun/mcovmex-text',
            tokenizer=self.tokenizer,
            return_tensors=True,
            device=device,
        )
        self.img_model = ViTModel.from_pretrained('lzun/mcovmex-image')

    def get_text_embs(self, text):
        """Return the CLS-token embedding of `text`.

        The feature-extraction pipeline returns the last hidden states of
        shape (batch_size, n_tokens, hidden_dim), e.g. [1, 33, 768] for an
        input of 31 tokens plus the [CLS] and [SEP] special tokens.

        Parameters
        ----------
        text : str
            Text string to embed.

        Returns
        -------
        torch.Tensor
            CLS-token hidden state of size 768.
        """
        # get embeddings from text using the pipeline
        text_embs = self.text_feature_extractor(text)
        # return CLS token (first position of the last layer)
        return text_embs[0][0]

    def get_img_embs(self, image_path):
        """Return the CLS-token embedding of the image at `image_path`.

        The ViT model returns last hidden states of shape
        (batch_size, n_tokens, hidden_dim), e.g. [1, 197, 768]; the first
        token is the CLS token.

        Parameters
        ----------
        image_path : str
            Path to the image file.

        Returns
        -------
        torch.Tensor
            CLS-token hidden state of size 768.
        """
        # Load with PIL (the original `io.imread` referenced an unimported
        # module). Convert to RGB so grayscale/RGBA files don't break the
        # 3-channel feature extractor.
        img = Image.open(image_path).convert("RGB")
        inputs = self.img_feature_extractor(images=img, return_tensors="pt")
        outputs = self.img_model(**inputs)
        last_hidden_states = outputs.last_hidden_state
        return last_hidden_states[0][0]

    def count_images(self):
        """Return how many of the image slots actually contain an image."""
        # derive from the list instead of a hard-coded 3
        return sum(1 for item in self.imgs if item is not None)

    def predict(self):
        """Fuse the available embeddings and predict the sentiment class.

        Embeddings of the enabled modalities are summed element-wise into a
        single 768-dim vector which is classified by the pre-trained model.

        Returns
        -------
        int
            Predicted sentiment: 1 positive, 0 neutral, -1 negative, 2 spam.
        """
        # -------- get data embeddings --------
        # determine text embeddings
        if self.incl_text_flag:
            text_embs = self.get_text_embs(self.text)
        # determine image embeddings
        img_embs = []
        num_images = 0
        if self.incl_image_flag:
            num_images = self.count_images()
            if num_images > 0:
                for img_path in self.imgs:
                    if img_path is None:
                        continue
                    # Best-effort: skip unreadable images but report why,
                    # instead of the previous silent bare `except`.
                    try:
                        img_embs.append(self.get_img_embs(img_path))
                    except Exception as e:
                        print(f'Skipping image {img_path}: {e}')
        # -------- infer overall sentiment --------
        # apply sum fusion
        emb_sum = np.zeros(768)
        # add the image embeddings
        if self.incl_image_flag and num_images > 0:
            for emb in img_embs:
                emb_sum += emb.detach().numpy()
        # add text embeddings
        if self.incl_text_flag:
            emb_sum += text_embs.detach().numpy()
        # predict
        sent = int(self.model.predict(emb_sum.reshape(1, -1))[0])
        return sent


def main():
    """Launch the Gradio demo for multimodal sentiment prediction."""
    with gr.Blocks() as demo:
        gr.Markdown("# Multimodal Spanish COVID-19 Sentiment Polarity Predictor")
        gr.Markdown("## Input text from a social media post (like X or Instagram)")
        text = gr.Textbox(label="Text from publication")
        gr.Markdown("## Input images from a social media post (min 1, max 3)")
        with gr.Row():
            img1 = gr.Image(label="Image #1 from the publication (mandatory)", type="filepath")
            img2 = gr.Image(label="Image #2 from the publication (if available)", type="filepath")
        with gr.Row():
            img3 = gr.Image(label="Image #3 from the publication (if available)", type="filepath")
        pred_btn = gr.Button("Predict Sentiment")
        gr.Markdown("## Predicted output")
        gr.Markdown("### Positive (1), Neutral (0), Negative (-1), Spam (2)")
        output = gr.Label(label="Sentiment value")

        def predict_sentiment(text, img1, img2, img3):
            # init inference class, then predict and return the label
            inferencer = mm_inference(text, img1, img2, img3)
            return inferencer.predict()

        pred_btn.click(predict_sentiment, inputs=[text, img1, img2, img3], outputs=output)
    demo.launch()


if __name__ == '__main__':
    main()