|
""" |
|
The primary function of this application is artificial intelligence-based question answering. In the dynamic landscape of AI, new |
|
technologies and trends constantly emerge, rendering conventional data insufficient to address real-time challenges. To tackle this |
|
issue, the application leverages advanced Retrieval Augmented Generation (RAG) and content scraping techniques. Despite having limited |
|
knowledge due to storage and cost constraints with OpenAI, the application possesses some understanding of data science stored in vector |
|
format. When users inquire about topics not covered in the custom data, the application utilizes SERPAPI and advanced RAG methods to |
|
incorporate unavailable context and resolve knowledge gaps effectively. The project initially utilizes pre-generated embeddings from a small pool of research papers. |
|
However, generating content from recent articles and research papers, which require new vector embeddings each time, presents challenges. |
|
The project aims to mitigate costs, minimize hallucinations, and enhance accuracy in its approach. |
|
|
|
""" |
|
|
|
import os |
|
import os.path |
|
import serpapi |
|
import requests |
|
import feedparser |
|
import streamlit as st |
|
from typing import List |
|
from docx import Document |
|
from bs4 import BeautifulSoup |
|
import huggingface_hub as hfh |
|
from urllib.parse import quote |
|
from llama_index.llms.openai import OpenAI |
|
from langchain_community.document_loaders import WebBaseLoader |
|
from llama_index.embeddings.openai import OpenAIEmbedding |
|
from langchain_community.document_loaders import PyPDFLoader |
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
from llama_index.postprocessor.cohere_rerank import CohereRerank |
|
from llama_index.core.query_engine import RetrieverQueryEngine |
|
from llama_index.core.storage.docstore import SimpleDocumentStore |
|
from llama_index.core.retrievers import AutoMergingRetriever |
|
from llama_index.core.node_parser import get_leaf_nodes, HierarchicalNodeParser, get_root_nodes, SentenceSplitter |
|
from llama_index.core.postprocessor import MetadataReplacementPostProcessor, SimilarityPostprocessor |
|
from llama_index.core import (VectorStoreIndex, SimpleDirectoryReader, ServiceContext, load_index_from_storage, |
|
StorageContext, Document, Settings, get_response_synthesizer, set_global_service_context) |
|
|
|
import warnings |
|
|
|
# Suppress every Python warning for the lifetime of the process.
warnings.filterwarnings("ignore")

# Make sure each session-scoped slot exists before any later code reads it.
for _state_key in ("vector_index", "cohere_api_key", "serp_api_key", "storage_context"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None

st.set_page_config(
    page_title="Quik Querium AI Genie",
    # The icon was stored as mis-decoded bytes ("π§"); restored to the genie emoji.
    page_icon="🧞",
    layout="wide",
    initial_sidebar_state="expanded",
)
|
|
|
|
|
def setting_api_key(openai_api_key, serp_api_key):
    """Register the credentials used by the app for the current session.

    The OpenAI key is exported as ``OPENAI_API_KEY`` and the SERP key is kept
    in ``st.session_state.serp_api_key``. The Hugging Face token and the
    Cohere key are read from the ``hf_token`` and ``cohere_api_key``
    environment variables.

    Args:
        openai_api_key: OpenAI API key entered by the user.
        serp_api_key: SerpAPI key entered by the user.

    Returns:
        None. Problems are reported with ``st.warning`` instead of raising.
    """
    try:
        os.environ['OPENAI_API_KEY'] = openai_api_key
        # Store the user-supplied key first so that a failure in the optional
        # steps below cannot prevent it from being saved.
        st.session_state.serp_api_key = serp_api_key

        hf_token = os.getenv("hf_token")
        st.session_state.hf_token = hf_token
        if hf_token:
            hfh.login(token=hf_token)
        else:
            # Calling login() without a token would ask for one interactively,
            # which cannot work inside a Streamlit script run.
            st.warning("Hugging Face token (hf_token) is not configured; skipping hub login.")

        cohere_api_key = os.getenv("cohere_api_key")
        if cohere_api_key:
            os.environ["COHERE_API_KEY"] = cohere_api_key
            # The query helpers pass st.session_state.cohere_api_key along, so
            # keep it in sync with the environment variable.
            st.session_state.cohere_api_key = cohere_api_key
        else:
            # os.environ only accepts strings; assigning None raises TypeError.
            st.warning("Cohere API key (cohere_api_key) is not configured; re-ranking will not work.")

    except Exception as e:
        # UI boundary: surface the problem to the user rather than crash the page.
        st.warning(e)
|
|
|
|
|
# Prompt handed to the OpenAI LLM below as its ``system_prompt``.
# NOTE(review): ``{query_str}`` is not substituted anywhere in this file —
# confirm that the library fills it in before the prompt is sent.
template = """<|system|>
you are a Question answering system based AI, Machine Learning , Deep Learning , Generative AI,
Data science, Data Analytics and Mathematics.
Mention Clearly Before response " RAG Output :\n".
Please check if the following pieces of context has any mention of the keywords provided
in the question.Generate response as much as you could with context you get.
if the following pieces of Context does not relate to Question, You must not answer on your own, you don't know the answer,
</s>
<|user|>
Question:{query_str}</s>
<|assistant|> """

# Global LLM used for answer synthesis.
Settings.llm = OpenAI(
    model="gpt-3.5-turbo-0125",
    temperature=0.1,
    model_kwargs={'trust_remote_code': True},
    max_tokens=512,
    system_prompt=template,
)

# Global embedding model used when indexing and querying.
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

# The same two models are also registered through the service-context API.
service_context = ServiceContext.from_defaults(
    embed_model=Settings.embed_model,
    llm=Settings.llm,
)
set_global_service_context(service_context)
|
|
|
|
|
def hierarchical_split(documents):
    """Parse *documents* into hierarchical nodes with chunk sizes 2048, 512 and 128.

    Returns the nodes produced by ``HierarchicalNodeParser`` for the given
    documents.
    """
    parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[2048, 512, 128])
    return parser.get_nodes_from_documents(documents)
