shamik
feat: updated the multiagent framework to include arxiv agent.
5ea5bac unverified
# import asyncio
from datetime import date
import nest_asyncio
from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from src.agent_hackathon.consts import PROJECT_ROOT_DIR
# from dotenv import find_dotenv, load_dotenv
from src.agent_hackathon.generate_arxiv_responses import ArxivResponseGenerator
from src.agent_hackathon.logger import get_logger
nest_asyncio.apply()
# _ = load_dotenv(dotenv_path=find_dotenv(raise_error_if_not_found=False), override=True)
logger = get_logger(log_name="multiagent", log_dir=PROJECT_ROOT_DIR / "logs")
class MultiAgentWorkflow:
"""Multi-agent workflow for retrieving research papers and related events."""
def __init__(self) -> None:
"""Initialize the workflow with LLM, tools, and generator."""
logger.info("Initializing MultiAgentWorkflow.")
self.llm = HuggingFaceInferenceAPI(
model="meta-llama/Llama-3.3-70B-Instruct",
provider="auto",
# provider="nebius",
temperature=0.1,
top_p=0.95,
# api_key=os.getenv(key="NEBIUS_API_KEY"),
# base_url="https://api.studio.nebius.com/v1/",
system_prompt="Don't just plan, but execute the plan until failure.",
)
self._generator = ArxivResponseGenerator(
vector_store_path=PROJECT_ROOT_DIR / "db/arxiv_docs.db"
)
self._arxiv_rag_tool = FunctionTool.from_defaults(
fn=self._arxiv_rag,
name="arxiv_rag",
description="Retrieves arxiv research papers.",
return_direct=False,
)
self._duckduckgo_search_tool = [
tool
for tool in DuckDuckGoSearchToolSpec().to_tool_list()
if tool.metadata.name == "duckduckgo_full_search"
]
self._arxiv_agent = ReActAgent(
name="arxiv_agent",
description="Retrieves information about arxiv research papers",
system_prompt="You are arxiv research paper agent, who retrieves information "
"about arxiv research papers.",
tools=[self._arxiv_rag_tool],
llm=self.llm,
)
self._websearch_agent = ReActAgent(
name="web_search",
description="Searches the web",
system_prompt="You are search engine who searches the web using duckduckgo tool",
tools=self._duckduckgo_search_tool,
llm=self.llm,
)
self._workflow = AgentWorkflow(
agents=[self._arxiv_agent, self._websearch_agent],
root_agent="arxiv_agent",
timeout=180,
)
# AgentWorkflow.from_tools_or_functions(
# tools_or_functions=self._duckduckgo_search_tool,
# llm=self.llm,
# system_prompt="You are an expert that "
# "searches for any corresponding events related to the "
# "user query "
# "using the duckduckgo_search_tool and returns the final results." \
# "Don't return the steps but execute the necessary tools that you have " \
# "access to and return the results.",
# timeout=180,
# )
logger.info("MultiAgentWorkflow initialized.")
def _arxiv_rag(self, query: str) -> str:
"""Retrieve research papers from arXiv based on the query.
Args:
query (str): The search query.
Returns:
str: Retrieved research papers as a string.
"""
return self._generator.retrieve_arxiv_papers(query=query)
def _clean_response(self, result: str) -> str:
"""Removes the think tags.
Args:
result (str): The result with the <think></think> content.
Returns:
str: The result without the <think></think> content.
"""
if result.find("</think>"):
result = result[result.find("</think>") + len("</think>") :]
return result
async def run(self, user_query: str) -> str:
"""Run the multi-agent workflow for a given user query.
Args:
user_query (str): The user's search query.
Returns:
str: The output string.
"""
logger.info("Running multi-agent workflow.")
try:
user_msg = (
f"First, give me arxiv research papers about: {user_query}."
f"Then search with web search agent for any events related to : {user_query}.\n"
f"The web search results should be relevant to the current year: {date.today().year}."
"Return all the content from all the agents."
)
results = await self._workflow.run(user_msg=user_msg)
logger.info("Workflow run completed successfully.")
return results
except Exception as err:
logger.error(f"Workflow run failed: {err}")
raise
# if __name__ == "__main__":
# USER_QUERY = "i want to learn more about nlp"
# workflow = MultiAgentWorkflow()
# logger.info("Starting workflow for user query.")
# try:
# result = asyncio.run(workflow.run(user_query=USER_QUERY))
# logger.info("Workflow finished. Output below:")
# print(result)
# except Exception as err:
# logger.error(f"Error during workflow execution: {err}")