import os import pickle import chromadb import gradio as gr import logfire from custom_retriever import CustomRetriever from dotenv import load_dotenv from llama_index.agent.openai import OpenAIAgent from llama_index.core import VectorStoreIndex from llama_index.core.llms import MessageRole from llama_index.core.memory import ChatMemoryBuffer from llama_index.core.node_parser import SentenceSplitter from llama_index.core.retrievers import VectorIndexRetriever from llama_index.core.tools import RetrieverTool, ToolMetadata from llama_index.embeddings.openai import OpenAIEmbedding from llama_index.llms.openai import OpenAI from llama_index.vector_stores.chroma import ChromaVectorStore from tutor_prompts import system_message_openai_agent # from utils import init_mongo_db load_dotenv() logfire.configure() CONCURRENCY_COUNT = int(os.getenv("CONCURRENCY_COUNT", 64)) MONGODB_URI = os.getenv("MONGODB_URI") DB_PATH = os.getenv("DB_PATH", f"scripts/ai-tutor-vector-db") DB_COLLECTION = os.getenv("DB_NAME", "ai-tutor-vector-db") if not os.path.exists(DB_PATH): # Download the vector database from the Hugging Face Hub if it doesn't exist locally # https://huggingface.co/datasets/towardsai-buster/ai-tutor-db/tree/main logfire.warn( f"Vector database does not exist at {DB_PATH}, downloading from Hugging Face Hub" ) from huggingface_hub import snapshot_download snapshot_download( repo_id="towardsai-buster/ai-tutor-vector-db", local_dir=DB_PATH, repo_type="dataset", ) logfire.info(f"Downloaded vector database to {DB_PATH}") AVAILABLE_SOURCES_UI = [ "HF Transformers", "Towards AI Blog", "Wikipedia", "OpenAI Docs", "LangChain Docs", "LLama-Index Docs", "RAG Course", ] AVAILABLE_SOURCES = [ "HF_Transformers", "towards_ai_blog", "wikipedia", "openai_docs", "langchain_docs", "llama_index_docs", "rag_course", ] # # Initialize MongoDB # mongo_db = ( # init_mongo_db(uri=MONGODB_URI, db_name="towardsai-buster") # if MONGODB_URI # else logger.warning("No mongodb uri found, you will not be able to save data.") # ) db2 = chromadb.PersistentClient(path=DB_PATH) chroma_collection = db2.get_or_create_collection(DB_COLLECTION) vector_store = ChromaVectorStore(chroma_collection=chroma_collection) index = VectorStoreIndex.from_vector_store( vector_store=vector_store, embed_model=OpenAIEmbedding(model="text-embedding-3-large", mode="similarity"), transformations=[SentenceSplitter(chunk_size=800, chunk_overlap=400)], show_progress=True, use_async=True, ) vector_retriever = VectorIndexRetriever( index=index, similarity_top_k=10, use_async=True, embed_model=OpenAIEmbedding(model="text-embedding-3-large", mode="similarity"), ) with open("scripts/ai-tutor-vector-db/document_dict.pkl", "rb") as f: document_dict = pickle.load(f) custom_retriever = CustomRetriever(vector_retriever, document_dict) def format_sources(completion) -> str: if len(completion.sources) == 0: return "" # Mapping of source system names to user-friendly names display_source_to_ui = { src: ui for src, ui in zip(AVAILABLE_SOURCES, AVAILABLE_SOURCES_UI) } documents_answer_template: str = ( "📝 Here are the sources I used to answer your question:\n\n{documents}" ) document_template: str = "[🔗 {source}: {title}]({url}), relevance: {score:2.2f}" documents = "\n".join( [ document_template.format( title=src.metadata["title"], score=src.score, source=display_source_to_ui.get( src.metadata["source"], src.metadata["source"] ), url=src.metadata["url"], ) for src in completion.sources[0].raw_output ] ) return documents_answer_template.format(documents=documents) def add_sources(answer_str, completion): if completion is None: yield answer_str formatted_sources = format_sources(completion) if formatted_sources == "": yield answer_str if formatted_sources != "": answer_str += "\n\n" + formatted_sources yield answer_str def generate_completion( query, history, sources, model, memory, ): with logfire.span("Running query"): logfire.info(f"query: {query}") logfire.info(f"model: {model}") logfire.info(f"sources: {sources}") chat_list = memory.get() if len(chat_list) != 0: user_index = [ i for i, msg in enumerate(chat_list) if msg.role == MessageRole.USER ] if len(user_index) > len(history): user_index_to_remove = user_index[len(history)] chat_list = chat_list[:user_index_to_remove] memory.set(chat_list) logfire.info(f"chat_history: {len(memory.get())} {memory.get()}") logfire.info(f"gradio_history: {len(history)} {history}") # # # TODO: change source UI name to actual source name # filters = MetadataFilters( # filters=[ # # MetadataFilter(key="source", value="HF_Transformers"), # # MetadataFilter(key="source", value="towards_ai_blog"), # MetadataFilter( # key="source", operator=FilterOperator.EQ, value="HF_Transformers" # ), # ], # # condition=FilterCondition.OR, # ) # vector_retriever = VectorIndexRetriever( # # filters=filters, # index=index, # similarity_top_k=10, # use_async=True, # embed_model=OpenAIEmbedding(model="text-embedding-3-large", mode="similarity"), # ) # custom_retriever = CustomRetriever(vector_retriever, document_dict) llm = OpenAI(temperature=1, model=model, max_tokens=None) client = llm._get_client() logfire.instrument_openai(client) query_engine_tools = [ RetrieverTool( retriever=custom_retriever, metadata=ToolMetadata( name="AI_information", description="""Only use this tool if necessary. The 'AI_information' tool returns information about the artificial intelligence (AI) field. When using this tool, the input should be the user's question rewritten as a statement. e.g. When the user asks 'How can I quantize a model?', the input should be 'Model quantization'. The input can also be adapted to focus on specific aspects or further details of the current topic under discussion. This dynamic input approach allows for a tailored exploration of AI subjects, ensuring that responses are relevant and informative. Employ this tool to fetch nuanced information on topics such as model training, fine-tuning, and LLM augmentation, thereby facilitating a rich, context-aware dialogue. """, ), ) ] agent = OpenAIAgent.from_tools( llm=llm, memory=memory, tools=query_engine_tools, # type: ignore system_prompt=system_message_openai_agent, ) completion = agent.stream_chat(query) answer_str = "" for token in completion.response_gen: answer_str += token yield answer_str for answer_str in add_sources(answer_str, completion): yield answer_str def vote(data: gr.LikeData): if data.liked: print("You upvoted this response: " + data.value["value"]) else: print("You downvoted this response: " + data.value["value"]) accordion = gr.Accordion(label="Customize Sources (Click to expand)", open=False) sources = gr.CheckboxGroup( AVAILABLE_SOURCES_UI, label="Sources", value="HF Transformers", interactive=False # type: ignore ) model = gr.Dropdown( [ "gemini-1.5-pro", "gemini-1.5-flash", "gpt-4o-mini", "gpt-4o", ], label="Model", value="gpt-4o-mini", interactive=False, ) with gr.Blocks( fill_height=True, title="Towards AI 🤖", analytics_enabled=True, ) as demo: memory = gr.State(ChatMemoryBuffer.from_defaults(token_limit=120000)) chatbot = gr.Chatbot( scale=1, placeholder="Towards AI 🤖: A Question-Answering Bot for anything AI-related
", show_label=False, likeable=True, show_copy_button=True, ) chatbot.like(vote, None, None) gr.ChatInterface( fn=generate_completion, chatbot=chatbot, additional_inputs=[sources, model, memory], additional_inputs_accordion=accordion, ) if __name__ == "__main__": demo.queue(default_concurrency_limit=CONCURRENCY_COUNT) demo.launch(debug=False, share=False)