|
|
|
|
|
def hierarchical_split_research_paper_article(documents):
    """Parse *documents* into hierarchical nodes with chunk sizes 512, 256 and 64.

    Same parser as ``hierarchical_split`` but with smaller chunk sizes.
    """
    parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[512, 256, 64])
    return parser.get_nodes_from_documents(documents)
|
|
|
|
|
def storage_ctx(nodes):
    """Return a ``StorageContext`` whose document store holds *nodes*."""
    node_store = SimpleDocumentStore()
    node_store.add_documents(nodes)
    return StorageContext.from_defaults(docstore=node_store)
|
|
|
|
|
def saving_vectors(vector_index):
    """Persist the storage context of *vector_index* to the ``vector_index/`` directory."""
    index_storage = vector_index.storage_context
    index_storage.persist(persist_dir="vector_index/")
|
|
|
|
|
def create_vector_index(nodes, storage_context):
    """Build and return a ``VectorStoreIndex`` over *nodes* using *storage_context*."""
    return VectorStoreIndex(nodes, storage_context=storage_context)
|
|
|
|
|
def search_arxiv(query, max_results=8):
    """Query the arXiv export API and list the matching papers.

    Args:
        query: Search text; it is URL-quoted before being placed in the URL.
        max_results: Value sent as the ``max_results`` query parameter.

    Returns:
        A list of ``{'Title': ..., 'URL': ...}`` dicts, one per feed entry.
    """
    query_url = (
        'http://export.arxiv.org/api/query?'
        f'search_query={quote(query)}&start=0&max_results={max_results}'
    )
    return [
        {'Title': entry.title, 'URL': entry.link}
        for entry in feedparser.parse(query_url).entries
    ]
|
|
|
|
|
def remove_empty_lines(lines):
    """Join the entries of *lines* that contain non-whitespace text.

    Blank and whitespace-only entries are dropped; the remaining entries are
    kept unchanged and joined with a single space.
    """
    kept = []
    for text in lines:
        if not text.strip():
            continue
        kept.append(text)
    return ' '.join(kept)
|
|
|
|
|
def _search_result_links(query, api_key):
    """Return the links of Google results (organic and related questions) for *query*."""
    params = {
        "engine": "google",
        "gl": "us",
        "hl": "en",
        "api_key": api_key,
        "q": query
    }
    search_results = serpapi.GoogleSearch(params).get_dict()

    links = []
    for result_type in ("organic_results", "related_questions"):
        for result in search_results.get(result_type, []):
            if "title" in result and "link" in result:
                links.append(result["link"])
    return links


