import streamlit as st import plotly.graph_objects as go from transformers import pipeline import re import time import requests from PIL import Image import itertools import numpy as np import matplotlib.pyplot as plt from matplotlib.colors import rgb2hex import matplotlib from matplotlib.colors import ListedColormap, rgb2hex import ipywidgets as widgets from IPython.display import display, HTML import re import pandas as pd from pprint import pprint from tenacity import retry from tqdm import tqdm import tiktoken import scipy.stats import inseq import torch from transformers import AutoModelForCausalLM from transformers import GPT2LMHeadModel import tiktoken import seaborn as sns from transformers import AutoTokenizer, AutoModelForSeq2SeqLM # from colorama import Fore, Style import openai # for OpenAI API calls ###################################### def find_indices(arr, target): indices = [] start_index = None for i, element in enumerate(arr): if target in element: if start_index is None: start_index = i else: indices.append((start_index, i - 1)) start_index = i if start_index is not None: indices.append((start_index, len(arr) - 1)) return indices ###################################### import streamlit as st def colorize_tokens(token_data, sentence): colored_sentence = "" start = 0 for token in token_data: entity_group = token["entity_group"] word = token["word"] tag = f"[{entity_group}]" tag_color = tag_colors.get(entity_group, "white") # Default to white if color not found colored_chunk = f'{word} {tag}' colored_sentence += sentence[start:token["start"]] + colored_chunk start = token["end"] # Add the remaining part of the sentence colored_sentence += sentence[start:] return colored_sentence # Define colors for the tags tag_colors = { "ADJP": "#8F6B9F", # Blue "ADVP": "#7275A7", # Green "CONJP": "#5BA4BB", # Red "INTJ": "#95CA73", # Cyan "LST": "#DFDA70", # Magenta "NP": "#EFBC65", # Yellow "PP": "#FC979B", # Purple "PRT": "#F1C5C1", # Dark Blue "SBAR": "#FAEBE8", # Dark Green "VP": "#90DFD2", # Dark Cyan } ################## ################### def generate_tagged_sentence(sentence, entity_tags): # Create a list to hold the tagged tokens tagged_tokens = [] # Process the entity tags to annotate the sentence for tag in entity_tags: start = tag['start'] end = tag['end'] if end{token}' # original_pointer += len(token) # token_pointer += 1 # else: # html += original_text[original_pointer] # original_pointer += 1 # #display(HTML(html)) # st.markdown(html, unsafe_allow_html=True) def render_heatmap(original_text, importance_scores_df): # Extract the importance scores importance_values = importance_scores_df['importance_value'].values # Check for division by zero during normalization min_val = np.min(importance_values) max_val = np.max(importance_values) if max_val - min_val != 0: normalized_importance_values = (importance_values - min_val) / (max_val - min_val) else: normalized_importance_values = np.zeros_like(importance_values) # Fallback: all-zero array # Generate a colormap for the heatmap (use "Blues") cmap = matplotlib.cm.get_cmap('Blues') # Function to determine text color based on background color def get_text_color(bg_color): brightness = 0.299 * bg_color[0] + 0.587 * bg_color[1] + 0.114 * bg_color[2] if brightness < 0.5: return 'white' else: return 'black' # Initialize pointers for the original text and token importance original_pointer = 0 token_pointer = 0 # Create an HTML representation html = "" while original_pointer < len(original_text): token = importance_scores_df.loc[token_pointer, 'token'] if original_pointer == original_text.find(token, original_pointer): importance = normalized_importance_values[token_pointer] rgba = cmap(importance) bg_color = rgba[:3] text_color = get_text_color(bg_color) html += f'{token}' original_pointer += len(token) token_pointer += 1 else: html += original_text[original_pointer] original_pointer += 1 st.markdown(html, unsafe_allow_html=True) ########################## # Create selectbox prompt_list=["Which individuals possessed the ships that were part of the Boston Tea Party?", "Freddie Frith", "Robert used PDF for his math homework." ] options = [f"Prompt #{i+1}: {prompt_list[i]}" for i in range(3)] + ["Another Prompt..."] selection = st.selectbox("Choose a prompt from the dropdown below . Click on :blue['Another Prompt...'] , if you want to enter your own custom prompt.", options=options) check=[] # if selection == "Another Prompt...": # otherOption = st.text_input("Enter your custom prompt...") # if otherOption: # st.caption(f""":white_check_mark: Your input prompt is : {otherOption}""") # st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') # check=otherOption # st.caption(f"""{check}""") # else: # result = re.split(r'#\d+:', selection, 1) # if result: # st.caption(f""":white_check_mark: Your input prompt is : {result[1]}""") # st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') # check=result[1] if selection == "Another Prompt...": check = st.text_input("Enter your custom prompt...") check = " " + check if check: st.caption(f""":white_check_mark: Your input prompt is : {check}""") st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') # check=otherOption # st.caption(f"""{check}""") else: check = re.split(r'#\d+:', selection, 1)[1] if check: st.caption(f""":white_check_mark: Your input prompt is : {check}""") st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') # check=result[1] # @st.cache_data def load_chunk_model(check): iden=['error'] while 'error' in iden: time.sleep(1) try: output = query_chunk({"inputs": f"""{check}""",}) iden = output # Update 'check' with the new result except Exception as e: print(f"An exception occurred: {e}") return output ################################## # st.write(entity_tags) ################################## # colored_output, _ = colorize_tokens(load_chunk_model(check),check) # st.caption('The below :blue[NER] tags are found for orginal prompt:') # st.markdown(colored_output, unsafe_allow_html=True) # @st.cache_resource def load_text_gen_model(check): iden=['error'] while 'error' in iden: time.sleep(1) try: output = query({ "inputs": f"""{check}""", "parameters": { "min_new_tokens": 30, "max_new_tokens": 100, "do_sample":True, #"remove_invalid_values" : True #"temperature" :0.6 # "top_k":1 # "num_beams":2, # "no_repeat_ngram_size":2, # "early_stopping":True } }) iden = output # Update 'check' with the new result except Exception as e: print(f"An exception occurred: {e}") return output[0]['generated_text'] # @st.cache_data # def load_text_gen_model(check): # return get_answers(prompt + check) def decoded_tokens(string, tokenizer): return [tokenizer.decode([x]) for x in tokenizer.encode(string)] # def analyze_heatmap(df): # sns.set_palette(sns.color_palette("viridis")) # # Create a copy of the DataFrame to prevent modification of the original # df_copy = df.copy() # # Ensure DataFrame has the required columns # if 'token' not in df_copy.columns or 'importance_value' not in df_copy.columns: # raise ValueError("The DataFrame must contain 'token' and 'importance_value' columns.") # # Add 'Position' column to the DataFrame copy # df_copy['Position'] = range(len(df_copy)) # # Plot a bar chart for importance score per token # plt.figure(figsize=(len(df_copy) * 0.3, 4)) # sns.barplot(x='token', y='importance_value', data=df_copy) # plt.xticks(rotation=45, ha='right') # plt.title('Importance Score per Token') # return plt # #plt.show() # ########################### # def analyze_heatmap(df_input): # df = df_input.copy() # df["Position"] = range(len(df)) # # Get the viridis colormap # viridis = matplotlib.cm.get_cmap("viridis") # # Create a Matplotlib figure and axis # fig, ax = plt.subplots(figsize=(10, 6)) # # Normalize the importance values # min_val = df["importance_value"].min() # max_val = df["importance_value"].max() # normalized_values = (df["importance_value"] - min_val) / (max_val - min_val) # # Create the bars, colored based on normalized importance_value # for i, (token, norm_value) in enumerate(zip(df["token"], normalized_values)): # color = viridis(norm_value) # ax.bar( # x=[i], # Use index for x-axis # height=[df["importance_value"].iloc[i]], # width=1.0, # Set the width to make bars touch each other # color=[color], # ) # # Additional styling # ax.set_title("Importance Score per Token", size=25) # ax.set_xlabel("Token") # ax.set_ylabel("Importance Value") # ax.set_xticks(range(len(df["token"]))) # ax.set_xticklabels(df["token"], rotation=45) # return fig @st.cache_data def analyze_heatmap(df_input): df = df_input.copy() df["Position"] = range(len(df)) # Get the Blues colormap blues = matplotlib.cm.get_cmap("Blues") # Create a Matplotlib figure and axis fig, ax = plt.subplots(figsize=(10, 6)) # Normalize the importance values min_val = df["importance_value"].min() max_val = df["importance_value"].max() normalized_values = (df["importance_value"] - min_val) / (max_val - min_val) # Create the bars, colored based on normalized importance_value for i, (token, norm_value) in enumerate(zip(df["token"], normalized_values)): color = blues(norm_value) ax.bar( x=[i], # Use index for x-axis height=[df["importance_value"].iloc[i]], width=1.0, # Set the width to make bars touch each other color=[color], ) # Additional styling # ax.set_title("Importance Score per Token", size=25) # ax.set_xlabel("Token") # ax.set_ylabel("Importance Value") ax.set_xticks(range(len(df["token"]))) ax.set_xticklabels(df["token"], rotation=45) return fig # def analyze_heatmap(df_input): # df = df_input.copy() # df["Position"] = range(len(df)) # # Get the viridis colormap # viridis = matplotlib.colormaps["viridis"] # # Initialize the figure # fig = go.Figure() # # Create the histogram bars with viridis coloring # # Normalize the importance values # min_val = df["importance_value"].min() # max_val = df["importance_value"].max() # normalized_values = (df["importance_value"] - min_val) / (max_val - min_val) # # Initialize the figure # fig = go.Figure() # # Create the bars, colored based on normalized importance_value # for i, (token, norm_value) in enumerate(zip(df["token"], normalized_values)): # color = f"rgb({int(viridis(norm_value)[0] * 255)}, {int(viridis(norm_value)[1] * 255)}, {int(viridis(norm_value)[2] * 255)})" # fig.add_trace( # go.Bar( # x=[i], # Use index for x-axis # y=[df["importance_value"].iloc[i]], # width=1.0, # Set the width to make bars touch each other # marker=dict(color=color), # ) # ) # # Additional styling # fig.update_layout( # title=f"Importance Score per Token", # title_font={'size': 25}, # xaxis_title="Token", # yaxis_title="Importance Value", # showlegend=False, # bargap=0, # Remove gap between bars # xaxis=dict( # Set tick labels to tokens # tickmode="array", # tickvals=list(range(len(df["token"]))), # ticktext=list(df["token"]), # ), # ) # # Rotate x-axis labels by 45 degrees # fig.update_xaxes(tickangle=45) # return fig ############################ # @st.cache_data def integrated_gradients(input_ids, baseline, model, n_steps= 10): #100 # Convert input_ids and baseline to LongTensors input_ids = input_ids.long() baseline = baseline.long() # Initialize tensor to store accumulated gradients accumulated_grads = None # Create interpolated inputs alphas = torch.linspace(0, 1, n_steps) delta = input_ids - baseline interpolates = [(baseline + (alpha * delta).long()).long() for alpha in alphas] # Explicitly cast to LongTensor # Initialize tqdm progress bar pbar = tqdm(total=n_steps, desc="Calculating Integrated Gradients") for interpolate in interpolates: # Update tqdm progress bar pbar.update(1) # Convert interpolated samples to embeddings interpolate_embedding = model.transformer.wte(interpolate).clone().detach().requires_grad_(True) # Forward pass output = model(inputs_embeds=interpolate_embedding, output_attentions=False)[0] # Aggregate the logits across all positions (using sum in this example) aggregated_logit = output.sum() # Backward pass to calculate gradients aggregated_logit.backward() # Accumulate gradients if accumulated_grads is None: accumulated_grads = interpolate_embedding.grad.clone() else: accumulated_grads += interpolate_embedding.grad # Clear gradients model.zero_grad() interpolate_embedding.grad.zero_() # Close tqdm progress bar pbar.close() # Compute average gradients avg_grads = accumulated_grads / n_steps # Compute attributions with torch.no_grad(): input_embedding = model.transformer.wte(input_ids) baseline_embedding = model.transformer.wte(baseline) attributions = (input_embedding - baseline_embedding) * avg_grads return attributions # @st.cache_data def process_integrated_gradients(input_text, _gpt2tokenizer, model): inputs = torch.tensor([_gpt2tokenizer.encode(input_text)]) gpt2tokens = decoded_tokens(input_text, _gpt2tokenizer) with torch.no_grad(): outputs = model(inputs, output_attentions=True, output_hidden_states=True) attentions = outputs[-1] # Initialize a baseline as zero tensor baseline = torch.zeros_like(inputs).long() # Compute Integrated Gradients targeting the aggregated sequence output attributions = integrated_gradients(inputs, baseline, model) # Convert tensors to numpy array for easier manipulation attributions_np = attributions.detach().numpy().sum(axis=2) # Sum across the embedding dimensions to get a single attribution score per token attributions_sum = attributions.sum(axis=2).squeeze(0).detach().numpy() l2_norm_attributions = np.linalg.norm(attributions_sum, 2) normalized_attributions_sum = attributions_sum / l2_norm_attributions clamped_attributions_sum = np.where(normalized_attributions_sum < 0, 0, normalized_attributions_sum) attribution_df = pd.DataFrame({ 'token': gpt2tokens, 'importance_value': clamped_attributions_sum }) return attribution_df ######################## model_type = 'gpt2' model_version = 'gpt2' model = GPT2LMHeadModel.from_pretrained(model_version, output_attentions=True) _gpt2tokenizer = tiktoken.get_encoding("gpt2") ####################### para_tokenizer = AutoTokenizer.from_pretrained("humarin/chatgpt_paraphraser_on_T5_base") para_model = AutoModelForSeq2SeqLM.from_pretrained("humarin/chatgpt_paraphraser_on_T5_base") ###################### @st.cache_resource def paraphrase( question, num_beams=5, num_beam_groups=5, num_return_sequences=5, repetition_penalty=10.0, diversity_penalty=3.0, no_repeat_ngram_size=2, temperature=0.7, max_length=64 #128 ): input_ids = para_tokenizer( f'paraphrase: {question}', return_tensors="pt", padding="longest", max_length=max_length, truncation=True, ).input_ids outputs = para_model.generate( input_ids, temperature=temperature, repetition_penalty=repetition_penalty, num_return_sequences=num_return_sequences, no_repeat_ngram_size=no_repeat_ngram_size, num_beams=num_beams, num_beam_groups=num_beam_groups, max_length=max_length, diversity_penalty=diversity_penalty ) res = para_tokenizer.batch_decode(outputs, skip_special_tokens=True) return res ########################### class SentenceAnalyzer: def __init__(self, check, original, _gpt2tokenizer, model): self.check = check self.original = original self._gpt2tokenizer = _gpt2tokenizer self.model = model self.entity_tags = load_chunk_model(check) self.tagged_sentence = generate_tagged_sentence(check, self.entity_tags) self.sentence_with_pause = replace_pp_with_pause(check, self.entity_tags) self.split_sentences = get_split_sentences(check, self.entity_tags) self.colored_output = colorize_tokens(self.entity_tags, check) def analyze(self): # st.caption(f"The below :blue[shallow parsing] tags are found for {self.original} prompt:") # st.markdown(self.colored_output, unsafe_allow_html=True) attribution_df1 = process_integrated_gradients(self.check, self._gpt2tokenizer, self.model) st.caption(f":blue[{self.original}]:") render_heatmap(self.check, attribution_df1) # st.write("Original") st.pyplot(analyze_heatmap(attribution_df1)) # st.write("After [PAUSE]") # st.write("Sentence with [PAUSE] Replacement:", self.sentence_with_pause) dataframes_list = [] for i, split_sentence in enumerate(self.split_sentences): # st.write(f"Sentence {i + 1} : {split_sentence}") attribution_df1 = process_integrated_gradients(split_sentence, self._gpt2tokenizer, self.model) if i < len(self.split_sentences) - 1: # Add a row with [PAUSE] and value 0 at the end pause_row = pd.DataFrame({'token': '[PAUSE]', 'importance_value': 0},index=[len(attribution_df1)]) # pause_row = pd.DataFrame({'', '': 0},index=[len(attribution_df1)]) attribution_df1 = pd.concat([attribution_df1,pause_row], ignore_index=True) dataframes_list.append(attribution_df1) # After the loop, you can concatenate the dataframes in the list if needed if dataframes_list: combined_dataframe = pd.concat(dataframes_list, axis=0) combined_dataframe = combined_dataframe[combined_dataframe['token'] != " "].reset_index(drop=True) combined_dataframe1 = combined_dataframe[combined_dataframe['token'] != "[PAUSE]"] combined_dataframe1.reset_index(drop=True, inplace=True) st.write(f"Sentence with [PAUSE] Replacement:") # st.dataframe(combined_dataframe1) render_heatmap(self.sentence_with_pause,combined_dataframe1) # render_heatmap(self.sentence_with_pause,combined_dataframe) st.pyplot(analyze_heatmap(combined_dataframe)) paraphrase_list=paraphrase(check) # st.write(paraphrase_list) ###################### col1, col2 = st.columns(2) with col1: analyzer = SentenceAnalyzer(check, "Original Prompt", _gpt2tokenizer, model) analyzer.analyze() with col2: ai_gen_text=load_text_gen_model(check) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text) #st.markdown("""
""", unsafe_allow_html=True) st.markdown("""
""", unsafe_allow_html=True) col3, col4 = st.columns(2) with col3: analyzer = SentenceAnalyzer(" "+paraphrase_list[0], "Paraphrase 1", _gpt2tokenizer, model) analyzer.analyze() with col4: ai_gen_text=load_text_gen_model(paraphrase_list[0]) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text) st.markdown("""
""", unsafe_allow_html=True) col5, col6 = st.columns(2) with col5: analyzer = SentenceAnalyzer(" "+paraphrase_list[1], "Paraphrase 2", _gpt2tokenizer, model) analyzer.analyze() with col6: ai_gen_text=load_text_gen_model(paraphrase_list[1]) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text) st.markdown("""
""", unsafe_allow_html=True) col7, col8 = st.columns(2) with col7: analyzer = SentenceAnalyzer(" "+paraphrase_list[2], "Paraphrase 3", _gpt2tokenizer, model) analyzer.analyze() with col8: ai_gen_text=load_text_gen_model(paraphrase_list[2]) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text) st.markdown("""
""", unsafe_allow_html=True) col9, col10 = st.columns(2) with col9: analyzer = SentenceAnalyzer(" "+paraphrase_list[3], "Paraphrase 4", _gpt2tokenizer, model) analyzer.analyze() with col10: ai_gen_text=load_text_gen_model(paraphrase_list[3]) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text) st.markdown("""
""", unsafe_allow_html=True) col11, col12 = st.columns(2) with col11: analyzer = SentenceAnalyzer(" "+paraphrase_list[4], "Paraphrase 5", _gpt2tokenizer, model) analyzer.analyze() with col12: ai_gen_text=load_text_gen_model(paraphrase_list[4]) st.caption(':blue[AI generated text by GPT4]') st.write(ai_gen_text)