Spaces:
Sleeping
Sleeping
| # Import necessary libraries | |
| import os # Interacting with the operating system (reading/writing files) | |
| import chromadb # High-performance vector database for storing/querying dense vectors | |
| from dotenv import load_dotenv # Loading environment variables from a .env file | |
| import json # Parsing and handling JSON data | |
| # LangChain imports | |
| from langchain_core.documents import Document # Document data structures | |
| from langchain_core.runnables import RunnablePassthrough # LangChain core library for running pipelines | |
| from langchain_core.output_parsers import StrOutputParser # String output parser | |
| from langchain.prompts import ChatPromptTemplate # Template for chat prompts | |
| from langchain.chains.query_constructor.base import AttributeInfo # Base classes for query construction | |
| from langchain.retrievers.self_query.base import SelfQueryRetriever # Base classes for self-querying retrievers | |
| from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker # Document compressors | |
| from langchain.retrievers import ContextualCompressionRetriever # Contextual compression retrievers | |
| # LangChain community & experimental imports | |
| from langchain_community.vectorstores import Chroma # Implementations of vector stores like Chroma | |
| from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader # Document loaders for PDFs | |
| from langchain_community.cross_encoders import HuggingFaceCrossEncoder # Cross-encoders from HuggingFace | |
| from langchain_experimental.text_splitter import SemanticChunker # Experimental text splitting methods | |
| from langchain.text_splitter import ( | |
| CharacterTextSplitter, # Splitting text by characters | |
| RecursiveCharacterTextSplitter # Recursive splitting of text by characters | |
| ) | |
| from langchain_core.tools import tool | |
| from langchain.agents import create_tool_calling_agent, AgentExecutor | |
| from langchain_core.prompts import ChatPromptTemplate | |
| # LangChain OpenAI imports | |
| from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI, ChatOpenAI # OpenAI embeddings and models | |
| from langchain.embeddings.openai import OpenAIEmbeddings # OpenAI embeddings for text vectors | |
| # LangGraph import | |
| from langgraph.graph import StateGraph, END, START # State graph for managing states in LangChain | |
| # Typing imports | |
| from typing import Dict, List, Tuple, Any, TypedDict # Python typing for function annotations | |
| # Other utilities | |
| import numpy as np # Numpy for numerical operations | |
| from groq import Groq | |
| from mem0 import MemoryClient | |
| import streamlit as st | |
| from datetime import datetime | |
| #====================================SETUP=====================================# | |
| # Load environment variables for Hugging Face Spaces deployment | |
| load_dotenv() | |
| # Fetch secrets from environment variables (for Hugging Face Spaces) | |
| api_key = os.getenv("OPENAI_API_KEY") | |
| endpoint = os.getenv("OPENAI_API_BASE") | |
| llama_api_key = os.getenv("GROQ_API_KEY") | |
| MEM0_api_key = os.getenv("MEM0_API_KEY") | |
| # Initialize the OpenAI embedding function for Chroma | |
| embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction( | |
| api_base=endpoint, # Complete the code to define the API base endpoint | |
| api_key=api_key, # Complete the code to define the API key | |
| model_name='text-embedding-3-small' # This is a fixed value and does not need modification | |
| ) | |
| # This initializes the OpenAI embedding function for the Chroma vectorstore, using the provided endpoint and API key. | |
| # Initialize the OpenAI Embeddings | |
| embedding_model = OpenAIEmbeddings( | |
| openai_api_base=endpoint, | |
| openai_api_key=api_key, | |
| model='text-embedding-3-small' | |
| ) | |
| # Initialize the Chat OpenAI model | |
| llm = ChatOpenAI( | |
| openai_api_base=endpoint, | |
| openai_api_key=api_key, | |
| model="gpt-4o-mini", | |
| streaming=False | |
| ) | |
| # This initializes the Chat OpenAI model with the provided endpoint, API key, deployment name, and a temperature setting of 0 (to control response variability). | |
| #================================Creating Langgraph agent======================# | |
| class AgentState(TypedDict): | |
| query: str # The current user query | |
| expanded_query: str # The expanded version of the user query | |
| context: List[Dict[str, Any]] # Retrieved documents (content and metadata) | |
| response: str # The generated response to the user query | |
| precision_score: float # The precision score of the response | |
| groundedness_score: float # The groundedness score of the response | |
| groundedness_loop_count: int # Counter for groundedness refinement loops | |
| precision_loop_count: int # Counter for precision refinement loops | |
| feedback: str | |
| query_feedback: str | |
| groundedness_check: bool | |
| loop_max_iter: int | |
| def expand_query(state): | |
| """ | |
| Expands the user query to improve retrieval of nutrition disorder-related information. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the user query. | |
| Returns: | |
| Dict: The updated state with the expanded query. | |
| """ | |
| print("---------Expanding Query---------") | |
| system_message = """ | |
| You are a query-expansion engine for a medical retrieval system. | |
| Your job: | |
| 1. Expand the user's query into 6–8 alternative questions that could retrieve the same medical information. | |
| Rules: | |
| - Do NOT answer the query. | |
| - Keep output in the same language as input. | |
| - Preserve key entities (e.g., vitamins, disorders, nutrients). | |
| - Each query must be ≤ 16 words. | |
| - Output strict JSON only. No explanation. No extra text. | |
| Schema: | |
| {{ | |
| "queries": ["...", "..."] | |
| }} | |
| """ | |
| expand_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Expand this query: {query} using the feedback: {query_feedback}") | |
| ]) | |
| chain = expand_prompt | llm | StrOutputParser() | |
| expanded_query = chain.invoke({"query": state['query'], "query_feedback": state["query_feedback"]}) | |
| print("expanded_query", expanded_query) | |
| state["expanded_query"] = expanded_query | |
| return state | |
| # Initialize the Chroma vector store for retrieving documents | |
| vector_store = Chroma( | |
| collection_name='nutritional_hypotheticals', # Complete the code to define the collection name | |
| persist_directory='./nutritional_db', # Complete the code to define the directory for persistence | |
| embedding_function=embedding_model # Complete the code to define the embedding function | |
| ) | |
| # Create a retriever from the vector store | |
| # this is the provided code but I want to use the structured retriever | |
| retriever = vector_store.as_retriever( | |
| search_type='similarity', # Complete the code to define the search type | |
| search_kwargs={'k': 6} # Complete the code to define the number of results to retrieve | |
| ) | |
| def retrieve_context(state): | |
| """ | |
| Retrieves context from the vector store using the expanded or original query. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the query and expanded query. | |
| Returns: | |
| Dict: The updated state with the retrieved context. | |
| """ | |
| print("---------retrieve_context---------") | |
| query = state.get('expanded_query') or state.get('query') # Complete the code to define the key for the expanded query | |
| #print("Query used for retrieval:", query) # Debugging: Print the query | |
| # Retrieve hypothetical questions from the vector store | |
| docs = retriever.invoke(query) | |
| print("Retrieved documents:", docs) # Debugging: Print the raw docs object | |
| # Extract both page_content and metadata from each document | |
| context= [ | |
| { | |
| "content": doc.metadata.get("original_content", ""), | |
| "metadata": doc.metadata | |
| } | |
| for doc in docs | |
| ] | |
| state['context'] = context # Complete the code to define the key for storing the context | |
| print("Extracted context with metadata:", context) # Debugging: Print the extracted context | |
| #print(f"Groundedness loop count: {state['groundedness_loop_count']}") | |
| return state | |
| def craft_response(state: Dict) -> Dict: | |
| """ | |
| Generates a response using the retrieved context, focusing on nutrition disorders. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the query and retrieved context. | |
| Returns: | |
| Dict: The updated state with the generated response. | |
| """ | |
| print("---------craft_response---------") | |
| system_message = ''' | |
| You are a medical assistant specializing in nutritional disorders. | |
| Your job is to generate concise, accurate answers strictly based on the provided context from a textbook or trusted source. | |
| Rules: | |
| - Only use information found in the context. | |
| - Do NOT hallucinate or assume facts. | |
| - If the answer is not clearly present, say "The answer is not available in the provided context." | |
| - If multiple pieces of evidence are relevant, synthesize them clearly. | |
| - Keep the language medically precise but accessible. | |
| - If available, cite the source and printed page (from metadata) like [source: file_name, printedPageNumber, citation]. | |
| Be grounded, reliable, and avoid unsupported statements. | |
| ''' | |
| response_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}") | |
| ]) | |
| context_docs = state.get("context", []) | |
| context_string = "\n\n".join(doc["metadata"].get("original_content", "") for doc in context_docs) | |
| feedback_text = state.get("feedback", "None") | |
| chain = response_prompt | llm | |
| response = chain.invoke({ | |
| "query": state['query'], | |
| "context": context_string, | |
| "feedback": feedback_text # add feedback to the prompt | |
| }) | |
| state['response'] = response | |
| print("intermediate response: ", response) | |
| return state | |
| def score_groundedness(state: Dict) -> Dict: | |
| """ | |
| Checks whether the response is grounded in the retrieved context. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the response and context. | |
| Returns: | |
| Dict: The updated state with the groundedness score. | |
| """ | |
| print("---------check_groundedness---------") | |
| system_message = ''' | |
| You are an expert evaluator. Your task is to assess how well a response is grounded in the provided context. | |
| Rules: | |
| - Only assign high scores if all factual claims in the response are explicitly supported by the context. | |
| - Penalize unsupported or hallucinated statements. | |
| - Score from 0.0 to 1.0: | |
| - 1.0 = fully supported | |
| - 0.7 = mostly supported with minor gaps | |
| - 0.5 = some support, some unsupported | |
| - 0.0 = completely ungrounded | |
| Return ONLY the number (no explanation). | |
| ''' | |
| groundedness_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:") | |
| ]) | |
| chain = groundedness_prompt | llm | StrOutputParser() | |
| groundedness_score = float(chain.invoke({ | |
| "context": "\n".join([doc["content"] for doc in state['context']]), | |
| "response": state["response"] # Complete the code to define the response | |
| })) | |
| print("groundedness_score: ", groundedness_score) | |
| state['groundedness_loop_count'] += 1 | |
| print("#########Groundedness Incremented###########") | |
| state['groundedness_score'] = groundedness_score | |
| return state | |
| def check_precision(state: Dict) -> Dict: | |
| """ | |
| Checks whether the response precisely addresses the user’s query. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the query and response. | |
| Returns: | |
| Dict: The updated state with the precision score. | |
| """ | |
| print("---------check_precision---------") | |
| system_message = ''' | |
| You are a task evaluator. Your job is to score how precisely a response answers the user's question. | |
| Guidelines: | |
| - Score from 0.0 to 1.0 | |
| - 1.0 = The response directly and completely answers the question with no fluff or unrelated info. | |
| - 0.7 = Mostly complete and relevant, with minor omissions or slight tangents. | |
| - 0.5 = Partially relevant, but some key parts of the question are missed or answered vaguely. | |
| - 0.0 = The response misses the intent of the question or is mostly unrelated. | |
| - Ignore spelling/grammar. Focus only on content alignment. | |
| Return ONLY the number. Do not explain your reasoning. | |
| ''' | |
| precision_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:") | |
| ]) | |
| chain = precision_prompt | llm | StrOutputParser() # Complete the code to define the chain of processing | |
| precision_score = float(chain.invoke({ | |
| "query": state['query'], | |
| "response":state['response'] # Complete the code to access the response from the state | |
| })) | |
| state['precision_score'] = precision_score | |
| print("precision_score:", precision_score) | |
| state['precision_loop_count'] +=1 | |
| print("#########Precision Incremented###########") | |
| return state | |
| def refine_response(state: Dict) -> Dict: | |
| """ | |
| Suggests improvements for the generated response. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the query and response. | |
| Returns: | |
| Dict: The updated state with response refinement suggestions. | |
| """ | |
| print("---------refine_response---------") | |
| system_message = ''' | |
| You are a helpful reviewer evaluating answers in a medical question-answering system focused on nutritional disorders. | |
| Your task is to suggest improvements to the response, especially in terms of: | |
| - Factual accuracy (is every claim supported by the context?) | |
| - Completeness (does the response fully answer the question?) | |
| - Clarity (can anything be phrased more clearly?) | |
| - Conciseness (are there unnecessary parts?) | |
| Return clear, concise feedback that guides the next revision. | |
| ''' | |
| refine_response_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Query: {query}\nResponse: {response}\n\n" | |
| "What improvements can be made to enhance accuracy and completeness?") | |
| ]) | |
| chain = refine_response_prompt | llm| StrOutputParser() | |
| # Store response suggestions in a structured format | |
| feedback = f"Previous Response: {state['response']}\nSuggestions: {chain.invoke({'query': state['query'], 'response': state['response']})}" | |
| print("feedback: ", feedback) | |
| print(f"State: {state}") | |
| state['feedback'] = feedback | |
| return state | |
| def refine_query(state: Dict) -> Dict: | |
| """ | |
| Suggests improvements for the expanded query. | |
| Args: | |
| state (Dict): The current state of the workflow, containing the query and expanded query. | |
| Returns: | |
| Dict: The updated state with query refinement suggestions. | |
| """ | |
| print("---------refine_query---------") | |
| system_message = ''' | |
| You are a query refinement assistant helping improve search queries for a medical information retrieval system. | |
| The original query was expanded to generate similar variations for better search coverage. Your job is to: | |
| - Evaluate the quality of the expanded query | |
| - Identify issues like redundancy, ambiguity, missing key terms, or irrelevant focus | |
| - Suggest specific improvements to increase precision and retrieval effectiveness | |
| Be concise. Return clear bullet points or suggestions to help guide a better query reformulation. | |
| ''' | |
| refine_query_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_message), | |
| ("user", "Original Query: {query}\nExpanded Query: {expanded_query}\n\n" | |
| "What improvements can be made for a better search?") | |
| ]) | |
| chain = refine_query_prompt | llm | StrOutputParser() | |
| # Store refinement suggestions without modifying the original expanded query | |
| query_feedback = f"Previous Expanded Query: {state['expanded_query']}\nSuggestions: {chain.invoke({'query': state['query'], 'expanded_query': state['expanded_query']})}" | |
| print("query_feedback: ", query_feedback) | |
| print(f"Groundedness loop count: {state['groundedness_loop_count']}") | |
| state['query_feedback'] = query_feedback | |
| return state | |
| def should_continue_groundedness(state): | |
| """Decides if groundedness is sufficient or needs improvement.""" | |
| print("---------should_continue_groundedness---------") | |
| print("groundedness loop count: ", state['groundedness_loop_count']) | |
| if state['groundedness_score'] >= 0.85: # Complete the code to define the threshold for groundedness | |
| print("Moving to precision") | |
| return "check_precision" | |
| else: | |
| if state["groundedness_loop_count"] >= state['loop_max_iter']: | |
| return "max_iterations_reached" | |
| else: | |
| print(f"---------Groundedness Score Threshold Not met. Refining Response-----------") | |
| return "refine_response" | |
| def should_continue_precision(state: Dict) -> str: | |
| """Decides if precision is sufficient or needs improvement.""" | |
| print("---------should_continue_precision---------") | |
| print("precision loop count: ", state["precision_loop_count"]) | |
| if state["precision_score"] >= 0.85: # Threshold for precision | |
| return "pass" # Complete the workflow | |
| else: | |
| if state["precision_loop_count"] >= state["loop_max_iter"]: # Maximum allowed loops | |
| return "max_iterations_reached" | |
| else: | |
| print(f"---------Precision Score Threshold Not met. Refining Query-----------") # Debugging | |
| return "refine_query" # Refine the query | |
| def max_iterations_reached(state: Dict) -> Dict: | |
| """Handles the case when the maximum number of iterations is reached.""" | |
| print("---------max_iterations_reached---------") | |
| """Handles the case when the maximum number of iterations is reached.""" | |
| response = "I'm unable to refine the response further. Please provide more context or clarify your question." | |
| state['response'] = response | |
| return state | |
| from langgraph.graph import END, StateGraph, START | |
| def create_workflow() -> StateGraph: | |
| """Creates the updated workflow for the AI nutrition agent.""" | |
| workflow = StateGraph(AgentState) # Complete the code to define the initial state of the agent | |
| # Add processing nodes | |
| workflow.add_node("expand_query", expand_query ) # Step 1: Expand user query. Complete with the function to expand the query | |
| workflow.add_node("retrieve_context", retrieve_context ) # Step 2: Retrieve relevant documents. Complete with the function to retrieve context | |
| workflow.add_node("craft_response", craft_response ) # Step 3: Generate a response based on retrieved data. Complete with the function to craft a response | |
| workflow.add_node("score_groundedness", score_groundedness ) # Step 4: Evaluate response grounding. Complete with the function to score groundedness | |
| workflow.add_node("refine_response", refine_response ) # Step 5: Improve response if it's weakly grounded. Complete with the function to refine the response | |
| workflow.add_node("check_precision", check_precision ) # Step 6: Evaluate response precision. Complete with the function to check precision | |
| workflow.add_node("refine_query", refine_query ) # Step 7: Improve query if response lacks precision. Complete with the function to refine the query | |
| workflow.add_node("max_iterations_reached", max_iterations_reached ) # Step 8: Handle max iterations. Complete with the function to handle max iterations | |
| # Main flow edges | |
| workflow.add_edge(START, "expand_query") | |
| workflow.add_edge("expand_query", "retrieve_context") | |
| workflow.add_edge("retrieve_context", "craft_response") | |
| workflow.add_edge("craft_response", "score_groundedness") | |
| # Conditional edges based on groundedness check | |
| workflow.add_conditional_edges( | |
| "score_groundedness", | |
| should_continue_groundedness, # Use the conditional function | |
| { | |
| "check_precision": "check_precision", # If well-grounded, proceed to precision check. | |
| "refine_response": "refine_response", # If not, refine the response. | |
| "max_iterations_reached": "max_iterations_reached" # If max loops reached, exit. | |
| } | |
| ) | |
| workflow.add_edge("refine_response", "craft_response") # Refined responses are reprocessed. | |
| # Conditional edges based on precision check | |
| workflow.add_conditional_edges( | |
| "check_precision", | |
| should_continue_precision, # Use the conditional function | |
| { | |
| "pass": END, # If precise, complete the workflow. | |
| "refine_query": "refine_query", # If imprecise, refine the query. | |
| "max_iterations_reached": "max_iterations_reached" # If max loops reached, exit. | |
| } | |
| ) | |
| workflow.add_edge("refine_query", "expand_query") # Refined queries go through expansion again. | |
| workflow.add_edge("max_iterations_reached", END) | |
| return workflow | |
| #=========================== Defining the agentic rag tool ====================# | |
| WORKFLOW_APP = create_workflow().compile() | |
| def agentic_rag(query: str): | |
| """ | |
| Runs the RAG-based agent with conversation history for context-aware responses. | |
| Args: | |
| query (str): The current user query. | |
| Returns: | |
| Dict[str, Any]: The updated state with the generated response and conversation history. | |
| """ | |
| # Initialize state with necessary parameters | |
| inputs = { | |
| "query": query, # Current user query | |
| "expanded_query": "", # Complete the code to define the expanded version of the query | |
| "context": [], # Retrieved documents (initially empty) | |
| "response": "", # Complete the code to define the AI-generated response | |
| "precision_score": 0.0, # Complete the code to define the precision score of the response | |
| "groundedness_score": 0.0, # Complete the code to define the groundedness score of the response | |
| "groundedness_loop_count": 0, # Complete the code to define the counter for groundedness loops | |
| "precision_loop_count": 0, # Complete the code to define the counter for precision loops | |
| "feedback": "", # Complete the code to define the feedback | |
| "query_feedback": "", # Complete the code to define the query feedback | |
| "loop_max_iter": 3 # Complete the code to define the maximum number of iterations for loops | |
| } | |
| output = WORKFLOW_APP.invoke(inputs) | |
| return output | |
| #================================ Guardrails ===========================# | |
| llama_guard_client = Groq(api_key=llama_api_key) | |
| # Function to filter user input with Llama Guard | |
| def filter_input_with_llama_guard(user_input, model="meta-llama/Llama-Guard-4-12B"): | |
| """ | |
| Filters user input using Llama Guard to ensure it is safe. | |
| Parameters: | |
| - user_input: The input provided by the user. | |
| - model: The Llama Guard model to be used for filtering (default is "llama-guard-4-12b"). | |
| Returns: | |
| - The filtered and safe input. | |
| """ | |
| try: | |
| # Create a request to Llama Guard to filter the user input | |
| response = llama_guard_client.chat.completions.create( | |
| messages=[{"role": "user", "content": user_input}], | |
| model=model, | |
| ) | |
| # Return the filtered input | |
| return response.choices[0].message.content.strip() | |
| except Exception as e: | |
| print(f"Error with Llama Guard: {e}") | |
| return None | |
| #============================= Adding Memory to the agent using mem0 ===============================# | |
| class NutritionBot: | |
| def __init__(self): | |
| """ | |
| Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor. | |
| """ | |
| # Initialize a memory client to store and retrieve customer interactions | |
| self.memory = MemoryClient(api_key=MEM0_api_key) # Complete the code to define the memory client API key | |
| # Initialize the OpenAI client using the provided credentials | |
| self.client = ChatOpenAI( | |
| model_name="gpt-4o-mini", # Specify the model to use (e.g., GPT-4 optimized version) | |
| api_key=api_key, # API key for authentication | |
| openai_api_base=endpoint, | |
| temperature=0 # Controls randomness in responses; 0 ensures deterministic results | |
| ) | |
| # Define tools available to the chatbot, such as web search | |
| tools = [agentic_rag] | |
| # Define the system prompt to set the behavior of the chatbot | |
| system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience. | |
| Guidelines for Interaction: | |
| Maintain a polite, professional, and reassuring tone. | |
| Show genuine empathy for customer concerns and health challenges. | |
| Reference past interactions to provide personalized and consistent advice. | |
| Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations. | |
| Ensure consistent and accurate information across conversations. | |
| If any detail is unclear or missing, proactively ask for clarification. | |
| Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights. | |
| Keep track of ongoing issues and follow-ups to ensure continuity in support. | |
| Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences. | |
| """ | |
| # Build the prompt template for the agent | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", system_prompt), # System instructions | |
| ("human", "{input}"), # Placeholder for human input | |
| ("placeholder", "{agent_scratchpad}") # Placeholder for intermediate reasoning steps | |
| ]) | |
| # Create an agent capable of interacting with tools and executing tasks | |
| agent = create_tool_calling_agent(self.client, tools, prompt) | |
| # Wrap the agent in an executor to manage tool interactions and execution flow | |
| self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True) | |
| def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None): | |
| """ | |
| Store customer interaction in memory for future reference. | |
| Args: | |
| user_id (str): Unique identifier for the customer. | |
| message (str): Customer's query or message. | |
| response (str): Chatbot's response. | |
| metadata (Dict, optional): Additional metadata for the interaction. | |
| """ | |
| if metadata is None: | |
| metadata = {} | |
| # Add a timestamp to the metadata for tracking purposes | |
| metadata["timestamp"] = datetime.now().isoformat() | |
| # Format the conversation for storage | |
| conversation = [ | |
| {"role": "user", "content": message}, | |
| {"role": "assistant", "content": response} | |
| ] | |
| # Store the interaction in the memory client | |
| self.memory.add( | |
| conversation, | |
| user_id=user_id, | |
| output_format="v1.1", | |
| metadata=metadata | |
| ) | |
| def get_relevant_history(self, user_id: str, query: str) -> List[Dict]: | |
| """ | |
| Retrieve past interactions relevant to the current query. | |
| Args: | |
| user_id (str): Unique identifier for the customer. | |
| query (str): The customer's current query. | |
| Returns: | |
| List[Dict]: A list of relevant past interactions. | |
| """ | |
| return self.memory.search( | |
| query=query, # Search for interactions related to the query | |
| user_id=user_id, # Restrict search to the specific user | |
| limit=5 # Complete the code to define the limit for retrieved interactions | |
| ) | |
| def handle_customer_query(self, user_id: str, query: str) -> str: | |
| """ | |
| Process a customer's query and provide a response, taking into account past interactions. | |
| Args: | |
| user_id (str): Unique identifier for the customer. | |
| query (str): Customer's query. | |
| Returns: | |
| str: Chatbot's response. | |
| """ | |
| # Retrieve relevant past interactions for context | |
| relevant_history = self.get_relevant_history(user_id, query) | |
| # Build a context string from the relevant history | |
| context = "Previous relevant interactions:\n" | |
| for memory in relevant_history: | |
| context += f"Customer: {memory['memory']}\n" # Customer's past messages | |
| context += f"Support: {memory['memory']}\n" # Chatbot's past responses | |
| context += "---\n" | |
| # Print context for debugging purposes | |
| print("Context: ", context) | |
| # Prepare a prompt combining past context and the current query | |
| prompt = f""" | |
| Context: | |
| {context} | |
| Current customer query: {query} | |
| Provide a helpful response that takes into account any relevant past interactions. | |
| """ | |
| # Generate a response using the agent | |
| response = self.agent_executor.invoke({"input": prompt}) | |
| # Store the current interaction for future reference | |
| self.store_customer_interaction( | |
| user_id=user_id, | |
| message=query, | |
| response=response["output"], | |
| metadata={"type": "support_query"} | |
| ) | |
| # Return the chatbot's response | |
| return response['output'] | |
| #=====================User Interface using streamlit ===========================# | |
| def nutrition_disorder_streamlit(): | |
| """ | |
| A Streamlit-based UI for the Nutrition Disorder Specialist Agent. | |
| """ | |
| st.title("Nutrition Disorder Specialist") | |
| st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.") | |
| st.write("Type 'exit' to end the conversation.") | |
| # Initialize session state for chat history and user_id if they don't exist | |
| if 'chat_history' not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if 'user_id' not in st.session_state: | |
| st.session_state.user_id = None | |
| # Login form: Only if user is not logged in | |
| if st.session_state.user_id is None: | |
| with st.form("login_form", clear_on_submit=True): | |
| user_id = st.text_input("Please enter your name to begin:") | |
| submit_button = st.form_submit_button("Login") | |
| if submit_button and user_id: | |
| st.session_state.user_id = user_id | |
| st.session_state.chat_history.append({ | |
| "role": "assistant", | |
| "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?" | |
| }) | |
| st.session_state.login_submitted = True # Set flag to trigger rerun | |
| if st.session_state.get("login_submitted", False): | |
| st.session_state.pop("login_submitted") | |
| st.rerun() | |
| else: | |
| # Display chat history | |
| for message in st.session_state.chat_history: | |
| with st.chat_message(message["role"]): | |
| st.write(message["content"]) | |
| # Chat input with custom placeholder text | |
| user_query = st.chat_input("Type your question here (or 'exit' to end)...") # Blank #1: Fill in the chat input prompt (e.g., "Type your question here (or 'exit' to end)...") | |
| if user_query: | |
| if user_query.lower() == "exit": | |
| st.session_state.chat_history.append({"role": "user", "content": "exit"}) | |
| with st.chat_message("user"): | |
| st.write("exit") | |
| goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders." | |
| st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg}) | |
| with st.chat_message("assistant"): | |
| st.write(goodbye_msg) | |
| st.session_state.user_id = None | |
| st.rerun() | |
| return | |
| st.session_state.chat_history.append({"role": "user", "content": user_query}) | |
| with st.chat_message("user"): | |
| st.write(user_query) | |
| # Filter input using Llama Guard | |
| filtered_result = filter_input_with_llama_guard(user_query) # Blank #2: Fill in with the function name for filtering input (e.g., filter_input_with_llama_guard) | |
| filtered_result = filtered_result.replace("\n", " ") # Normalize the result | |
| # Check if input is safe based on allowed statuses | |
| if filtered_result in ["safe", "unsafe S7", "unsafe S6"]: # Blanks #3, #4, #5: Fill in with allowed safe statuses (e.g., "safe", "unsafe S7", "unsafe S6") | |
| try: | |
| if 'chatbot' not in st.session_state: | |
| st.session_state.chatbot = NutritionBot() # Blank #6: Fill in with the chatbot class initialization (e.g., NutritionBot) | |
| response = st.session_state.chatbot.handle_customer_query(st.session_state.user_id, user_query) | |
| # Blank #7: Fill in with the method to handle queries (e.g., handle_customer_query) | |
| st.write(response) | |
| st.session_state.chat_history.append({"role": "assistant", "content": response}) | |
| except Exception as e: | |
| error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}" | |
| st.write(error_msg) | |
| st.session_state.chat_history.append({"role": "assistant", "content": error_msg}) | |
| else: | |
| inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again." | |
| st.write(inappropriate_msg) | |
| st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg}) | |
| if __name__ == "__main__": | |
| nutrition_disorder_streamlit() | |