import sys
import math
import random

import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import gradio as gr
from huggingface_hub import InferenceClient

# The TF Model Garden checkout must be on the path before importing
# the official.* modules.
sys.path.append('models')

from official.nlp.data import classifier_data_lib
from official.nlp.bert import tokenization
from official.nlp import optimization

tf.get_logger().setLevel('ERROR')
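
# Recreate the AdamW optimizer so the saved Keras model can be
# deserialized; the step counts are placeholders since no further
# training happens here.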
num_warmup_steps = 1
num_train_steps = 1
init_lr = 3e-5
optimizer = optimization.create_optimizer(init_lr=init_lr,
                                          num_train_steps=num_train_steps,
                                          num_warmup_steps=num_warmup_steps,
                                          optimizer_type='adamw')
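
# Load the fine-tuned BERT classifier; hub.KerasLayer and the AdamW
# optimizer must be registered as custom objects for deserialization.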
checkpoint_filepath = r'./Checkpoint'
model = tf.keras.models.load_model(
    checkpoint_filepath,
    custom_objects={'KerasLayer': hub.KerasLayer, 'AdamWeightDecay': optimizer})
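
# Reference tables: CTH descriptions, CTH-wise duty rates, and the mapping
# between CTH codes and the model's class indices. CTH codes are
# zero-padded to 8 digits so lookups are consistent across files.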
df_report = pd.read_csv('./CTH_Description.csv')
df_report['CTH Code'] = df_report['CTH Code'].astype(str).str.zfill(8)

df_report_DUTY = pd.read_csv('./CTH_WISE_DUTY_RATE.csv')
df_report_DUTY['CTH'] = df_report_DUTY['CTH'].astype(str).str.zfill(8)

df = pd.read_csv("./CTH_CODE_MAP.csv")
df['CTH'] = df['CTH'].astype(str).str.zfill(8)
df = df[['CTH', 'code']]

client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
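
# One class per unique CTH code, ordered by the model's integer class index.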
class_names = (df[['CTH', 'code']]
               .drop_duplicates(subset='CTH')
               .sort_values(by='code', ignore_index=True)['CTH']
               .values.tolist())
label_list = list(range(len(class_names)))
max_seq_length = 200
train_batch_size = 32
output_str_msg = ''  # populated by predict_CTH, read by the LLM helpers below
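
# Tokenizer built from the uncased BERT-base encoder's vocabulary, so that
# inference-time features match the ones used in training.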
bert_layer = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4",
    trainable=True)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()
tokenizer = tokenization.FullTokenizer(vocab_file, do_lower_case)
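

# Convert a single (text, label) pair into fixed-length BERT features
# (token ids, input mask, segment ids) using the Model Garden helpers.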
def to_feature(text, label, label_list=label_list,
               max_seq_length=max_seq_length, tokenizer=tokenizer):
    example = classifier_data_lib.InputExample(guid=None,
                                               text_a=text.numpy(),
                                               text_b=None,
                                               label=label.numpy())
    feature = classifier_data_lib.convert_single_example(
        0, example, label_list, max_seq_length, tokenizer)
    return (feature.input_ids, feature.input_mask,
            feature.segment_ids, feature.label_id)
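

# Wrap to_feature in tf.py_function so it can run inside the tf.data
# input pipeline; the dict keys must match the BERT model's input names.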
def to_feature_map(text, label):
    input_ids, input_mask, segment_ids, label_id = tf.py_function(
        to_feature, inp=[text, label],
        Tout=[tf.int32, tf.int32, tf.int32, tf.int32])

    # tf.py_function drops static shape information; restore it so the
    # dataset can be batched.
    input_ids.set_shape([max_seq_length])
    input_mask.set_shape([max_seq_length])
    segment_ids.set_shape([max_seq_length])
    label_id.set_shape([])

    x = {
        "input_word_ids": input_ids,
        "input_mask": input_mask,
        "input_type_ids": segment_ids
    }
    return (x, label_id)
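

# Return the ten largest scores in arr together with their class indices,
# ordered from largest to smallest (a running insertion into a top-10 list).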
def find_max_10_with_position(arr, arr_size):
    max_values_with_position = [(-sys.maxsize, -1)] * 10

    for i in range(arr_size):
        # Insertion-sort each score into the running top-10 list.
        for j in range(10):
            value, _ = max_values_with_position[j]
            if arr[i] > value:
                # Shift smaller entries down and insert at slot j.
                max_values_with_position[j + 1:] = max_values_with_position[j:9]
                max_values_with_position[j] = (arr[i], i)
                break

    return max_values_with_position


def count_special_character(string):
    # True when the text contains at least one alphabetic character,
    # i.e. it is not made up entirely of digits, spaces and symbols.
    special_char = 0
    for ch in string:
        if not ch.isalpha():
            special_char += 1
    return len(string) != special_char


def format_prompt(message, history):
    # Mixtral-Instruct chat template: "<s>[INST] user [/INST] answer</s>".
    prompt = "<s>"
    for user_prompt, bot_response in history:
        prompt += f"[INST] {user_prompt} [/INST]"
        prompt += f" {bot_response}</s> "
    prompt += f"[INST] {message} [/INST]"
    return prompt


additional_inputs = [
    gr.Textbox(
        label="System Prompt",
        max_lines=1,
        interactive=True,
    ),
    gr.Slider(
        label="Temperature",
        value=0.5,
        minimum=0.0,
        maximum=1.0,
        step=0.05,
        interactive=True,
        info="Higher values produce more diverse outputs",
    ),
    gr.Slider(
        label="Max new tokens",
        value=1024,
        minimum=0,
        maximum=4096,
        step=64,
        interactive=True,
        info="The maximum number of new tokens",
    ),
    gr.Slider(
        label="Top-p (nucleus sampling)",
        value=0.90,
        minimum=0.0,
        maximum=1.0,
        step=0.05,
        interactive=True,
        info="Higher values sample more low-probability tokens",
    ),
    gr.Slider(
        label="Repetition penalty",
        value=1.2,
        minimum=1.0,
        maximum=2.0,
        step=0.05,
        interactive=True,
        info="Penalize repeated tokens",
    )
]
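

# Classify a free-text product description: score all CTH classes with the
# BERT model, keep the ten best candidates, attach duty rates and
# descriptions, and return {label: normalized score} for a Gradio Label.
# The formatted top-10 list is stashed in the global output_str_msg so the
# LLM helpers below can reuse it.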
def predict_CTH(txt):
    print('Desc: ', txt)
    global output_str_msg
    # Require at least 3 characters and at least one letter before running
    # the classifier.
    if (txt != '') and len(txt) >= 3 and count_special_character(txt):
        valid_data = tf.data.Dataset.from_tensor_slices(([txt], [1]))
        valid_data = valid_data.map(to_feature_map).batch(1)
        preds = model.predict(valid_data)
        predicted_values = tf.nn.softmax(preds)
        arr = predicted_values.numpy().tolist()[0]
        n = len(arr)

        pred_value_max = find_max_10_with_position(arr, n)

        # Normalize the top-10 scores so they sum to 1 for display.
        sum_all = sum(value for value, _ in pred_value_max)
        vals = [value / sum_all for value, _ in pred_value_max]

        # Reject descriptions whose best raw score falls below this threshold.
        if pred_value_max[0][0] <= 0.000131:
            output_str_msg = 'Not an adequate description'
            return {'Not an adequate description': 1.0}
        else:
            Var_CTH = []
            Var_desc = []
            Var_duty = []

            for i in range(len(pred_value_max)):
                predicted_code = pred_value_max[i][1]
                pred_CTH = df[df['code'] == predicted_code]['CTH'].iloc[0]

                # Duty rate and description live in separate tables; fall
                # back to an empty string when a code has no entry.
                try:
                    pred_duty = df_report_DUTY[df_report_DUTY['CTH'] == str(pred_CTH)]['DUTY_RATE'].iloc[0]
                except (IndexError, KeyError):
                    pred_duty = ''

                try:
                    pred_desc = df_report[df_report['CTH Code'] == str(pred_CTH)]['Concat Description'].iloc[0]
                except (IndexError, KeyError):
                    pred_desc = ''

                Var_CTH.append(pred_CTH)
                Var_desc.append(pred_desc)
                Var_duty.append(pred_duty)

            # Label and description strings for each of the ten candidates.
            P = ['CTH: ' + str(Var_CTH[i]) + ' Duty Rate(%): ' + str(Var_duty[i])
                 for i in range(10)]
            Q = ['Desc: ' + str(Var_desc[i]) for i in range(10)]

            output_str_msg = ' '.join(f'{i + 1}. {P[i]} {Q[i]}'
                                      for i in range(10))

            # Gradio's Label component expects a {label: confidence} dict.
            results = {}
            for i in range(10):
                results[P[i]] = float(vals[i])
                results[Q[i]] = float(vals[i])
            return results
    else:
        output_str_msg = 'Not an adequate description'
        return {'Enter a correct description': 1.0}
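

# Ask the Mixtral model to reason over the top-10 candidates produced by
# predict_CTH (shared via the global output_str_msg) and pick the most
# appropriate 8-digit code, streaming the answer into the chatbot.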
def llm_model_function(txt, history, chatbot=[], temperature=0.9,
                       max_new_tokens=1024, top_p=0.95, repetition_penalty=1.0):
    chatbot = []

    global output_str_msg

    if output_str_msg != 'Not an adequate description':
        prompt = (f'First explain what the product - {txt} - is. '
                  'Which is the most appropriate 8-digit classification code '
                  'out of the ten classes given below? Explain the reason '
                  'step by step. If none of the ten classifications can be '
                  'applied precisely due to a lack of additional information, '
                  'say that you need additional information and what that '
                  f'additional information is. {output_str_msg} ?')

        temperature = float(temperature)
        if temperature < 1e-2:
            temperature = 1e-2
        top_p = float(top_p)

        generate_kwargs = dict(
            temperature=temperature,
            max_new_tokens=max_new_tokens,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            do_sample=True,
            seed=42,
        )

        formatted_prompt = format_prompt(prompt, history)
        stream = client.text_generation(formatted_prompt, **generate_kwargs,
                                        stream=True, details=True,
                                        return_full_text=False)
        output = ""
        for response in stream:
            output += response.token.text

        chatbot.append((txt, output))
        return "", chatbot
    else:
        chatbot.append(('Not an adequate description',
                        'Not an adequate description'))
        return "", chatbot
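

# Standalone product explanation: ask the LLM what the product is, without
# the classification candidates.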
def product_explaination(txt, history, chatbot=[], temperature=0.9,
                         max_new_tokens=1024, top_p=0.95, repetition_penalty=1.0):
    print('Input Description is:', txt)
    chatbot = []
    prompt = f'What is the product - {txt}?'

    temperature = float(temperature)
    if temperature < 1e-2:
        temperature = 1e-2
    top_p = float(top_p)

    generate_kwargs = dict(
        temperature=temperature,
        max_new_tokens=max_new_tokens,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=42,
    )

    formatted_prompt = format_prompt(prompt, history)
    stream = client.text_generation(formatted_prompt, **generate_kwargs,
                                    stream=True, details=True,
                                    return_full_text=False)
    output = ""
    for response in stream:
        output += response.token.text

    chatbot.append((txt, output))
    return "", chatbot