| | |
| | import os |
| | import chromadb |
| | from dotenv import load_dotenv |
| | import json |
| |
|
| | |
| | from langchain_core.documents import Document |
| | from langchain_core.runnables import RunnablePassthrough |
| | from langchain_core.output_parsers import StrOutputParser |
| | from langchain.prompts import ChatPromptTemplate |
| | from langchain.chains.query_constructor.base import AttributeInfo |
| | from langchain.retrievers.self_query.base import SelfQueryRetriever |
| | from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
| | from langchain.retrievers import ContextualCompressionRetriever |
| |
|
| | |
| | from langchain_community.vectorstores import Chroma |
| | from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
| | from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
| | from langchain_experimental.text_splitter import SemanticChunker |
| | from langchain.text_splitter import ( |
| | CharacterTextSplitter, |
| | RecursiveCharacterTextSplitter |
| | ) |
| | from langchain_core.tools import tool |
| | from langchain.agents import create_tool_calling_agent, AgentExecutor |
| | from langchain_core.prompts import ChatPromptTemplate |
| |
|
| | |
| | from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI |
| | from langchain.embeddings.openai import OpenAIEmbeddings |
| | from langchain_openai import ChatOpenAI |
| |
|
| | |
| | from llama_parse import LlamaParse |
| | from llama_index.core import Settings, SimpleDirectoryReader |
| |
|
| | |
| | from langgraph.graph import StateGraph, END, START |
| |
|
| | |
| | from pydantic import BaseModel |
| |
|
| | |
| | from typing import Dict, List, Tuple, Any, TypedDict |
| |
|
| | |
| | import numpy as np |
| | from groq import Groq |
| | from mem0 import MemoryClient |
| | import streamlit as st |
| | from datetime import datetime |
| |
|
| | |
| | |
| | api_key = os.getenv("API_KEY") |
| | endpoint = os.getenv("OPENAI_API_BASE") |
| | llama_api_key = os.getenv('GROQ_API_KEY') |
| | MEM0_api_key = os.getenv('mem0') |
| |
|
| | |
| | embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction( |
| | api_base=endpoint, |
| | api_key=api_key, |
| | model_name='text-embedding-ada-002' |
| | ) |
| |
|
| | |
| |
|
| | |
| | embedding_model = OpenAIEmbeddings( |
| | openai_api_base=endpoint, |
| | openai_api_key=api_key, |
| | model='text-embedding-ada-002' |
| | ) |
| |
|
| |
|
| | |
| | llm = ChatOpenAI( |
| | openai_api_base=endpoint, |
| | openai_api_key=api_key, |
| | model="gpt-4o-mini", |
| | streaming=False |
| | ) |
| | |
| |
|
| | |
| | Settings.llm = llm |
| | Settings.embedding = embedding_model |
| |
|
| | |
| |
|
| | class AgentState(TypedDict): |
| | query: str |
| | expanded_query: str |
| | context: List[Dict[str, Any]] |
| | response: str |
| | precision_score: float |
| | groundedness_score: float |
| | groundedness_loop_count: int |
| | precision_loop_count: int |
| | feedback: str |
| | query_feedback: str |
| | groundedness_check: bool |
| | loop_max_iter: int |
| |
|
| | def expand_query(state): |
| | """ |
| | Expands the user query to improve retrieval of nutrition disorder-related information. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the user query. |
| | Returns: |
| | Dict: The updated state with the expanded query. |
| | """ |
| | print("---------Expanding Query---------") |
| | system_message = '''You are a medical assistant. Use the provided context to generate a clear, accurate, and grounded response to the user's query.''' |
| |
|
| |
|
| | expand_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Expand this query: {query} using the feedback: {query_feedback}") |
| |
|
| | ]) |
| |
|
| | chain = expand_prompt | llm | StrOutputParser() |
| | expanded_query = chain.invoke({"query": state['query'], "query_feedback":state["query_feedback"]}) |
| | print("expanded_query", expanded_query) |
| | state["expanded_query"] = expanded_query |
| | return state |
| |
|
| |
|
| | |
| | vector_store = Chroma( |
| | collection_name="nutritional_hypotheticals", |
| | persist_directory="./nutritional_db", |
| | embedding_function=embedding_model |
| |
|
| | ) |
| |
|
| | |
| | retriever = vector_store.as_retriever( |
| | search_type='similarity', |
| | search_kwargs={'k': 3} |
| | ) |
| |
|
| | def retrieve_context(state): |
| | """ |
| | Retrieves context from the vector store using the expanded or original query. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the query and expanded query. |
| | Returns: |
| | Dict: The updated state with the retrieved context. |
| | """ |
| | print("---------retrieve_context---------") |
| | query = state['expanded_query'] |
| | |
| |
|
| | |
| | docs = retriever.invoke(query) |
| | print("Retrieved documents:", docs) |
| |
|
| | |
| | context= [ |
| | { |
| | "content": doc.page_content, |
| | "metadata": doc.metadata |
| | } |
| | for doc in docs |
| | ] |
| | state['context'] = context |
| | print("Extracted context with metadata:", context) |
| | |
| | return state |
| |
|
| |
|
| |
|
| | def craft_response(state: Dict) -> Dict: |
| | """ |
| | Generates a response using the retrieved context, focusing on nutrition disorders. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the query and retrieved context. |
| | Returns: |
| | Dict: The updated state with the generated response. |
| | """ |
| | print("---------craft_response---------") |
| | system_message = '''You are a medical assistant. Use the provided context to generate a clear, accurate, and grounded response to the user's query.''' |
| |
|
| | response_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}") |
| | ]) |
| |
|
| | chain = response_prompt | llm |
| | response = chain.invoke({ |
| | "query": state['query'], |
| | "context": "\n".join([doc["content"] for doc in state['context']]), |
| | "feedback": state['feedback'] |
| | }) |
| | state['response'] = response |
| | print("intermediate response: ", response) |
| |
|
| | return state |
| |
|
| |
|
| |
|
| | def score_groundedness(state: Dict) -> Dict: |
| | """ |
| | Checks whether the response is grounded in the retrieved context. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the response and context. |
| | Returns: |
| | Dict: The updated state with the groundedness score. |
| | """ |
| | print("---------check_groundedness---------") |
| | system_message = '''Evaluate how well the response is grounded in the provided context. Return a score between 0 and 1.''' |
| |
|
| | groundedness_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:") |
| | ]) |
| |
|
| | chain = groundedness_prompt | llm | StrOutputParser() |
| | groundedness_score = float(chain.invoke({ |
| | "context": "\n".join([doc["content"] for doc in state['context']]), |
| | "response": state['response'] |
| | })) |
| | print("groundedness_score: ", groundedness_score) |
| | state['groundedness_loop_count'] += 1 |
| | print("#########Groundedness Incremented###########") |
| | state['groundedness_score'] = groundedness_score |
| |
|
| | return state |
| |
|
| |
|
| |
|
| | def check_precision(state: Dict) -> Dict: |
| | """ |
| | Checks whether the response precisely addresses the user’s query. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the query and response. |
| | Returns: |
| | Dict: The updated state with the precision score. |
| | """ |
| | print("---------check_precision---------") |
| | system_message = '''Evaluate how precisely the response addresses the user's query. Return a score between 0 and 1.''' |
| |
|
| | precision_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:") |
| | ]) |
| |
|
| | chain = precision_prompt| llm | StrOutputParser() |
| | precision_score = float(chain.invoke({ |
| | "query": state['query'], |
| | "response":state['response'] |
| | })) |
| | state['precision_score'] = precision_score |
| | print("precision_score:", precision_score) |
| | state['precision_loop_count'] +=1 |
| | print("#########Precision Incremented###########") |
| | return state |
| |
|
| |
|
| |
|
| | def refine_response(state: Dict) -> Dict: |
| | """ |
| | Suggests improvements for the generated response. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the query and response. |
| | Returns: |
| | Dict: The updated state with response refinement suggestions. |
| | """ |
| | print("---------refine_response---------") |
| |
|
| | system_message = '''You are a helpful assistant. Suggest improvements to the response to enhance accuracy and completeness.''' |
| |
|
| | refine_response_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Query: {query}\nResponse: {response}\n\n" |
| | "What improvements can be made to enhance accuracy and completeness?") |
| | ]) |
| |
|
| | chain = refine_response_prompt | llm| StrOutputParser() |
| |
|
| | |
| | feedback = f"Previous Response: {state['response']}\nSuggestions: {chain.invoke({'query': state['query'], 'response': state['response']})}" |
| | print("feedback: ", feedback) |
| | print(f"State: {state}") |
| | state['feedback'] = feedback |
| | return state |
| |
|
| |
|
| |
|
| | def refine_query(state: Dict) -> Dict: |
| | """ |
| | Suggests improvements for the expanded query. |
| | Args: |
| | state (Dict): The current state of the workflow, containing the query and expanded query. |
| | Returns: |
| | Dict: The updated state with query refinement suggestions. |
| | """ |
| | print("---------refine_query---------") |
| | system_message = '''You are a helpful assistant. Suggest improvements to the expanded query to improve search relevance.''' |
| |
|
| | refine_query_prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_message), |
| | ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n" |
| | "What improvements can be made for a better search?") |
| | ]) |
| |
|
| | chain = refine_query_prompt | llm | StrOutputParser() |
| |
|
| | |
| | query_feedback = f"Previous Expanded Query: {state['expanded_query']}\nSuggestions: {chain.invoke({'query': state['query'], 'expanded_query': state['expanded_query']})}" |
| | print("query_feedback: ", query_feedback) |
| | print(f"Groundedness loop count: {state['groundedness_loop_count']}") |
| | state['query_feedback'] = query_feedback |
| | return state |
| |
|
| |
|
| |
|
| | def should_continue_groundedness(state): |
| | """Decides if groundedness is sufficient or needs improvement.""" |
| | print("---------should_continue_groundedness---------") |
| | print("groundedness loop count: ", state['groundedness_loop_count']) |
| | if state['groundedness_score'] >= 0.8: |
| | print("Moving to precision") |
| | return "check_precision" |
| | else: |
| | if state["groundedness_loop_count"] > state['loop_max_iter']: |
| | return "max_iterations_reached" |
| | else: |
| | print(f"---------Groundedness Score Threshold Not met. Refining Response-----------") |
| | return "refine_response" |
| |
|
| |
|
| | def should_continue_precision(state: Dict) -> str: |
| | """Decides if precision is sufficient or needs improvement.""" |
| | print("---------should_continue_precision---------") |
| | print("precision loop count: ",state['precision_loop_count']) |
| | if state['precision_score'] >= 0.8: |
| | return "pass" |
| | else: |
| | if state['precision_loop_count'] > state['loop_max_iter']: |
| | return "max_iterations_reached" |
| | else: |
| | print(f"---------Precision Score Threshold Not met. Refining Query-----------") |
| | return "refine_query" |
| |
|
| |
|
| |
|
| |
|
| | def max_iterations_reached(state: Dict) -> Dict: |
| | """Handles the case when the maximum number of iterations is reached.""" |
| | print("---------max_iterations_reached---------") |
| | """Handles the case when the maximum number of iterations is reached.""" |
| | response = "I'm unable to refine the response further. Please provide more context or clarify your question." |
| | state['response'] = response |
| | return state |
| |
|
| |
|
| |
|
| | from langgraph.graph import END, StateGraph, START |
| |
|
| | def create_workflow() -> StateGraph: |
| | """Creates the updated workflow for the AI nutrition agent.""" |
| | workflow = StateGraph(AgentState) |
| |
|
| | |
| | workflow.add_node("expand_query", expand_query) |
| | workflow.add_node("retrieve_context", retrieve_context) |
| | workflow.add_node("craft_response", craft_response) |
| | workflow.add_node("score_groundedness", score_groundedness ) |
| | workflow.add_node("refine_response", refine_response) |
| | workflow.add_node("check_precision", check_precision) |
| | workflow.add_node("refine_query", refine_query) |
| | workflow.add_node("max_iterations_reached",max_iterations_reached) |
| |
|
| | |
| | workflow.add_edge(START, "expand_query") |
| | workflow.add_edge("expand_query", "retrieve_context") |
| | workflow.add_edge("retrieve_context", "craft_response") |
| | workflow.add_edge("craft_response", "score_groundedness") |
| |
|
| | |
| | workflow.add_conditional_edges( |
| | "score_groundedness", |
| | should_continue_groundedness, |
| | { |
| | "check_precision": "check_precision", |
| | "refine_response": "refine_response", |
| | "max_iterations_reached": "max_iterations_reached" |
| | } |
| | ) |
| |
|
| | workflow.add_edge("refine_response","craft_response") |
| |
|
| | |
| | workflow.add_conditional_edges( |
| | "check_precision", |
| | should_continue_precision, |
| | { |
| | "pass": END, |
| | "refine_query":"refine_query", |
| | "max_iterations_reached": "max_iterations_reached" |
| | } |
| | ) |
| |
|
| | workflow.add_edge("refine_query", "expand_query") |
| |
|
| | workflow.add_edge("max_iterations_reached", END) |
| |
|
| | return workflow |
| |
|
| |
|
| |
|
| |
|
| | |
| | WORKFLOW_APP = create_workflow().compile() |
| | @tool |
| | def agentic_rag(query: str): |
| | """ |
| | Runs the RAG-based agent with conversation history for context-aware responses. |
| | Args: |
| | query (str): The current user query. |
| | Returns: |
| | Dict[str, Any]: The updated state with the generated response and conversation history. |
| | """ |
| | |
| | inputs = { |
| | "query": query, |
| | "expanded_query": "", |
| | "context": [], |
| | "response": "", |
| | "precision_score": 0.0, |
| | "groundedness_score": 0.0, |
| | "groundedness_loop_count": 0, |
| | "precision_loop_count": 0, |
| | "feedback": "", |
| | "query_feedback": "", |
| | "loop_max_iter": 3 |
| | } |
| |
|
| | output = WORKFLOW_APP.invoke(inputs) |
| |
|
| | return output |
| |
|
| |
|
| | |
| | llama_guard_client = Groq(api_key=llama_api_key) |
| | |
| | def filter_input_with_llama_guard(user_input, model="llama-guard-3-8b"): |
| | """ |
| | Filters user input using Llama Guard to ensure it is safe. |
| | Parameters: |
| | - user_input: The input provided by the user. |
| | - model: The Llama Guard model to be used for filtering (default is "llama-guard-3-8b"). |
| | Returns: |
| | - The filtered and safe input. |
| | """ |
| | try: |
| | |
| | response = llama_guard_client.chat.completions.create( |
| | messages=[{"role": "user", "content": user_input}], |
| | model=model, |
| | ) |
| | |
| | return response.choices[0].message.content.strip() |
| | except Exception as e: |
| | print(f"Error with Llama Guard: {e}") |
| | return None |
| |
|
| |
|
| | |
| |
|
| | class NutritionBot: |
| | def __init__(self): |
| | """ |
| | Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor. |
| | """ |
| |
|
| | |
| | self.memory = MemoryClient(api_key=userdata.get("mem0")) |
| |
|
| | |
| | self.client = ChatOpenAI( |
| | model_name="gpt-4o-mini", |
| | api_key=config.get("API_KEY"), |
| | endpoint = config.get("OPENAI_API_BASE"), |
| | temperature=0 |
| | ) |
| |
|
| | |
| | tools = [agentic_rag] |
| |
|
| | |
| | system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience. |
| | Guidelines for Interaction: |
| | Maintain a polite, professional, and reassuring tone. |
| | Show genuine empathy for customer concerns and health challenges. |
| | Reference past interactions to provide personalized and consistent advice. |
| | Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations. |
| | Ensure consistent and accurate information across conversations. |
| | If any detail is unclear or missing, proactively ask for clarification. |
| | Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights. |
| | Keep track of ongoing issues and follow-ups to ensure continuity in support. |
| | Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences. |
| | """ |
| |
|
| | |
| | prompt = ChatPromptTemplate.from_messages([ |
| | ("system", system_prompt), |
| | ("human", "{input}"), |
| | ("placeholder", "{agent_scratchpad}") |
| | ]) |
| |
|
| | |
| | agent = create_tool_calling_agent(self.client, tools, prompt) |
| |
|
| | |
| | self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) |
| |
|
| |
|
| | def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None): |
| | """ |
| | Store customer interaction in memory for future reference. |
| | Args: |
| | user_id (str): Unique identifier for the customer. |
| | message (str): Customer's query or message. |
| | response (str): Chatbot's response. |
| | metadata (Dict, optional): Additional metadata for the interaction. |
| | """ |
| | if metadata is None: |
| | metadata = {} |
| |
|
| | |
| | metadata["timestamp"] = datetime.now().isoformat() |
| |
|
| | |
| | conversation = [ |
| | {"role": "user", "content": message}, |
| | {"role": "assistant", "content": response} |
| | ] |
| |
|
| | |
| | self.memory.add( |
| | conversation, |
| | user_id=user_id, |
| | output_format="v1.1", |
| | metadata=metadata |
| | ) |
| |
|
| |
|
| | def get_relevant_history(self, user_id: str, query: str) -> List[Dict]: |
| | """ |
| | Retrieve past interactions relevant to the current query. |
| | Args: |
| | user_id (str): Unique identifier for the customer. |
| | query (str): The customer's current query. |
| | Returns: |
| | List[Dict]: A list of relevant past interactions. |
| | """ |
| | return self.memory.search( |
| | query=query, |
| | user_id=user_id, |
| | limit=5 |
| | ) |
| |
|
| |
|
| | def handle_customer_query(self, user_id: str, query: str) -> str: |
| | """ |
| | Process a customer's query and provide a response, taking into account past interactions. |
| | Args: |
| | user_id (str): Unique identifier for the customer. |
| | query (str): Customer's query. |
| | Returns: |
| | str: Chatbot's response. |
| | """ |
| |
|
| | |
| | relevant_history = self.get_relevant_history(user_id, query) |
| |
|
| | |
| | context = "Previous relevant interactions:\n" |
| | for memory in relevant_history: |
| | context += f"Customer: {memory['memory']}\n" |
| | context += f"Support: {memory['memory']}\n" |
| | context += "---\n" |
| |
|
| | |
| | print("Context: ", context) |
| |
|
| | |
| | prompt = f""" |
| | Context: |
| | {context} |
| | Current customer query: {query} |
| | Provide a helpful response that takes into account any relevant past interactions. |
| | """ |
| |
|
| | |
| | response = self.agent_executor.invoke({"input": prompt}) |
| |
|
| | |
| | self.store_customer_interaction( |
| | user_id=user_id, |
| | message=query, |
| | response=response["output"], |
| | metadata={"type": "support_query"} |
| | ) |
| |
|
| | |
| | return response['output'] |
| |
|
| |
|
| | |
| | def nutrition_disorder_streamlit(): |
| | """ |
| | A Streamlit-based UI for the Nutrition Disorder Specialist Agent. |
| | """ |
| | st.title("Nutrition Disorder Specialist") |
| | st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.") |
| | st.write("Type 'exit' to end the conversation.") |
| |
|
| | |
| | if 'chat_history' not in st.session_state: |
| | st.session_state.chat_history = [] |
| | if 'user_id' not in st.session_state: |
| | st.session_state.user_id = None |
| |
|
| | |
| | if st.session_state.user_id is None: |
| | with st.form("login_form", clear_on_submit=True): |
| | user_id = st.text_input("Please enter your name to begin:") |
| | submit_button = st.form_submit_button("Login") |
| | if submit_button and user_id: |
| | st.session_state.user_id = user_id |
| | st.session_state.chat_history.append({ |
| | "role": "assistant", |
| | "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?" |
| | }) |
| | st.session_state.login_submitted = True |
| | if st.session_state.get("login_submitted", False): |
| | st.session_state.pop("login_submitted") |
| | st.rerun() |
| | else: |
| | |
| | for message in st.session_state.chat_history: |
| | with st.chat_message(message["role"]): |
| | st.write(message["content"]) |
| |
|
| | |
| | user_query = st.chat_input('Type your question here or exit to end the conversation') |
| | if user_query: |
| | if user_query.lower() == "exit": |
| | st.session_state.chat_history.append({"role": "user", "content": "exit"}) |
| | with st.chat_message("user"): |
| | st.write("exit") |
| | goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders." |
| | st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg}) |
| | with st.chat_message("assistant"): |
| | st.write(goodbye_msg) |
| | st.session_state.user_id = None |
| | st.rerun() |
| | return |
| |
|
| | st.session_state.chat_history.append({"role": "user", "content": user_query}) |
| | with st.chat_message("user"): |
| | st.write(user_query) |
| |
|
| | |
| | filtered_result = filter_input_with_llama_guard(user_query) |
| | print(filtered_result) |
| |
|
| | |
| | with st.chat_message("assistant"): |
| | if filtered_result in ["safe", "unsafe S7", "unsafe S6"]: |
| | try: |
| | |
| | if 'chatbot' not in st.session_state: |
| | st.session_state.chatbot = NutritionBot() |
| |
|
| | |
| | response = st.session_state.chatbot.handle_customer_query( |
| | st.session_state.user_id, |
| | user_query |
| | ) |
| |
|
| | st.write(response) |
| | st.session_state.chat_history.append({"role": "assistant", "content": response}) |
| | except Exception as e: |
| | error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}" |
| | st.write(error_msg) |
| | st.session_state.chat_history.append({"role": "assistant", "content": error_msg}) |
| | else: |
| | inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again." |
| | st.write(inappropriate_msg) |
| | st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg}) |
| |
|
| | if __name__ == "__main__": |
| | nutrition_disorder_streamlit() |
| |
|