Spaces:
Sleeping
Sleeping
import time | |
import gradio as gr | |
import logging | |
from langchain_community.document_loaders import OnlinePDFLoader | |
# from langchain.document_loaders import PDFMinerLoader,CSVLoader ,UnstructuredWordDocumentLoader,TextLoader,OnlinePDFLoader | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain_community.embeddings import SentenceTransformerEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.chains import RetrievalQA | |
from langchain.prompts import PromptTemplate | |
from langchain.docstore.document import Document | |
from youtube_transcript_api import YouTubeTranscriptApi | |
import chatops | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Device passed to the sentence-transformer embedding model.
DEVICE = 'cpu'

# Bounds and defaults for the "Advanced options" sliders in the UI.
MAX_NEW_TOKENS = 4096
DEFAULT_TEMPERATURE = 0.1
DEFAULT_MAX_NEW_TOKENS = 2048
DEFAULT_CHAR_LENGTH = 1000

# NOTE(review): not referenced anywhere in the visible part of this file.
MAX_INPUT_TOKEN_LENGTH = 4000

# Sample video links (also not referenced in the visible part of this file).
EXAMPLES = [
    "https://www.youtube.com/watch?v=aircAruvnKk&ab_channel=3Blue1Brown",
    "https://www.youtube.com/watch?v=Ilg3gGewQ5U",
    "https://www.youtube.com/watch?v=WUvTyaaNkzM",
]
def clear_chat():
    """Return a new, empty chat history.

    Wired to the "Delete Chat" button, whose output is the chatbot component.
    """
    return list()
def _extract_video_id(video_link):
    """Return the video id from a watch, youtu.be, embed, shorts or live link."""
    from urllib.parse import parse_qs, urlparse

    link = video_link.strip()
    # urlparse only fills in the host when a scheme is present.
    parsed = urlparse(link if "://" in link else f"https://{link}")

    # Standard form: https://www.youtube.com/watch?v=<id>&other=params
    query_ids = parse_qs(parsed.query).get("v")
    if query_ids and query_ids[0]:
        return query_ids[0]

    segments = [part for part in parsed.path.split("/") if part]
    host = (parsed.hostname or "").lower()
    # Short form: https://youtu.be/<id>
    if host.endswith("youtu.be") and segments:
        return segments[0]
    # Path forms: /embed/<id>, /shorts/<id>, /live/<id>, /v/<id>
    if len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
        return segments[1]

    raise ValueError(f"Could not find a video id in link: {video_link!r}")


def get_text_from_youtube_link(video_link, max_video_length=800):
    """Fetch a video's transcript and return at most ``max_video_length`` characters.

    Args:
        video_link: YouTube URL (``watch?v=``, ``youtu.be/``, ``/embed/``,
            ``/shorts/`` or ``/live/`` form).
        max_video_length: Maximum number of characters of transcript text to
            return.

    Returns:
        The transcript segments joined by single spaces, truncated to
        ``max_video_length`` characters. Empty string if the transcript has
        no text.

    Raises:
        ValueError: If no video id can be found in ``video_link``.
        Exception: Whatever ``YouTubeTranscriptApi.get_transcript`` raises
            (for example when the video has no transcript).
    """
    video_id = _extract_video_id(video_link)
    srt = YouTubeTranscriptApi.get_transcript(video_id)

    pieces = []
    collected = 0
    for text_data in srt:
        piece = text_data.get("text") or ""
        pieces.append(piece)
        # +1 accounts for the joining space; once past the limit, further
        # segments could not appear in the truncated result anyway.
        collected += len(piece) + 1
        if collected > max_video_length:
            break

    video_text = " ".join(pieces)
    if not video_text.strip():
        logger.warning("Transcript for video %s is empty.", video_id)
    return video_text[:max_video_length]
