# Page-capture residue from the hosting site ("Spaces: Sleeping"); not part of the program.
import logging
from functools import lru_cache
from os import getenv
from pathlib import Path
from time import perf_counter

import gradio as gr
import numpy as np
import requests
from huggingface_hub import InferenceClient
from jinja2 import Environment, FileSystemLoader
from ragatouille import RAGPretrainedModel
from sentence_transformers import CrossEncoder

from backend.query_llm import generate_hf, generate_qwen
from backend.semantic_search import table, retriever
# Credentials for the Bhashini translation API, read from the environment.
# Either value is None when its variable is unset.
user_id = getenv('USER_ID')
api_key = getenv('API_KEY')
def bhashini_translate(text: str, from_code: str = "en", to_code: str = "hi", timeout: float = 30.0) -> dict:
    """Translate ``text`` from ``from_code`` to ``to_code`` using the Bhashini API.

    Two HTTP calls are made: the first asks the pipeline endpoint which
    service and callback URL to use, the second posts the text to that
    callback URL for translation.

    Args:
        text: Text to translate. ``None``, empty or whitespace-only input is
            rejected without any network call.
        from_code: Source language code sent to the API.
        to_code: Target language code sent to the API.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A dict with ``status_code``, ``message`` and ``translated_content``
        (``None`` on any failure). The empty-input result additionally
        carries ``speech_content: None``. Network failures are reported as
        status 503 and unexpected response bodies as 502 instead of raising.
    """
    if not text or not text.strip():
        print('Input text is empty. Please provide valid text for translation.')
        return {"status_code": 400, "message": "Input text is empty", "translated_content": None, "speech_content": None}

    print('Input text - ', text)
    print(f'Starting translation process from {from_code} to {to_code}...')
    gr.Warning(f'Translating to {to_code}...')

    language = {"sourceLanguage": from_code, "targetLanguage": to_code}
    url = 'https://meity-auth.ulcacontrib.org/ulca/apis/v0/model/getModelsPipeline'
    headers = {
        "Content-Type": "application/json",
        "userID": user_id,
        "ulcaApiKey": api_key,
    }
    payload = {
        "pipelineTasks": [{"taskType": "translation", "config": {"language": language}}],
        "pipelineRequestConfig": {"pipelineId": "64392f96daac500b55c543cd"},
    }

    print('Sending initial request to get the pipeline...')
    try:
        # Without a timeout a stalled server would block the UI handler forever.
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        print(f'Error in initial request: {exc}')
        return {"status_code": 503, "message": "Error in translation request", "translated_content": None}
    if response.status_code != 200:
        print(f'Error in initial request: {response.status_code}')
        return {"status_code": response.status_code, "message": "Error in translation request", "translated_content": None}

    print('Initial request successful, processing response...')
    try:
        response_data = response.json()
        service_id = response_data["pipelineResponseConfig"][0]["config"][0]["serviceId"]
        endpoint = response_data["pipelineInferenceAPIEndPoint"]
        callback_url = endpoint["callbackUrl"]
        inference_key = endpoint["inferenceApiKey"]
        # The API tells us which header name/value authorises the compute call.
        headers2 = {
            "Content-Type": "application/json",
            inference_key["name"]: inference_key["value"],
        }
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print(f'Unexpected pipeline response: {exc!r}')
        return {"status_code": 502, "message": "Error in translation request", "translated_content": None}
    print(f'Service ID: {service_id}, Callback URL: {callback_url}')

    compute_payload = {
        "pipelineTasks": [{"taskType": "translation", "config": {"language": language, "serviceId": service_id}}],
        "inputData": {"input": [{"source": text}], "audio": [{"audioContent": None}]},
    }

    print(f'Sending translation request with text: "{text}"')
    try:
        compute_response = requests.post(callback_url, json=compute_payload, headers=headers2, timeout=timeout)
    except requests.RequestException as exc:
        print(f'Error in translation request: {exc}')
        return {"status_code": 503, "message": "Error in translation", "translated_content": None}
    if compute_response.status_code != 200:
        print(f'Error in translation request: {compute_response.status_code}')
        return {"status_code": compute_response.status_code, "message": "Error in translation", "translated_content": None}

    print('Translation request successful, processing translation...')
    try:
        compute_response_data = compute_response.json()
        translated_content = compute_response_data["pipelineResponse"][0]["output"][0]["target"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print(f'Unexpected translation response: {exc!r}')
        return {"status_code": 502, "message": "Error in translation", "translated_content": None}

    print(f'Translation successful. Translated content: "{translated_content}"')
    return {"status_code": 200, "message": "Translation successful", "translated_content": translated_content}
# Chatbot configuration and shared resources.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Column names used when searching the vector table and reading its rows.
VECTOR_COLUMN_NAME, TEXT_COLUMN_NAME = "vector", "text"

# Hugging Face inference client authenticated with the token from the environment.
HF_TOKEN = getenv("HUGGING_FACE_HUB_TOKEN")
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=HF_TOKEN)