def _fetch_soup(url, timeout=15):
    """Return the parsed HTML at *url*, or None if it cannot be fetched with status 200."""
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # An unreachable or slow site must not abort the whole lookup.
        return None
    if response.status_code != 200:
        return None
    return BeautifulSoup(response.content, "html.parser")


def _article_text(link):
    """Return the paragraph and heading text of the page at *link* ("" when unavailable)."""
    soup = _fetch_soup(link)
    if soup is None:
        return ""

    content_tags = soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
    document = "".join(tag.text + "\n" for tag in content_tags)

    if not document:
        # Nothing in the usual tags: fall back to the generic web loader.
        try:
            loaded = WebBaseLoader(link).load()
        except requests.RequestException:
            loaded = []
        document = loaded[0].page_content if loaded else ""

    return remove_empty_lines(document.split('\n'))


def _arxiv_pdf_url(page_url):
    """Return the PDF link found on the arXiv page *page_url*, or None."""
    soup = _fetch_soup(page_url)
    if soup is None:
        return None
    download_link = soup.find("a", class_="abs-button download-pdf")
    if not download_link:
        return None
    pdf_url = download_link.get('href')
    if not pdf_url:
        return None
    if not pdf_url.startswith("http"):
        pdf_url = "https://arxiv.org" + pdf_url
    return pdf_url


def _pdf_text(pdf_url):
    """Return the text of the PDF at *pdf_url* as one string ("" when it cannot be loaded)."""
    try:
        chunks = PyPDFLoader(pdf_url).load_and_split()
    except (requests.RequestException, ValueError):
        # NOTE(review): assumes the loader reports download problems with
        # these exception types — confirm against the loader version in use.
        return ""
    chunk_texts = (remove_empty_lines(chunk.page_content.split('\n')) for chunk in chunks)
    # Separate chunks with a space so words at chunk boundaries are not glued together.
    return ' '.join(text for text in chunk_texts if text)


def get_article_and_arxiv_content(query):
    """Collect fresh web and arXiv text for *query*.

    Google results are looked up through SerpAPI using the key stored in
    ``st.session_state.serp_api_key`` and scraped for paragraph and heading
    text. arXiv is searched with ``search_arxiv`` and the paper PDFs are
    converted to text.

    Args:
        query: The user's question.

    Returns:
        A list of strings: up to three non-empty article texts followed by at
        most one non-empty paper text (taken from the first two papers for
        which a PDF link was found).
    """
    max_articles = 3
    max_pdf_attempts = 2

    # Stop fetching as soon as enough articles are available; later links
    # would be discarded anyway.
    contents = []
    for link in _search_result_links(query, st.session_state.serp_api_key):
        article = _article_text(link)
        if article.strip():
            contents.append(article)
            if len(contents) == max_articles:
                break

    # Only the first non-empty paper among the first two PDF links is used.
    paper_content = []
    pdf_attempts = 0
    for paper in search_arxiv(query):
        pdf_url = _arxiv_pdf_url(paper['URL'])
        if not pdf_url:
            continue
        pdf_attempts += 1
        paper_text = _pdf_text(pdf_url)
        if paper_text.strip():
            paper_content.append(paper_text)
            break
        if pdf_attempts == max_pdf_attempts:
            break

    return contents + paper_content
|
|
|
|
|
|
|
|
|
|
|
def file_nodes_vector():
    """Load the persisted vector index, building it from ``research_papers/`` if absent.

    Returns:
        A ``(vector_index, storage_context)`` tuple.
    """
    PERSIST_DIR_vector = "vector_index"

    os.makedirs(PERSIST_DIR_vector, exist_ok=True)

    try:
        storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR_vector)
        vector_index = load_index_from_storage(storage_context)

    except FileNotFoundError:
        # Nothing persisted yet: index the bundled research papers.
        documents = SimpleDirectoryReader(input_dir="research_papers/").load_data()

        nodes = hierarchical_split(documents)
        # The document store keeps every node; only the leaves are indexed.
        storage_context = storage_ctx(nodes)
        vector_index = create_vector_index(get_leaf_nodes(nodes), storage_context)

        # Save the freshly built index so the next cold start can load it
        # instead of re-embedding every document.
        try:
            vector_index.storage_context.persist(persist_dir=PERSIST_DIR_vector)
        except OSError as e:
            # Best effort: the in-memory index is still usable this session.
            st.warning(e)

    return vector_index, storage_context
