# import parsing  # uncomment to download data from the website and parse it
from string import punctuation

from tqdm.auto import tqdm, trange
import torch
from transformers import AutoTokenizer, AutoModel
import datasets
import pandas as pd
import nltk
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import wordnet  # for lemmatization
from nltk import pos_tag  # for part-of-speech tagging

nltk.download('omw-1.4')  # needed for the .apply() lemmatization below
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')

import numpy as np
import os
import re  # regular expressions
import time
from sklearn.feature_extraction.text import CountVectorizer  # for bag of words (bow)
from sklearn.feature_extraction.text import TfidfVectorizer  # for tf-idf
from sklearn.metrics import pairwise_distances  # for cosine distance
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec, KeyedVectors
import gensim.downloader as api
import gradio as gr

# Take Rachel as the main character
df = pd.read_csv("rachel_friends.csv")  # read the corpus into a dataframe

# ------------------------------------- TF-IDF ------------------------------------------ #

# Define function for text normalization
def text_normalization(text):
    text = str(text).lower()  # convert to lowercase
    spl_char_text = re.sub(r'[^a-z]', ' ', text)  # remove special characters, including numbers
    tokens = nltk.word_tokenize(spl_char_text)  # tokenize into words
    lema = wordnet.WordNetLemmatizer()  # initialize the lemmatizer
    tags_list = pos_tag(tokens, tagset=None)  # part-of-speech tags
    lema_words = []
    for token, pos_token in tags_list:
        if pos_token.startswith('V'):  # verb
            pos_val = 'v'
        elif pos_token.startswith('J'):  # adjective
            pos_val = 'a'
        elif pos_token.startswith('R'):  # adverb
            pos_val = 'r'
        else:  # otherwise treat it as a noun
            pos_val = 'n'
        lema_token = lema.lemmatize(token, pos_val)  # perform lemmatization
        lema_words.append(lema_token)  # add the lemmatized word to the list
    return " ".join(lema_words)  # join the list back into a sentence

# Preprocess data and insert into the dataframe
question_normalized = df['question'].apply(text_normalization)
df.insert(2, 'Normalized question', question_normalized, True)

# Define function to remove stopwords from sentences
stop = stopwords.words('english')  # English stopword list
stop = []  # overwrite with an empty list so no words are filtered; remove this line to enable stopword removal

def removeStopWords(text):
    Q = []
    s = text.split()  # split the sentence into words
    q = ''
    for w in s:  # skip every word that is a stopword
        if w in stop:
            continue
        else:  # otherwise keep it
            Q.append(w)
    q = " ".join(Q)  # join the remaining words back into a sentence
    return q

# Preprocess data and insert into the dataframe
question_norm_and_stop = df['Normalized question'].apply(removeStopWords)
df.insert(3, 'Normalized and StopWords question', question_norm_and_stop, True)

tfidf = TfidfVectorizer(ngram_range=(1, 3), max_features=5024)  # initialize tf-idf
x_tfidf = tfidf.fit_transform(df['Normalized and StopWords question']).toarray()  # map each question to a tf-idf vector
features_tfidf = tfidf.get_feature_names_out()  # all n-grams kept by the vectorizer
df_tfidf = pd.DataFrame(x_tfidf, columns=features_tfidf)  # dataframe with one tf-idf weight per n-gram
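# A minimal sanity check of the tf-idf retrieval built above; a sketch, not part
# of the app. The helper name `_demo_tfidf_lookup` and the sample question are
# made up; the matched row depends on the contents of rachel_friends.csv.
def _demo_tfidf_lookup(sample_question: str) -> None:
    tidy = text_normalization(removeStopWords(sample_question))
    vec = tfidf.transform([tidy]).toarray()  # embed the query the same way as the corpus
    sims = 1 - pairwise_distances(df_tfidf, vec, metric='cosine')
    best = sims.argmax()
    print(f"Q: {df['question'].loc[best]!r} -> A: {df['answer'].loc[best]!r}")
# Example: _demo_tfidf_lookup("How are you doing?")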
# Bot: tf-idf retrieval without context
def chat_tfidf(question):
    tidy_question = text_normalization(removeStopWords(question))  # clean & lemmatize the question
    tf = tfidf.transform([tidy_question]).toarray()  # convert the question into a vector
    cos = 1 - pairwise_distances(df_tfidf, tf, metric='cosine')  # cosine similarity to every stored question
    index_value = cos.argmax()  # index of the most similar question
    answer = df['answer'].loc[index_value]
    return answer

# Bot: tf-idf retrieval with context
def chat_tfidf_context(question, history):
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
        history = history[-2:]  # keep only the last two turns, matching the bot's memory
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 5024))
    for ind, h in enumerate(history):
        # normalize the past user turn
        tidy_question = text_normalization(removeStopWords(h[0]))
        # pass it through tf-idf
        tf = tfidf.transform([tidy_question]).toarray()
        # store the memory-weighted vector
        history_sentence[ind] = tf * memory_weights[ind]
    tidy_question = text_normalization(removeStopWords(question))
    tf = tfidf.transform([tidy_question]).toarray()
    history_sentence[-1] = tf
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cos = 1 - pairwise_distances(df_tfidf, history_sentence, metric='cosine')
    index_value = cos.argmax()
    answer = df['answer'].loc[index_value]
    return answer

# ------------------------------------- W2V ------------------------------------------ #

punkt = list(punctuation) + ["`", "``", "''", "'"]

def tokenize(sent: str) -> str:
    tokens = nltk.word_tokenize(sent.lower())  # tokenize into words
    return ' '.join([word for word in tokens if word not in stop and word not in punkt])

questions_preprocessed = []
for question in df["question"].tolist() + df["answer"].tolist():
    questions_preprocessed.append(tokenize(question))
questions_w2v = [sent.split(" ") for sent in questions_preprocessed]  # token lists; see the training sketch below

w2v = KeyedVectors.load('w2v.bin')
unknown_vector = np.random.uniform(low=-0.2, high=0.2, size=(25,))  # fallback for out-of-vocabulary words

# Define function to embed a sentence with w2v (mean of its word vectors)
def w2v_get_vector_for_sentence(sentence):
    sent = nltk.word_tokenize(sentence.lower())
    sent = [word for word in sent if word not in punkt]
    sentence_vector = []
    if len(sent) == 0:
        sentence_vector.append(unknown_vector)
    else:
        for word in sent:
            if word in w2v.key_to_index:
                sentence_vector.append(w2v[word])
            else:
                sentence_vector.append(unknown_vector)
    return np.array(sentence_vector).mean(axis=0)

# Precompute a w2v vector for every question in the corpus
base = np.zeros(shape=(len(df.question), 25))
for ind, sentence in enumerate(df['question']):
    base[ind] = w2v_get_vector_for_sentence(sentence)

# Bot: w2v retrieval without context
def chat_word2vec(question):
    question = [w2v_get_vector_for_sentence(question)]
    cos = 1 - pairwise_distances(base, question, metric='cosine')  # cosine similarity to every stored question
    index_value = cos.argmax()  # index of the most similar question
    answer = df['answer'].loc[index_value]
    return answer

# Bot: w2v retrieval with context
def chat_word2vec_context(question, history):
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
        history = history[-2:]  # keep only the last two turns, matching the bot's memory
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 25))
    for ind, h in enumerate(history):
        sentence = w2v_get_vector_for_sentence(h[0])
        history_sentence[ind] = sentence * memory_weights[ind]
    question = w2v_get_vector_for_sentence(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cos = 1 - pairwise_distances(base, history_sentence, metric='cosine')
    index_value = cos.argmax()
    answer = df['answer'].loc[index_value]
    return answer
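# `w2v.bin` is loaded above but never built in this file. A minimal sketch of
# how a compatible file could be produced from `questions_w2v` (assumptions:
# 25-dimensional vectors to match `base` and `unknown_vector`, gensim defaults
# otherwise; the original training setup may have differed):
def _train_w2v_sketch(path: str = 'w2v.bin') -> None:
    model = Word2Vec(sentences=questions_w2v, vector_size=25,
                     window=5, min_count=1, workers=4)  # train on the tokenized corpus
    model.wv.save(path)  # KeyedVectors.load(path) then works as above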
# ------------------------------------- BERT ------------------------------------------ #

# Retrieval with a plain DistilBERT encoder: CLS embeddings + cosine similarity
model_name = "distilbert/distilbert-base-uncased"
device = "cpu"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

class BERTSearchEngine:
    def __init__(self, model, tokenizer, text_database):
        self.raw_processed_data = [self.preprocess(sample, tokenizer) for sample in text_database]
        self.base = []
        self.retriever = None
        self.inverted_index = {}
        self._init_retriever(model, tokenizer, text_database)
        self._init_inverted_index(text_database)

    @staticmethod
    def preprocess(sentence: str, tokenizer):
        return tokenizer(sentence, padding=True, truncation=True, return_tensors='pt')

    def _embed_bert_cls(self, tokenized_text: dict[str, torch.Tensor]) -> np.ndarray:
        with torch.no_grad():
            model_output = self.retriever(**{k: v.to(self.retriever.device) for k, v in tokenized_text.items()})
        embeddings = model_output.last_hidden_state[:, 0, :]  # CLS token
        embeddings = torch.nn.functional.normalize(embeddings)
        return embeddings[0].cpu().numpy()

    def _init_retriever(self, model, tokenizer, text_database):
        self.retriever = model
        self.tokenizer = tokenizer
        # precomputed question embeddings; see the rebuild sketch below this class
        self.base = np.load("bert_base.npy")

    def retrieve(self, query: str) -> np.ndarray:
        return self._embed_bert_cls(self.preprocess(query, self.tokenizer))

    def retrieve_documents(self, query: str, top_k=3) -> list[int]:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        relevant_indices = np.argsort(cosine_similarities, axis=0)[::-1][:top_k]
        return relevant_indices.tolist()

    def _init_inverted_index(self, text_database: list[str]):
        self.inverted_index = dict(enumerate(text_database))

    def display_relevant_docs(self, query, top_k=3) -> list[str]:
        docs_indexes = self.retrieve_documents(query, top_k=top_k)
        return [self.inverted_index[ind] for ind in docs_indexes]

    def find_answer(self, query: str) -> int:
        query_vector = self.retrieve(query)
        cosine_similarities = cosine_similarity([query_vector], self.base).flatten()
        relevant_indice = np.argmax(cosine_similarities, axis=0)
        return relevant_indice

simple_search_engine = BERTSearchEngine(model, tokenizer, df["question"])

# Bot: BERT retrieval without context
def chat_bert(question):
    ind = simple_search_engine.find_answer(question)
    answer = df['answer'].iloc[ind]
    return answer

# Bot: BERT retrieval with context
def chat_bert_context(question, history):
    if len(history) > 1:
        memory_weights = np.array([0.1, 0.3, 1.0])
        history = history[-2:]  # keep only the last two turns, matching the bot's memory
    else:
        memory_weights = np.array([0.3, 1.0])
    history_sentence = np.zeros(shape=(len(history) + 1, 768))
    for ind, h in enumerate(history):
        sentence = simple_search_engine.retrieve(h[0])  # embed the past user turn
        history_sentence[ind] = sentence * memory_weights[ind]
    question = simple_search_engine.retrieve(question)
    history_sentence[-1] = question
    history_sentence = history_sentence.mean(axis=0).reshape(1, -1)
    cosine_similarities = cosine_similarity(history_sentence, simple_search_engine.base).flatten()
    relevant_indice = np.argmax(cosine_similarities, axis=0)
    answer = df['answer'].loc[relevant_indice]
    return answer
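# `bert_base.npy` is loaded from disk in `_init_retriever` above. A minimal
# sketch of how it could be regenerated (assumption, following the embedding
# code above: one L2-normalized CLS embedding per question, in corpus order):
def _rebuild_bert_base_sketch(path: str = "bert_base.npy") -> None:
    vectors = []
    for text in tqdm(df["question"]):
        tokenized = tokenizer(text, padding=True, truncation=True, return_tensors='pt')
        with torch.no_grad():
            output = model(**tokenized)
        cls = torch.nn.functional.normalize(output.last_hidden_state[:, 0, :])
        vectors.append(cls[0].cpu().numpy())
    np.save(path, np.array(vectors))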
# ------------------------------------- Bi-BERT-Encoder ------------------------------------------ #

MAX_LENGTH = 128
inverted_answer = dict(enumerate(df.answer.tolist()))

# Define function for mean pooling over token embeddings
def mean_pool(token_embeds: torch.tensor, attention_mask: torch.tensor) -> torch.tensor:
    in_mask = attention_mask.unsqueeze(-1).expand(token_embeds.size()).float()
    pool = torch.sum(token_embeds * in_mask, 1) / torch.clamp(in_mask.sum(1), min=1e-9)
    return pool

# Define function to tokenize sentences and encode them into pooled embeddings
def encode(input_texts: list[str],
           tokenizer: AutoTokenizer,
           model: AutoModel,
           device: str = "cpu") -> torch.tensor:
    model.eval()
    tokenized_texts = tokenizer(input_texts,
                                max_length=MAX_LENGTH,
                                padding='max_length',
                                truncation=True,
                                return_tensors="pt")
    token_embeds = model(tokenized_texts["input_ids"].to(device),
                         tokenized_texts["attention_mask"].to(device)).last_hidden_state
    pooled_embeds = mean_pool(token_embeds, tokenized_texts["attention_mask"].to(device))
    return pooled_embeds

# Define architecture for the bi-encoder
class Sbert(torch.nn.Module):
    def __init__(self, max_length: int = MAX_LENGTH):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size * 3, 1)

    def forward(self, data: datasets.arrow_dataset.Dataset) -> torch.tensor:
        question_input_ids = data["question_input_ids"].to(device)
        question_attention_mask = data["question_attention_mask"].to(device)
        answer_input_ids = data["answer_input_ids"].to(device)
        answer_attention_mask = data["answer_attention_mask"].to(device)
        out_question = self.bert_model(question_input_ids, question_attention_mask)
        out_answer = self.bert_model(answer_input_ids, answer_attention_mask)
        question_embeds = out_question.last_hidden_state
        answer_embeds = out_answer.last_hidden_state
        pooled_question_embeds = mean_pool(question_embeds, question_attention_mask)
        pooled_answer_embeds = mean_pool(answer_embeds, answer_attention_mask)
        # SBERT-style feature vector: [u, v, |u - v|]
        embeds = torch.cat([pooled_question_embeds,
                            pooled_answer_embeds,
                            torch.abs(pooled_question_embeds - pooled_answer_embeds)], dim=-1)
        return self.linear(embeds)

# Initialize the model and load the fine-tuned weights
model_bi_encoder = Sbert().to(device)
model_bi_encoder.bert_model = AutoModel.from_pretrained("models/friends_bi_encoder").to(device)

# Load precomputed question embeddings
question_embeds = np.load("bi_bert_question.npy")

# Bot: bi-encoder retrieval
def chat_bi_bert(question, history):
    question = encode([question], model_bi_encoder.bert_tokenizer,
                      model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question], question_embeds).flatten()
    top_indice = np.argmax(cosine_similarities, axis=0)
    answer = inverted_answer[top_indice]
    return answer
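# `bi_bert_question.npy` is loaded from disk above. A minimal sketch of how it
# could be regenerated with the fine-tuned bi-encoder (assumptions: one pooled
# embedding per question, in corpus order, batched to bound memory use):
def _rebuild_question_embeds_sketch(path: str = "bi_bert_question.npy",
                                    batch_size: int = 64) -> None:
    chunks = []
    questions = df["question"].tolist()
    for i in trange(0, len(questions), batch_size):
        batch = questions[i:i + batch_size]
        with torch.no_grad():
            chunks.append(encode(batch, model_bi_encoder.bert_tokenizer,
                                 model_bi_encoder.bert_model, device).cpu().numpy())
    np.save(path, np.concatenate(chunks, axis=0))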
# ------------------------------------- Bi+Cross-BERT-Encoder ------------------------------------------ #

# Define architecture for the cross-encoder
class CrossEncoderBert(torch.nn.Module):
    def __init__(self, max_length: int = MAX_LENGTH):
        super().__init__()
        self.max_length = max_length
        self.bert_model = AutoModel.from_pretrained('distilbert-base-uncased')
        self.bert_tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.last_hidden_state[:, 0]  # use the CLS token's output
        return self.linear(pooled_output)

# Initialize the model and load the fine-tuned weights
model_cross_encoder = CrossEncoderBert().to(device)
model_cross_encoder.bert_model = AutoModel.from_pretrained("models/friends_cross_encoder").to(device)

# Bot: bi-encoder candidate retrieval, reranked by the cross-encoder
def chat_cross_bert(question, history):
    # Stage 1: the bi-encoder retrieves the top-5 candidate answers
    question_encoded = encode([question], model_bi_encoder.bert_tokenizer,
                              model_bi_encoder.bert_model, device).squeeze().cpu().detach().numpy()
    cosine_similarities = cosine_similarity([question_encoded], question_embeds).flatten()
    topk_indices = np.argsort(cosine_similarities, axis=0)[::-1][:5].tolist()
    corpus = [inverted_answer[ind] for ind in topk_indices]
    queries = [question] * len(corpus)

    # Stage 2: the fine-tuned cross-encoder rescores each (question, candidate) pair
    tokenized_texts = model_cross_encoder.bert_tokenizer(
        queries, corpus,
        max_length=MAX_LENGTH,
        padding=True,
        truncation=True,
        return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        ce_scores = model_cross_encoder(tokenized_texts['input_ids'],
                                        tokenized_texts['attention_mask']).squeeze(-1)
    ce_scores = torch.sigmoid(ce_scores)  # map logits to (0, 1)

    scores = ce_scores.cpu().numpy()
    ix = np.argmax(scores)
    return corpus[ix]

# ------------------------------------- Gradio ------------------------------------------ #

def echo(message, history, model):
    if model == "TF-IDF":
        return chat_tfidf_context(message, history)
    elif model == "W2V":
        return chat_word2vec_context(message, history)
    elif model == "BERT":
        return chat_bert_context(message, history)
    elif model == "Bi-BERT-Encoder":
        return chat_bi_bert(message, history)
    elif model == "Bi+Cross-BERT-Encoder":
        return chat_cross_bert(message, history)

title = "Chatbot that speaks like Rachel from Friends"
description = "Have a dialog with Rachel Green, a character from Friends."

model = gr.Dropdown(["TF-IDF", "W2V", "BERT", "Bi-BERT-Encoder", "Bi+Cross-BERT-Encoder"],
                    label="Retrieval model",
                    info="What model do you want to use?",
                    value="TF-IDF")

with gr.Blocks() as demo:
    gr.ChatInterface(
        fn=echo,
        title=title,
        description=description,
        additional_inputs=[model],
        retry_btn=None,
        undo_btn=None,
        clear_btn=None,
    )

demo.launch(debug=False, share=True)
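# Optional smoke test (a sketch; run it instead of `demo.launch` above to check
# every retrieval path end to end before serving the UI):
# for fn in (chat_tfidf, chat_word2vec, chat_bert):
#     print(fn.__name__, '->', fn("How are you doing?"))
# for fn in (chat_tfidf_context, chat_word2vec_context, chat_bert_context,
#            chat_bi_bert, chat_cross_bert):
#     print(fn.__name__, '->', fn("How are you doing?", []))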