# Jinja2 templates live in the "templates" directory next to this file.
proj_dir = Path(__file__).parent
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))
template, template_html = (
    env.get_template(template_name)
    for template_name in ('template.j2', 'template_html.j2')
)
# def add_text(history, text): | |
# history = [] if history is None else history | |
# history = history + [(text, None)] | |
# return history, gr.Textbox(value="", interactive=False) | |
# UI label that selects the ColBERT retrieval path, and the index it searches.
_COLBERT_CHOICE = '(HIGH ACCURATE) ColBERT'
_COLBERT_INDEX_PATH = '.ragatouille/colbert/indexes/cbseclass10index'

# UI labels of the rerankers mapped to the cross-encoder checkpoints they load.
_CROSS_ENCODER_MODELS = {
    '(FAST) MiniLM-L6v2': 'cross-encoder/ms-marco-MiniLM-L-6-v2',
    '(ACCURATE) BGE reranker': 'BAAI/bge-reranker-base',
}


@lru_cache(maxsize=None)
def _load_cross_encoder(model_name):
    """Return the cross-encoder for ``model_name``, constructing it on first use only."""
    return CrossEncoder(model_name)


@lru_cache(maxsize=1)
def _load_colbert_index():
    """Return the ColBERT index handle, loading model and index on first use only."""
    # NOTE(review): the original two-step load is kept as-is; if `from_index`
    # is a classmethod the `from_pretrained` call may be redundant — confirm.
    rag = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
    return rag.from_index(_COLBERT_INDEX_PATH)


def bot(history, cross_encoder):
    """Answer the latest user turn in ``history`` with retrieved context.

    The query is the first element of the last history entry. Documents are
    retrieved either from the ColBERT index or from the vector table followed
    by cross-encoder reranking, rendered into the prompt templates, and sent
    to the language model.

    Args:
        history: List of ``(user_text, bot_text)`` pairs; the last entry is
            the turn to answer and is replaced in place with the answer.
        cross_encoder: UI label selecting the retrieval/reranking strategy.

    Yields:
        ``(history, prompt_html)``. The ColBERT path yields once per value
        produced by ``generate_hf`` (each value replaces the answer so far);
        the reranker paths yield once with the output of ``generate_qwen``.

    Raises:
        ValueError: If the query is empty or ``cross_encoder`` is not a
            known choice.
    """
    top_rerank = 25   # candidates fetched from the vector table for reranking
    top_k_rank = 20   # documents finally placed in the prompt

    query = history[-1][0] if history else ''
    print('\nQuery: ', query)
    print('\nHistory:', history)
    if not query:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    use_colbert = cross_encoder == _COLBERT_CHOICE
    if not use_colbert and cross_encoder not in _CROSS_ENCODER_MODELS:
        # Previously this surfaced later as an unbound-variable error.
        raise ValueError(f"Unknown retrieval choice: {cross_encoder!r}")

    logger.warning('Retrieving documents...')

    if use_colbert:
        gr.Warning('Retrieving using ColBERT.. First time query will take a minute for model to load..pls wait')
        hits = _load_colbert_index().search(query, k=top_k_rank)
        documents = [item['content'] for item in hits]

        prompt = template.render(documents=documents, query=query)
        prompt_html = template_html.render(documents=documents, query=query)

        # History entries may be tuples, so replace the whole entry instead of
        # assigning to index 1 (which raises TypeError on a tuple).
        history[-1] = (query, "")
        for partial in generate_hf(prompt, history[:-1]):
            history[-1] = (query, partial)
            yield history, prompt_html
        return

    document_start = perf_counter()
    query_vec = retriever.encode(query)
    candidates = (
        table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME)
        .limit(top_rerank)
        .to_list()
    )
    documents = [doc[TEXT_COLUMN_NAME] for doc in candidates]

    if documents:
        reranker = _load_cross_encoder(_CROSS_ENCODER_MODELS[cross_encoder])
        cross_scores = reranker.predict([[query, doc] for doc in documents])
        # Highest score first, then keep the top_k_rank best documents.
        best_first = np.argsort(cross_scores)[::-1][:top_k_rank]
        documents = [documents[idx] for idx in best_first]
    logger.info('Retrieval and reranking took %.2fs', perf_counter() - document_start)

    prompt = template.render(documents=documents, query=query)
    prompt_html = template_html.render(documents=documents, query=query)

    output = generate_qwen(prompt, history[:-1])
    print('Output:', output)

    history[-1] = (query, output)
    yield history, prompt_html