|
|
|
|
|
@st.cache_data
def response_generation(query, cohere_api_key, _vector_index, _storage_context, rank_top=7, similarity_cutoff_thr=0.80,
                        similarity_top_nodes=15):
    """Answer *query* from the given index.

    Args:
        query: The user's question.
        cohere_api_key: Key passed to ``CohereRerank``.
        _vector_index: Index that supplies the base retriever.
        _storage_context: Storage context given to ``AutoMergingRetriever``.
        rank_top: ``top_n`` for the Cohere re-ranker.
        similarity_cutoff_thr: Cutoff for ``SimilarityPostprocessor``.
        similarity_top_nodes: ``similarity_top_k`` for the base retriever.

    Returns:
        The response object returned by the query engine.
    """
    merging_retriever = AutoMergingRetriever(
        _vector_index.as_retriever(similarity_top_k=similarity_top_nodes),
        _storage_context,
        verbose=False,
    )

    # Applied in this order: metadata replacement, re-ranking, score cutoff.
    node_postprocessors = [
        MetadataReplacementPostProcessor(target_metadata_key="window"),
        CohereRerank(api_key=cohere_api_key, top_n=rank_top),
        SimilarityPostprocessor(similarity_cutoff=similarity_cutoff_thr),
    ]

    engine = RetrieverQueryEngine(
        retriever=merging_retriever,
        node_postprocessors=node_postprocessors,
        response_synthesizer=get_response_synthesizer(),
    )
    return engine.query(query)
|
|
|
|
|
def func_add_new_article_content(content_):
    """Build a standalone index from freshly fetched texts.

    Args:
        content_: Iterable of text strings; each becomes one ``Document``.

    Returns:
        ``(new_vector_index, new_nodes, new_storage_context, new_leaf_nodes)``.
    """
    new_nodes = hierarchical_split_research_paper_article(
        [Document(text=text) for text in content_]
    )
    new_leaf_nodes = get_leaf_nodes(new_nodes)
    new_storage_context = storage_ctx(new_nodes)

    return (
        create_vector_index(new_leaf_nodes, new_storage_context),
        new_nodes,
        new_storage_context,
        new_leaf_nodes,
    )
|
|
|
|
|
def updating_vector(new_leaf_nodes):
    """Insert *new_leaf_nodes* into the persisted index and refresh the session copy.

    Known limitation (original author's note): the update did not take effect
    on the Hugging Face Spaces hub, possibly because a storage context built
    with the hierarchical split cannot be updated. Updating a local vector
    index worked with other splitters such as the semantic splitter, the
    sentence splitter and the simple node parser. Anyone reusing this code is
    encouraged to improve it and share the solution.
    """
    index, context = file_nodes_vector()
    index.insert_nodes(new_leaf_nodes)
    saving_vectors(index)

    st.session_state.vector_index = index
    st.session_state.storage_context = context
|
|
|
|
|
@st.cache_data
def generate_response_article_paper(query):
    """Answer *query* from freshly fetched articles and arXiv papers.

    Returns:
        ``(response, new_nodes, new_leaf_nodes)`` where the nodes are those
        created from the fetched content.
    """
    fetched_texts = get_article_and_arxiv_content(query)
    new_vector_index, new_nodes, storage_context, new_leaf_nodes = func_add_new_article_content(fetched_texts)

    # Retrieval settings used for the fetched content.
    rank_top, similarity_cutoff, similarity_top_nodes = 10, 0.70, 20

    response = response_generation(
        query,
        st.session_state.cohere_api_key,
        new_vector_index,
        storage_context,
        rank_top,
        similarity_cutoff,
        similarity_top_nodes,
    )
    return response, new_nodes, new_leaf_nodes
|
|
|
|
|
# Strings that the query engine returns when it has nothing useful to say.
_EMPTY_RESPONSE_MARKERS = ("Empty Response", "RAG Output")


def _is_empty_response(response):
    """Return True when *response* is falsy or its text is one of the empty markers."""
    return not response or str(response) in _EMPTY_RESPONSE_MARKERS


