In [1]:
import torch
from torch import Tensor, nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import dataset
from torch.utils.tensorboard import SummaryWriter

import regex as re
import os
import time
from tqdm import tqdm
import copy
import math

from model import TransformerModel
from utils import preProcessText, getTokenizer
from config import getConfig

In [2]:
def get_model(model_config, ntokens):
    """Construct a ``TransformerModel`` from a hyper-parameter mapping.

    Args:
        model_config: mapping that provides the keys ``"emsize"``, ``"d_hid"``,
            ``"nlayers"``, ``"nhead"`` and ``"dropout"``.
        ntokens: forwarded as the first constructor argument (this notebook
            passes ``len(vocab)``).

    Returns:
        The newly constructed ``TransformerModel`` instance.
    """
    # Keys are read in a fixed order; the constructor takes them in a
    # different order (nhead comes before d_hid).
    lookup_order = ("emsize", "d_hid", "nlayers", "nhead", "dropout")
    emsize, d_hid, nlayers, nhead, dropout = [model_config[key] for key in lookup_order]
    return TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout)

def loadModel(best_model_path, target_model=None):
    """Load saved weights from a checkpoint file into a model.

    Args:
        best_model_path: path of a checkpoint readable by ``torch.load``; the
            loaded object must contain a ``'model_state_dict'`` entry.
        target_model: module that receives the weights. When omitted, the
            notebook-level global ``model`` is used (the original behaviour),
            so that global must already exist when this function is called.

    Returns:
        The module that received the weights (the same object, not a copy).

    Raises:
        FileNotFoundError: if ``best_model_path`` does not exist.
    """
    if not os.path.exists(best_model_path):
        # FileNotFoundError is still an Exception, so existing handlers keep working.
        raise FileNotFoundError(f"Model Not Found: {best_model_path}")

    print(f"Preloading model {best_model_path}")
    # Without CUDA, remap the stored tensors to CPU; otherwise keep
    # torch.load's default placement.
    map_location = None if torch.cuda.is_available() else torch.device('cpu')
    # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
    state = torch.load(best_model_path, map_location=map_location)

    net = model if target_model is None else target_model
    net.load_state_dict(state['model_state_dict'])
    return net

In [3]:
# Read the model hyper-parameters and application settings from the project config.
model_config, app_config = getConfig()
print(model_config)
print(app_config)

# Keep the configured "bptt" value as a module-level name for later cells.
bptt=model_config["bptt"]

# Use the GPU when PyTorch can see one, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

# Softmax over dimension 2, kept as a module-level callable.
softmax = nn.Softmax(dim=2)

# Tokenizer and vocabulary come from the project helper; len(vocab) is passed
# to get_model() as ntokens.
tokenizer, vocab = getTokenizer()
ntokens = len(vocab)
# Build the model and move it to the selected device.
model = get_model(model_config, ntokens).to(device)

{'emsize': 300, 'd_hid': 1024, 'nlayers': 6, 'nhead': 6, 'dropout': 0.2, 'bptt': 64}
{'logs': 'tensorboard_logs', 'epochs': 25}
cpu




In [4]:
# Checkpoint path, relative to the notebook's working directory.
best_model_path = 'models/best_model.pt'
# loadModel returns the module it loaded the checkpoint weights into.
loaded_model = loadModel(best_model_path)

Preloading model models/best_model.pt


In [5]:
def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    """Tokenize every item and join the results into one flat index tensor.

    Each item is passed through the notebook-level ``tokenizer`` and ``vocab``
    and converted to a ``torch.long`` tensor. Items that produce no tokens are
    dropped before concatenation.
    """
    pieces = []
    for item in raw_text_iter:
        ids = torch.tensor(vocab(tokenizer(item)), dtype=torch.long)
        # skip lines that tokenized to nothing
        if ids.numel() > 0:
            pieces.append(ids)
    return torch.cat(tuple(pieces))

def batchify(data: Tensor, batch_size: int) -> Tensor:
    """Reshape a flat tensor into ``batch_size`` columns, dropping the remainder.

    Trailing elements that do not fill a complete row of ``batch_size`` are
    discarded. The result is moved to the notebook-level ``device``.

    Args:
        data: Tensor, shape [N]
        batch_size: int, number of columns in the result
    Returns:
        Tensor of shape [N // batch_size, batch_size]
    """
    rows = data.size(0) // batch_size
    trimmed = data[:rows * batch_size]
    # view as (batch_size, rows), then transpose so each column is one
    # contiguous slice of the original data
    columns = trimmed.view(batch_size, rows).t().contiguous()
    return columns.to(device)