def process_documents(documents, data_chunk=1500, chunk_overlap=100):
    """Split ``documents`` into chunks with a newline-separated character splitter.

    Args:
        documents: Documents to split; passed to ``split_documents`` unchanged.
        data_chunk: Chunk size passed to the splitter.
        chunk_overlap: Overlap between consecutive chunks.

    Returns:
        Whatever ``CharacterTextSplitter.split_documents`` returns for the input.
    """
    splitter = CharacterTextSplitter(
        separator='\n',
        chunk_size=data_chunk,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_documents(documents)
def process_youtube_link(link, document_name="youtube-content", char_length=1000):
    """Wrap the transcript text of a YouTube video in a single ``Document``.

    Args:
        link: YouTube URL of the video.
        document_name: Base name used for the ``source`` metadata entry
            (``"<document_name>.txt"``).
        char_length: Maximum number of transcript characters to keep.

    Returns:
        A one-element list containing the ``Document``, or ``None`` when the
        transcript could not be read. The error is logged, not raised, so
        callers must check for ``None``.
    """
    try:
        page_content = get_text_from_youtube_link(
            video_link=link, max_video_length=char_length
        )
    except Exception:
        # Best effort by design: log with the traceback and signal failure.
        logger.exception('Error in reading document.')
        return None

    metadata = {"source": f"{document_name}.txt"}
    return [Document(page_content=page_content, metadata=metadata)]
def create_prompt():
    """Build the question-answering prompt used by the retrieval chain.

    Returns:
        A ``PromptTemplate`` with the input variables ``context`` and
        ``question``.
    """
    template_text = """As a chatbot asnwer the questions regarding the content in the video.
Use the following context to answer.
If you don't know the answer, just say I don't know.
{context}
Question: {question}
Answer :"""
    return PromptTemplate(
        input_variables=["context", "question"],
        template=template_text,
    )
def youtube_chat(youtube_link, API_key, llm='HuggingFace', temperature=0.1, max_tokens=1096, char_length=1500):
    """Index a video's transcript and build the retrieval QA chain.

    On success the module globals ``vector_db`` (FAISS index) and ``qa``
    (RetrievalQA chain) are set; ``infer`` reads ``qa`` from there.

    Args:
        youtube_link: URL of the video to process.
        API_key: Key passed to the selected model provider.
        llm: ``'HuggingFace'`` selects the Hugging Face model; any other
            value selects the OpenAI chat model.
        temperature: Sampling temperature (Hugging Face model only).
        max_tokens: Maximum tokens to generate (Hugging Face model only).
        char_length: Maximum number of transcript characters to index.

    Returns:
        A status message for the UI: a completion message on success, or a
        short explanation when the link is missing or no transcript text
        could be read.
    """
    global vector_db, qa

    if not youtube_link or not youtube_link.strip():
        return "Please add a Youtube link before processing."

    # process_youtube_link logs the error and returns None on failure.
    document = process_youtube_link(link=youtube_link, char_length=char_length)
    if not document:
        return "Unable to read the Youtube link. Check the link and that the video has a transcript."

    texts = process_documents(documents=document)
    if not texts:
        # An empty transcript yields no chunks, and an index cannot be built from nothing.
        return "No transcript text was found for this video."

    embedding_model = SentenceTransformerEmbeddings(
        model_name='thenlper/gte-base', model_kwargs={"device": DEVICE}
    )
    new_vector_db = FAISS.from_documents(documents=texts, embedding=embedding_model)

    if llm == 'HuggingFace':
        chat = chatops.get_hugging_face_model(
            model_id="tiiuae/falcon-7b-instruct",
            API_key=API_key,
            temperature=temperature,
            max_tokens=max_tokens,
        )
    else:
        chat = chatops.get_openai_chat_model(API_key=API_key)

    new_qa = RetrievalQA.from_chain_type(
        llm=chat,
        chain_type='stuff',
        retriever=new_vector_db.as_retriever(),
        chain_type_kwargs={"prompt": create_prompt()},
        return_source_documents=True,
    )

    # Publish both globals together, only once everything was built, so a
    # failure above never leaves an index paired with a stale chain.
    vector_db = new_vector_db
    qa = new_qa
    return "Youtube link Processing completed ..."
def infer(question, history):
    """Answer ``question`` with the retrieval QA chain built by ``youtube_chat``.

    Args:
        question: The user's question.
        history: Chat history; accepted for interface compatibility but not
            used when answering.

    Returns:
        The chain's answer text, or a hint to process a link first when no
        chain has been built yet.
    """
    # `qa` only exists as a module global after youtube_chat has succeeded.
    chain = globals().get("qa")
    if chain is None:
        return "Please process a Youtube link before asking questions."

    result = chain({"query": question})
    return result["result"]
def bot(history):
    """Stream the answer to the latest question one character at a time.

    Args:
        history: Chat history whose last entry holds the pending question in
            slot 0. Slot 1 of that entry is overwritten in place, so the
            entry must be mutable (presumably a list as delivered by the
            chatbot component; verify against caller).

    Yields:
        The same ``history`` object after each appended character.
    """
    answer = infer(history[-1][0], history)
    latest_turn = history[-1]
    latest_turn[1] = ""
    for char in answer:
        latest_turn[1] = latest_turn[1] + char
        # Short pause between characters gives a typing effect in the UI.
        time.sleep(0.05)
        yield history
def add_text(history, text):
    """Append a new user message, with no reply yet, to the chat history.

    Args:
        history: Current chat history (a list); it is not modified.
        text: The user's message.

    Returns:
        A tuple of the extended history and an empty string; the empty
        string is sent back to the question textbox to clear it.
    """
    pending_turn = (text, None)
    return history + [pending_turn], ""
# Custom CSS for the page: centers the header column (#col-container).
css = """
#col-container {max-width: 2048px; margin-left: auto; margin-right: auto;}
"""

# HTML header rendered at the top of the page.
title = """
<div style="text-align: center;max-width: 2048px;">
<h1>Chat with Youtube Videos </h1>
<p style="text-align: center;">Upload a youtube link of any video-lecture/song/Research/Conference & ask Questions to chatbot with the tool.
<i> Tools uses State of the Art Models from HuggingFace/OpenAI so, make sure to add your key.</i>
</p>
</div>
"""
# Gradio UI definition.
# NOTE(review): the nesting of the layout containers below was reconstructed;
# confirm it matches the intended layout.
with gr.Blocks(css=css) as demo:
    # Page header.
    with gr.Row():
        with gr.Column(elem_id="col-container"):
            gr.HTML(title)

    # Model provider and API key.
    with gr.Column():
        with gr.Row():
            # Default to 'HuggingFace' so an untouched dropdown does not send
            # None, which youtube_chat would route to the OpenAI branch.
            LLM_option = gr.Dropdown(
                ['HuggingFace', 'OpenAI'],
                value='HuggingFace',
                label='Select HuggingFace/OpenAI',
            )
            API_key = gr.Textbox(label="Add API key", type="password", autofocus=True)

    # Conversation area.
    with gr.Group():
        chatbot = gr.Chatbot(height=270)
    with gr.Row():
        question = gr.Textbox(label="Type your question !", lines=1)
    with gr.Row():
        submit_btn = gr.Button(value="Send message", variant="primary", scale=1)
        clean_chat_btn = gr.Button("Delete Chat")

    # Video link input and processing status.
    with gr.Column():
        with gr.Row():
            youtube_link = gr.Textbox(label="Add your you tube Link", text_align='left', autofocus=True)
        with gr.Row():
            load_youtube_bt = gr.Button("Process Youtube Link",)
            langchain_status = gr.Textbox(label="Status", placeholder="", interactive=False)

    # Generation and transcript-length settings.
    with gr.Column():
        with gr.Accordion(label='Advanced options', open=False):
            max_new_tokens = gr.Slider(
                label='Max new tokens',
                minimum=2048,
                maximum=MAX_NEW_TOKENS,
                step=1,
                value=DEFAULT_MAX_NEW_TOKENS,
            )
            temperature = gr.Slider(
                label='Temperature',
                minimum=0.1,
                maximum=4.0,
                step=0.1,
                value=DEFAULT_TEMPERATURE,
            )
            char_length = gr.Slider(
                label='Max Character',
                minimum=DEFAULT_CHAR_LENGTH,
                maximum=5 * DEFAULT_CHAR_LENGTH,
                step=500,
                value=1500,
            )

    # Event wiring. Input order matches youtube_chat's positional parameters.
    load_youtube_bt.click(
        youtube_chat,
        inputs=[youtube_link, API_key, LLM_option, temperature, max_new_tokens, char_length],
        outputs=[langchain_status],
        queue=False,
    )
    clean_chat_btn.click(clear_chat, [], chatbot)
    # Enter key and the send button do the same thing: record the question
    # (clearing the textbox), then stream the answer into the chatbot.
    question.submit(add_text, inputs=[chatbot, question], outputs=[chatbot, question]).then(bot, chatbot, chatbot)
    submit_btn.click(add_text, inputs=[chatbot, question], outputs=[chatbot, question]).then(bot, chatbot, chatbot)

demo.launch()