def _render_answer(response, feedback_key):
    """Write *response* and show the thumbs-up / thumbs-down feedback buttons.

    ``feedback_key`` is used to build distinct widget keys for the buttons.

    NOTE(review): pressing a feedback button reruns the script with the
    "Generate response" button unset, so the acknowledgement below will only
    appear if the answer is kept in session state — confirm and follow up.
    """
    st.write(str(response))

    col1, col2 = st.columns([1, 10])
    # The labels were stored as identical mis-decoded bytes ("π"); two buttons
    # with the same label and no key collide, so restore the emoji and add keys.
    thumps_up_button = col1.button("👍", key=f"{feedback_key}_thumbs_up")
    thumps_down_button = col2.button("👎", key=f"{feedback_key}_thumbs_down")
    if thumps_up_button:
        st.write("Thank you for your positive feedback!")
    elif thumps_down_button:
        st.write("We're sorry , We will improve it.")


def main():
    """Render the page: API-key form on the left, question box and answer on the right.

    The stored index is queried first. If it yields no usable answer, content
    is fetched from articles and research papers and queried instead.
    """
    st.markdown(
        '<div style="text-align:center;"><h1 style="font-size: 30px;">Genie🧞 : RAG for AI Insights </h1></div>\n',
        unsafe_allow_html=True,
    )
    st.markdown(
        '<div style="text-align:center;"><h1 style="font-size: 17px;">"Interact with our real-time Q&A system,\n'
        'where you can ask questions on AI-related topics. If the system has the answer, it will respond immediately.\n'
        'Otherwise, it will fetch real-time information from the articles and research papers to provide you with the most up-to-date response.\n'
        'During the initial run, there may be a delay as the vector embeddings are loaded into the session state."</h1></div>',
        unsafe_allow_html=True,
    )

    if 'key_flag' not in st.session_state:
        st.session_state.key_flag = False

    col_left, col_right = st.columns([1, 2])
    with col_left:
        st.write("""<h1 style="font-size: 15px;">Enter your OpenAI API key </h1>""", unsafe_allow_html=True)
        openai_api_key = st.text_input(placeholder="OpenAI api key ", label=" ", type="password")

        st.write("""<h1 style="font-size: 15px;">Enter your SERP API key </h1>""", unsafe_allow_html=True)
        serp_api_key = st.text_input(placeholder="Serp api key ", label=" ", type="password")

        set_keys_button = st.button("Set Keys ", type="primary")

        try:
            if set_keys_button and openai_api_key and serp_api_key:
                setting_api_key(openai_api_key, serp_api_key)
                # NOTE(review): the trailing character looks like a
                # mis-encoded emoji — confirm the intended glyph.
                st.success("Successful π")
                st.session_state.key_flag = True
            elif set_keys_button:
                st.warning("Please set the necessary API keys !")
        except Exception as e:
            st.warning(e)

    with col_right:
        st.write("""<h1 style="font-size: 15px;">Enter your Question </h1>""", unsafe_allow_html=True)
        query = st.text_input(placeholder="Ex : Explain Batch normalization ", label=" ")
        generate_response_button = st.button("Generate response", type="primary")

        if not generate_response_button:
            return

        has_query = bool(str(query))
        keys_ready = st.session_state.key_flag

        if not keys_ready and not has_query:
            st.warning("Please set the necessary API keys and Enter the query")
            return
        if not keys_ready:
            st.warning("Please set the necessary API keys")
            return
        if not has_query:
            st.warning("Please Enter the query !")
            return

        try:
            with st.spinner("Generating Response..."):
                vector_index = st.session_state.get("vector_index")
                storage_context = st.session_state.get("storage_context")
                if vector_index is None or storage_context is None:
                    # First query of the session: load (or build) the index once.
                    vector_index, storage_context = file_nodes_vector()
                    st.session_state.vector_index = vector_index
                    st.session_state.storage_context = storage_context

                response = response_generation(query, st.session_state.cohere_api_key, vector_index,
                                               storage_context)

            if not _is_empty_response(response):
                _render_answer(response, "index")
                response_generation.clear()
                return

            # The stored index had no answer: fall back to live content.
            with st.spinner("Getting Information from Articles and Research Papers, It will take some time..."):
                paper_response, _new_nodes, _new_leaf_nodes = generate_response_article_paper(query)

            # Test for the empty markers first: a response object can be
            # truthy even when its text is just "Empty Response".
            if _is_empty_response(paper_response):
                st.write("RAG Couldn't get the results, it will be improved ")
            else:
                _render_answer(paper_response, "papers")
                generate_response_article_paper.clear()

        except Exception as e:
            # UI boundary: show the failure to the user instead of a traceback.
            st.warning(e)


if __name__ == "__main__":
    main()
|
|