def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Build an ``sz x sz`` mask: ``-inf`` strictly above the diagonal, ``0`` elsewhere."""
    blocked = torch.full((sz, sz), float('-inf'))
    # triu(diagonal=1) keeps only the entries above the main diagonal and
    # zeroes the rest
    return blocked.triu(diagonal=1)

In [6]:
def nonnaive_generator(model: nn.Module, gen_data: Tensor, no_words=5, k=50):
    """Generate ``no_words`` tokens from ``model`` using top-k sampling.

    At every step the model is run on the current context, the ``k`` highest
    scores for the last position are kept, turned into a probability
    distribution, and one candidate is sampled from it. The draw is retried
    (at most 10 times) while it yields the ``'<unk>'`` token. The sampled
    index is appended to the context; once the context holds 15 or more
    tokens, the oldest token is dropped for each one added.

    Reads the notebook-level ``vocab`` and ``device`` and calls
    ``generate_square_subsequent_mask``.

    Args:
        model: called as ``model(context, mask)``. Assumed to return scores
            shaped (seq_len, 1, vocab_size) -- TODO confirm against
            ``TransformerModel.forward``.
        gen_data: context token indices, shape (seq_len, 1).
        no_words: number of tokens to generate.
        k: number of top candidates to sample from; capped at the number of
            scores the model returns per position.

    Returns:
        list[str]: the generated tokens, in generation order.
    """
    max_context = 15   # context grows to this length, then slides
    max_resample = 10  # attempts allowed to avoid sampling '<unk>'

    model.eval()
    pred_text = []
    src_mask = None

    # Inference only: no autograd bookkeeping needed.
    with torch.no_grad():
        for _step in range(no_words):
            seq_len = gen_data.size(0)
            # Always use a mask that matches the current context length. The
            # previous code only assigned its mask when the length differed
            # from bptt, leaving it unbound or stale otherwise, and it could
            # not cover a context longer than bptt.
            if src_mask is None or src_mask.size(0) != seq_len:
                src_mask = generate_square_subsequent_mask(seq_len).to(device)

            output = model(gen_data, src_mask)
            # Scores for the last position of the single sequence.
            last_scores = output[-1, 0]

            top_k = min(k, last_scores.size(-1))
            top_scores, top_indices = torch.topk(last_scores, top_k)
            # Softmax over just the top-k scores equals the full softmax
            # restricted to the top-k entries and renormalised.
            probs = torch.softmax(top_scores, dim=-1)
            sampler = torch.distributions.Categorical(probs)

            for _attempt in range(max_resample):
                next_index = top_indices[sampler.sample()]
                token = vocab.lookup_token(next_index.item())
                # stop resampling once the token is not <unk>
                if token != '<unk>':
                    break

            pred_text.append(token)

            new_row = next_index.view(1, 1)
            if seq_len < max_context:
                gen_data = torch.cat((gen_data, new_row), 0)
            else:
                gen_data = torch.cat((gen_data[1:], new_row), 0)

    return pred_text

## Gradio application

In [7]:
def predText(text : str, num_words : int):
    """Extend ``text`` with ``num_words`` generated tokens.

    The input is tokenized into a single column of indices, handed to
    ``nonnaive_generator`` together with the notebook-level ``loaded_model``,
    and the generated tokens are appended to the original text separated by
    single spaces.
    """
    word_count = int(num_words)
    token_column = batchify(data_process([text]), 1)
    # take the (only) column and restore the trailing dimension -> (seq_len, 1)
    context = token_column[:, -1].unsqueeze(1)
    generated = nonnaive_generator(loaded_model, context, no_words=word_count, k=50)
    return text + ' ' + ' '.join(generated)

In [8]:
# Quick check of predText outside the UI: seed text plus 30 generated tokens.
text = "म घर नजिकैको"
num_words = 30
predText(text, num_words)

'म घर नजिकैको घरमा एक्लै छु । घर नजिकै रहेको सानो जग्गामा भाडामा लिएर काम गरिरहेका स्थानीय चन्द्रबहादुर थापाले यो व्यवसाय शुरु गरेका हुन् । उनलाई नेपाल सरकारले <num> हजार रुपैयाँ जरिवाना लगाएको'

In [9]:
import gradio as gr

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
# Standalone Gradio textbox pre-filled with a sample prompt.
# NOTE(review): the Blocks cell further down rebinds `input_text_box` to a new component.
input_text_box = gr.Textbox(label="Text", value="म घर", lines=5)

def predText(text : str, num_words : int):
    """Gradio callback: extend ``text`` with ``num_words`` generated tokens.

    This definition replaces the identically named function from an earlier
    cell. It relies on the notebook-level ``loaded_model`` and on
    ``data_process``, ``batchify`` and ``nonnaive_generator``.

    Args:
        text: seed text. Blank or missing text is returned unchanged (``""``
            for ``None``), because there is nothing to condition the model on.
        num_words: number of tokens to generate; converted with ``int``.
            ``None`` (e.g. an emptied number field) is treated as 0.

    Returns:
        str: the seed text, a space, then the generated tokens joined by spaces.
    """
    if text is None or not text.strip():
        return text or ""
    num_words = 0 if num_words is None else int(num_words)

    sample_data = batchify(data_process([text]), 1)
    # take the (only) column and restore the trailing dimension -> (seq_len, 1)
    context = sample_data[:, -1].unsqueeze(1)
    pred_text = nonnaive_generator(loaded_model, context, no_words=num_words, k=50)
    return text + ' ' + ' '.join(pred_text)

In [11]:
# Example (text, number-of-words) pairs offered in the UI.
examples = [["म घर", 10], ["मलाई", 40], ["आज", 70]]

# The page title is passed to the constructor instead of being set on the
# instance afterwards.
with gr.Blocks(title="Nepali Text Generation Model") as interface:
    gr.Markdown("# Nepali Text Generation")
    gr.Markdown("Start typing nepali below and then click **Submit** to generate text.")
    gr.Markdown("You can select examples from the table below and then click **Submit** to generate text.")

    input_text_box = gr.Textbox(label="Text", value="म घर", lines=5, placeholder="Enter Nepali Text")
    input_num_words = gr.Number(label="Number of word to generate", value=5)

    btn = gr.Button(value="Submit")

    # The generated text is written back into the input textbox.
    btn.click(predText, inputs=[input_text_box, input_num_words], outputs=[input_text_box])

    gr.Examples(examples=examples, inputs=[input_text_box, input_num_words], outputs=[input_text_box])
    # `gr.flagging = True` was removed: it rebinds the `flagging` attribute of
    # the gradio package to a bool and does not enable flagging for this app.

interface.launch()

Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




In [24]:
# Title and markdown description passed to gr.Interface below.
title = "Nepali Language Model For Text Generation"
description = """
Click **Submit** to generate text 

