# agent.py import os from dotenv import load_dotenv from langgraph.graph import START, StateGraph, MessagesState from langgraph.prebuilt import tools_condition from langgraph.prebuilt import ToolNode from langchain_google_genai import ChatGoogleGenerativeAI from langchain_groq import ChatGroq from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings from langchain_community.tools.tavily_search import TavilySearchResults from langchain_community.document_loaders import WikipediaLoader from langchain_community.document_loaders import ArxivLoader from langchain_community.vectorstores import SupabaseVectorStore from langchain_core.messages import SystemMessage, HumanMessage from langchain_core.tools import tool from langchain.tools.retriever import create_retriever_tool from supabase.client import Client, create_client from sentence_transformers import SentenceTransformer from langchain.embeddings.base import Embeddings from typing import List import numpy as np import pandas as pd import uuid import requests import json from langchain_core.documents import Document from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings load_dotenv() @tool def multiply(a: int, b: int) -> int: """Multiply two numbers. Args: a: first int b: second int """ return a * b @tool def add(a: int, b: int) -> int: """Add two numbers. Args: a: first int b: second int """ return a + b @tool def subtract(a: int, b: int) -> int: """Subtract two numbers. Args: a: first int b: second int """ return a - b @tool def divide(a: int, b: int) -> int: """Divide two numbers. Args: a: first int b: second int """ if b == 0: raise ValueError("Cannot divide by zero.") return a / b @tool def modulus(a: int, b: int) -> int: """Get the modulus of two numbers. Args: a: first int b: second int """ return a % b @tool def wiki_search(query: str) -> str: """Search Wikipedia for a query and return maximum 2 results. Args: query: The search query.""" search_docs = WikipediaLoader(query=query, load_max_docs=2).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"wiki_results": formatted_search_docs} @tool def web_search(query: str) -> str: """Search Tavily for a query and return maximum 3 results. Args: query: The search query.""" search_docs = TavilySearchResults(max_results=3).invoke(query=query) formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"web_results": formatted_search_docs} @tool def arvix_search(query: str) -> str: """Search Arxiv for a query and return maximum 3 result. Args: query: The search query.""" search_docs = ArxivLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ]) return {"arvix_results": formatted_search_docs} # load the system prompt from the file with open("system_prompt.txt", "r", encoding="utf-8") as f: system_prompt = f.read() # System message sys_msg = SystemMessage(content=system_prompt) # ------------------------------- # Step 2: Load the JSON file or tasks (Replace this part if you're loading tasks dynamically) # ------------------------------- # Here we assume the tasks are already fetched from a URL or file. # For now, using an example JSON array directly. Replace this with the actual loading logic. tasks = [ { "task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be", "question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of English Wikipedia.", "Level": "1", "file_name": "" }, { "task_id": "a1e91b78-d3d8-4675-bb8d-62741b4b68a6", "question": "In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?", "Level": "1", "file_name": "" } ] # ------------------------------- # Step 3: Create Documents from Each JSON Object # ------------------------------- docs = [] for task in tasks: # Debugging: Print the keys of each task to ensure 'question' exists print(f"Keys in task: {task.keys()}") # Ensure the required field 'question' exists if 'question' not in task: print(f"Skipping task with missing 'question' field: {task}") continue content = task.get('question', "").strip() if not content: print(f"Skipping task with empty 'question': {task}") continue # Add unique ID to each document task['id'] = str(uuid.uuid4()) # Create a document from the task data docs.append(Document(page_content=content, metadata=task)) # ------------------------------- # Step 4: Set up HuggingFace Embeddings and FAISS VectorStore # ------------------------------- # Initialize HuggingFace Embedding model embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") # Create FAISS VectorStore from documents vector_store = FAISS.from_documents(docs, embedding_model) # Save the FAISS index locally vector_store.save_local("faiss_index") #print("✅ FAISS index created and saved locally.") # ------------------------------- # Step 5: Create Retriever Tool (for use in LangChain) # ------------------------------- retriever = vector_store.as_retriever() # Create the retriever tool question_retriever_tool = create_retriever_tool( retriever=retriever, name="Question_Search", description="A tool to retrieve documents related to a user's question." ) tools = [ multiply, add, subtract, divide, modulus, wiki_search, web_search, arvix_search, ] # Build graph function def build_graph(provider: str = "google"): """Build the graph""" # Load environment variables from .env file if provider == "google": # Google Gemini llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) elif provider == "groq": # Groq https://console.groq.com/docs/models llm = ChatGroq(model="qwen-qwq-32b", temperature=0) # optional : qwen-qwq-32b gemma2-9b-it elif provider == "huggingface": # TODO: Add huggingface endpoint llm = ChatHuggingFace( llm=HuggingFaceEndpoint( url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", temperature=0, ), ) else: raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.") # Bind tools to LLM llm_with_tools = llm.bind_tools(tools) # Node def assistant(state: MessagesState): """Assistant node""" return {"messages": [llm_with_tools.invoke(state["messages"])]} def retriever(state: MessagesState): """Retriever node""" similar_question = vector_store.similarity_search(state["messages"][0].content) if not similar_question: example_msg = HumanMessage(content="No similar question found.") else: example_msg = HumanMessage( content=f"Here I provide a similar question and answer for reference:\n\n{similar_question[0].page_content}", ) return {"messages": [sys_msg] + state["messages"] + [example_msg]} builder = StateGraph(MessagesState) builder.add_node("retriever", retriever) builder.add_node("assistant", assistant) builder.add_node("tools", ToolNode(tools)) builder.add_edge(START, "retriever") builder.add_edge("retriever", "assistant") builder.add_conditional_edges( "assistant", tools_condition, ) builder.add_edge("tools", "assistant") # Compile graph return builder.compile()