#def translate_text(response_text, selected_language): | |
def translate_text(selected_language, history):
    """Translate the most recent bot reply into ``selected_language``.

    ``selected_language`` is the display name chosen in the UI; it is mapped
    to the language code passed to ``bhashini_translate``. The text to
    translate is the second element of the last ``history`` entry, or an
    empty string when ``history`` is empty. Returns the ``translated_content``
    field of the translation result. An unknown language name raises
    ``KeyError``.
    """
    language_pairs = (
        ("Hindi", "hi"), ("Gom", "gom"), ("Kannada", "kn"), ("Dogri", "doi"),
        ("Bodo", "brx"), ("Urdu", "ur"), ("Tamil", "ta"), ("Kashmiri", "ks"),
        ("Assamese", "as"), ("Bengali", "bn"), ("Marathi", "mr"), ("Sindhi", "sd"),
        ("Maithili", "mai"), ("Punjabi", "pa"), ("Malayalam", "ml"), ("Manipuri", "mni"),
        ("Telugu", "te"), ("Sanskrit", "sa"), ("Nepali", "ne"), ("Santali", "sat"),
        ("Gujarati", "gu"), ("Odia", "or"),
    )
    target_code = dict(language_pairs)[selected_language]

    if history:
        response_text = history[-1][1]
    else:
        response_text = ''
    print('response_text for translation', response_text)

    result = bhashini_translate(response_text, to_code=target_code)
    return result['translated_content']
# Gradio interface | |
# Gradio interface.
# NOTE(review): the layout nesting below was reconstructed from a copy of the
# file whose indentation had been stripped — confirm against the deployed UI.
with gr.Blocks(theme='gradio/soft') as CHATBOT:
    # Per-session conversation history: list of (user_text, bot_text) pairs.
    history_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=10):
            gr.HTML(value="""<div style="color: #FF4500;"><h1>Welcome! I am your friend!</h1>Ask me !I will help you<h1><span style="color: #008000">I AM A CHATBOT FOR 10 SOCIAL WITH TRANSLATION IN 22 LANGUAGES</span></h1></div>""")
            gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">A free chat bot developed by K.M.RAMYASRI,TGT,GHS.SUTHUKENY using Open source LLMs for 10 std students</p>""")
            # NOTE(review): the mailto target and the displayed address differ —
            # confirm which one is intended.
            gr.HTML(value="""<p style="font-family: Arial, sans-serif; font-size: 14px;"> Suggestions may be sent to <a href="mailto:ramyasriraman2019@gmail.com" style="color: #00008B; font-style: italic;">ramyadevi1607@yahoo.com</a>.</p>""")
        with gr.Column(scale=3):
            gr.Image(value='logo.png', height=200, width=200)

    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    cross_encoder = gr.Radio(
        choices=['(FAST) MiniLM-L6v2', '(ACCURATE) BGE reranker', '(HIGH ACCURATE) ColBERT'],
        value='(ACCURATE) BGE reranker',
        label="Embeddings",
        info="Only First query to Colbert may take little time)",
    )
    language_dropdown = gr.Dropdown(
        choices=[
            "Hindi", "Gom", "Kannada", "Dogri", "Bodo", "Urdu", "Tamil", "Kashmiri", "Assamese", "Bengali", "Marathi",
            "Sindhi", "Maithili", "Punjabi", "Malayalam", "Manipuri", "Telugu", "Sanskrit", "Nepali", "Santali",
            "Gujarati", "Odia",
        ],
        value="Hindi",  # default to Hindi
        label="Select Language for Translation",
    )
    prompt_html = gr.HTML()
    translated_textbox = gr.Textbox(label="Translated Response")

    def update_history_and_translate(txt, cross_encoder, history_state, language_dropdown):
        """Run one chat turn and translate the answer.

        Appends the user text to a copy of the session history, runs ``bot``
        to completion, and translates the final answer into the selected
        language.

        Returns the updated history twice (once for the chat display, once
        for the session state), the rendered prompt HTML, and the translated
        answer. The stored history is only replaced when the whole turn
        succeeds, so a failed query leaves no dangling entry behind.
        """
        history = list(history_state or []) + [(txt, "")]

        # Drain the generator so the final (complete) answer is kept, not the
        # first partial one.
        rendered_prompt = ""
        for history, rendered_prompt in bot(history, cross_encoder):
            pass

        translated_text = translate_text(language_dropdown, history)
        return history, history, rendered_prompt, translated_text

    handler_inputs = [txt, cross_encoder, history_state, language_dropdown]
    # The state is listed as an output so the new history is persisted
    # explicitly rather than by mutating the input object.
    handler_outputs = [chatbot, history_state, prompt_html, translated_textbox]
    txt_btn.click(update_history_and_translate, handler_inputs, handler_outputs)
    txt_msg = txt.submit(update_history_and_translate, handler_inputs, handler_outputs)

    examples = ['WHAT IS POWER SHARING?', 'WHAT IS THE REASON FOR RISE OF NATIONALISM IN INDIA?']
    gr.Examples(examples, txt)

# Launch the Gradio application
CHATBOT.launch(share=True, debug=True)