**Flag** as **(Bad, Fine or Good)** as per the generated text

You can also choose **examples** given below. Click them and then submit
"""
# Example (text, number-of-words) pairs offered in the UI.
examples = [["म घर", 10], ["मलाई", 40], ["आज", 70]]


# Input and output components for the interface
input_text_box = gr.Textbox(label="Input Text", value="म घर", lines=5, placeholder="Enter Nepali Text")
input_num_words = gr.Number(label="Number of word to generate", value=5)
output_text_box = gr.Textbox(label="Generated Text", lines=5, placeholder="Generated Text Appears here")

# Labels passed to gr.Interface as the flagging choices.
flagging_options = ["Bad", "Fine", "Good"]


# Unlike the Blocks version, the generated text goes to a separate output box.
interface = gr.Interface(fn=predText, inputs=[input_text_box, input_num_words], outputs=output_text_box, flagging_options=flagging_options, title=title, description=description, examples=examples)
interface.launch()

Running on local URL:  http://127.0.0.1:7869

To create a public link, set `share=True` in `launch()`.




In [25]:
# Read flagged/log.csv (relative to the working directory) and display it.
# NOTE(review): presumably written by the flagging buttons of the Gradio app above -- confirm.
import pandas as pd
flagged_log = pd.read_csv("flagged/log.csv")
flagged_log

Unnamed: 0,Text,Number of word to generate,Text.1,flag,username,timestamp
0,म घर,5,म घर पुग्दा मलाई निकै गाह्रो पर्ने,,,2023-11-27 17:00:15.272263
1,म घर,5,म घर नै लिएर आएको हुँ ।,Bad,,2023-11-27 17:06:19.426503
2,म घर,5,"म घर बसेको थिएँ । तर ,",Good,,2023-11-27 17:06:27.778048
3,म घर,5,म घर बनाउँछु र घरमा काम गर्छु,Good,,2023-11-27 17:06:34.104749
4,म घर,5,म घर गएँ । त्यस क्षेत्रका मान्छेहरु,Fine,,2023-11-27 17:06:44.150964
5,आज,70,म घर छोडेर गएका थिए । तर,Good,,2023-11-27 17:13:07.793558
6,आज,70,आज पनि यस विषयमा छलफल भएको हो । नेपाल कम्युनिष...,Fine,,2023-11-27 17:13:32.043888
7,आज,70,आज नै निर्वाचन आयोगमा उजुरी दिन गएका छन् । आयो...,Good,,2023-11-27 17:13:53.533122


## No. of parameters

In [12]:
# Display the module's repr (its layer-by-layer structure).
loaded_model

TransformerModel(
  (embedding): Embedding(60507, 300)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.2, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-5): 6 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=300, out_features=300, bias=True)
        )
        (linear1): Linear(in_features=300, out_features=1024, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
        (linear2): Linear(in_features=1024, out_features=300, bias=True)
        (norm1): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((300,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
      )
    )
  )
  (decoder): Linear(in_features=300, out_features=60507, bias=True)
)

In [13]:
def count_parameters(model):
    """Return the total element count of ``model``'s parameters that have ``requires_grad`` set."""
    total = 0
    for param in model.parameters():
        # parameters with requires_grad=False are not counted
        if param.requires_grad:
            total += param.numel()
    return total

# Number of parameters in the loaded model that have requires_grad set.
num_parameters = count_parameters(loaded_model)

In [14]:
# Render the count with comma thousands separators and display it.
formatted_num = f"{num_parameters:,}"
formatted_num

'42,233,451'