AdvaitBERT-AI_Explanability / fun_advaitbert.py
NCTCMumbai's picture
Upload fun_advaitbert.py
9c6df82 verified
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import sys
import random
sys.path.append('models')
from official.nlp.data import classifier_data_lib
from official.nlp.bert import tokenization
from official.nlp import optimization
tf.get_logger().setLevel('ERROR')
from huggingface_hub import InferenceClient
import math
import gradio as gr
num_warmup_steps=1
num_train_steps=1
init_lr = 3e-5
optimizer = optimization.create_optimizer(init_lr=init_lr,num_train_steps=num_train_steps,num_warmup_steps=num_warmup_steps,optimizer_type='adamw')
### Load Model
checkpoint_filepath=r'./Checkpoint'
model = tf.keras.models.load_model(checkpoint_filepath, custom_objects={'KerasLayer':hub.KerasLayer , 'AdamWeightDecay': optimizer})
df_report = pd.read_csv('./CTH_Description.csv')
df_report['CTH Code'] = df_report['CTH Code'].astype(str).str.zfill(8)
df_report_DUTY = pd.read_csv('./CTH_WISE_DUTY_RATE.csv')
df_report_DUTY['CTH'] = df_report_DUTY['CTH'].astype(str).str.zfill(8)
df = pd.read_csv("./CTH_CODE_MAP.csv")
df['CTH'] = df['CTH'].astype(str).str.zfill(8)
df = df[['CTH', 'code']]
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
class_names=df[['CTH','code']].drop_duplicates(subset='CTH').sort_values(by='code',ignore_index=True)['CTH'].values.tolist()
label_list=list(range(0,len(class_names)))
max_seq_length = 200 # maximum length of (token) input sequences . it can be any number
train_batch_size = 32 # batch size ( 16 choosen to avoid Out-Of-Memory errors)
# Get BERT layer and tokenizer:
# More details here: https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4
bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4" , trainable = True)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = tokenization.FullTokenizer(vocab_file , do_lower_case)
# This provides a function to convert each row to input features and label ( as required by BERT)
max_seq_length = 200 # maximum length of (token) input sequences . it can be any number
def to_feature(text, label, label_list=label_list, max_seq_length=max_seq_length, tokenizer=tokenizer):
example = classifier_data_lib.InputExample(guid = None,
text_a = text.numpy(),
text_b = None,
label = label.numpy())
feature = classifier_data_lib.convert_single_example(0 , example , label_list , max_seq_length , tokenizer)
return (feature.input_ids , feature.input_mask , feature.segment_ids , feature.label_id)
def to_feature_map(text, label):
input_ids , input_mask , segment_ids , label_id = tf.py_function(to_feature , inp = [text , label],
Tout = [tf.int32 , tf.int32 , tf.int32 , tf.int32])
input_ids.set_shape([max_seq_length])
input_mask.set_shape([max_seq_length])
segment_ids.set_shape([max_seq_length])
label_id.set_shape([])
x = {
"input_word_ids": input_ids,
"input_mask": input_mask,
"input_type_ids": segment_ids
}
return(x,label_id)
def find_max_10_with_position(arr, arr_size):
max_values_with_position = [(-sys.maxsize, -1)] * 10
for i in range(arr_size):
for j in range(5):
value, position = max_values_with_position[j]
if arr[i] > value:
max_values_with_position[j+1:] = max_values_with_position[j:9]
max_values_with_position[j] = (arr[i], i)
break
return max_values_with_position
def count_special_character(string):
special_char= 0
for i in range(len(string)):
ch = string[i]
if (string[i].isalpha()):
continue
else:
special_char += 1
if len(string)==special_char:
return False
else:
return True
def format_prompt(message, history):
prompt = "<s>"
for user_prompt, bot_response in history:
prompt += f"[INST] {user_prompt} [/INST]"
prompt += f" {bot_response}</s> "
prompt += f"[INST] {message} [/INST]"
return prompt
additional_inputs=[
gr.Textbox(
label="System Prompt",
max_lines=1,
interactive=True,
),
gr.Slider(
label="Temperature",
value=0.5,
minimum=0.0,
maximum=1.0,
step=0.05,
interactive=True,
info="Higher values produce more diverse outputs",
),
gr.Slider(
label="Max new tokens",
value=1024,
minimum=0,
maximum=4096,
step=64,
interactive=True,
info="The maximum numbers of new tokens",
),
gr.Slider(
label="Top-p (nucleus sampling)",
value=0.90,
minimum=0.0,
maximum=1,
step=0.05,
interactive=True,
info="Higher values sample more low-probability tokens",
),
gr.Slider(
label="Repetition penalty",
value=1.2,
minimum=1.0,
maximum=2.0,
step=0.05,
interactive=True,
info="Penalize repeated tokens",
)
]
def predict_CTH(txt):
print('Desc: ',txt)
global output_str_msg
if (txt!='') and len(txt)>=3 and (count_special_character(txt)):
valid_data = tf.data.Dataset.from_tensor_slices(([txt] , [1])) # 1 refers to 'entertainment' and 2 refers to 'sport'
valid_data = (valid_data.map(to_feature_map).batch(1))
preds = model.predict(valid_data)
predicted_values = tf.nn.softmax(preds)
arr = predicted_values.numpy().tolist()[0]
n = len(arr)
pred_value_max=find_max_10_with_position(arr, n)
sum_all = 0
for i in range(10):
sum_all += pred_value_max[i][0]
val_1 = pred_value_max[0][0]/sum_all
val_2 = pred_value_max[1][0]/sum_all
val_3 = pred_value_max[2][0]/sum_all
val_4 = pred_value_max[3][0]/sum_all
val_5 = pred_value_max[4][0]/sum_all
val_6 = pred_value_max[5][0]/sum_all
val_7 = pred_value_max[6][0]/sum_all
val_8 = pred_value_max[7][0]/sum_all
val_9 = pred_value_max[8][0]/sum_all
val_10 = pred_value_max[9][0]/sum_all
if pred_value_max[0][0]<=0.000131:
Var_CTH=[]
Var_desc=[]
Var_duty=[]
pred_duty=''
pred_desc=''
pred_CTH=''
output_str_msg='Not a adequate description'
return{'Not a adequate description':float(1.0)}
else:
Var_CTH=[]
Var_desc=[]
Var_duty=[]
pred_duty=''
pred_desc=''
pred_CTH=''
for i in range(len(pred_value_max)):
#predicted_code=np.where(predicted_values.numpy()==i)[1][0]
predicted_code=pred_value_max[i][1]
pred_CTH=df[df['code'] == predicted_code]['CTH'].iloc[0]
try:
pred_duty=df_report_DUTY[df_report_DUTY['CTH']==str(pred_CTH)]['DUTY_RATE'].iloc[0]
except:
pred_duty=''
pass
try:
pred_desc=df_report[df_report['CTH Code']==str(pred_CTH)]['Concat Description'].iloc[0]
except:
pred_desc=''
pass
Var_CTH.append(pred_CTH)
Var_desc.append(pred_desc)
Var_duty.append(pred_duty)
P1 ='CTH: '+str(Var_CTH[0])+' Duty Rate(%): '+ str(Var_duty[0])
P2 ='CTH: '+str(Var_CTH[1])+' Duty Rate(%): '+ str(Var_duty[1])
P3 ='CTH: '+str(Var_CTH[2])+' Duty Rate(%): '+ str(Var_duty[2])
P4 ='CTH: '+str(Var_CTH[3])+' Duty Rate(%): '+ str(Var_duty[3])
P5 ='CTH: '+str(Var_CTH[4])+' Duty Rate(%): '+ str(Var_duty[4])
P6 ='CTH: '+str(Var_CTH[5])+' Duty Rate(%): '+ str(Var_duty[5])
P7 ='CTH: '+str(Var_CTH[6])+' Duty Rate(%): '+ str(Var_duty[6])
P8 ='CTH: '+str(Var_CTH[7])+' Duty Rate(%): '+ str(Var_duty[7])
P9 ='CTH: '+str(Var_CTH[8])+' Duty Rate(%): '+ str(Var_duty[8])
P10 ='CTH: '+str(Var_CTH[9])+' Duty Rate(%): '+ str(Var_duty[9])
Q1='Desc: '+str(Var_desc[0])
Q2='Desc: '+str(Var_desc[1])
Q3='Desc: '+str(Var_desc[2])
Q4='Desc: '+str(Var_desc[3])
Q5='Desc: '+str(Var_desc[4])
Q6='Desc: '+str(Var_desc[5])
Q7='Desc: '+str(Var_desc[6])
Q8='Desc: '+str(Var_desc[7])
Q9='Desc: '+str(Var_desc[8])
Q10='Desc: '+str(Var_desc[9])
output_str_msg = (
f'1. {P1} {Q1} '
f'2. {P2} {Q2} '
f'3. {P3} {Q3} '
f'4. {P4} {Q4} '
f'5. {P5} {Q5} '
f'6. {P6} {Q6} '
f'7. {P7} {Q7} '
f'8. {P8} {Q8} '
f'9. {P9} {Q9} '
f'10. {P10} {Q10}')
#print('output_str_msg',output_str_msg)
return {str(P1):float(val_1),str(Q1):float(val_1),
str(P2):float(val_2),str(Q2):float(val_2),
str(P3):float(val_3),str(Q3):float(val_3),
str(P4):float(val_4),str(Q4):float(val_4),
str(P5):float(val_5),str(Q5):float(val_5),
str(P6):float(val_6),str(Q6):float(val_6),
str(P7):float(val_7),str(Q7):float(val_7),
str(P8):float(val_8),str(Q8):float(val_8),
str(P9):float(val_9),str(Q9):float(val_9),
str(P10):float(val_10),str(Q10):float(val_10),}
else:
output_str_msg='Not a adequate description'
return{'Enter Correct Description':float(1.0)}
def llm_model_function(txt,history,chatbot=[], temperature=0.9, max_new_tokens=1024, top_p=0.95, repetition_penalty=1.0,):
system_prompt=[]
chatbot=[]
global output_str_msg
#print('output_str_msg',output_str_msg)
if output_str_msg!='Not a adequate description':
prompt=f'First Explain What is the product- {txt}. Which is the most appropriate 8 Digit classification code out of the three given below classes. Explain the reason step by step. if none of the three classification is applicable more precisely due to lack of any additional information, tell you need additional information and what is the that additional information. {output_str_msg} ?'
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
formatted_prompt = format_prompt(f", {prompt}", history)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
chatbot.append((txt, output))
return "", chatbot
else:
# warning_msg = f"Unexpected response"
# raise gr.Error(warning_msg)
chatbot.append(('Not a adequate description', 'Not a adequate description'))
return "", chatbot
def product_explaination(txt,history,chatbot=[], temperature=0.9, max_new_tokens=1024, top_p=0.95, repetition_penalty=1.0,):
print('Input Descrption is:',txt)
chatbot=[]
prompt=f'What is the product- {txt}?'
#print('prompt',prompt)
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
formatted_prompt = format_prompt(f", {prompt}", history)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
chatbot.append((txt, output))
return "", chatbot