import os

from dotenv import load_dotenv
from typing import TypedDict, List, Dict, Any, Optional
from urllib.parse import urlparse

from langgraph.graph import StateGraph, START, END, MessagesState
from langchain.agents import create_tool_calling_agent, ConversationalAgent, AgentExecutor, initialize_agent, create_react_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_core.tools import tool, Tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langgraph.prebuilt import ToolNode, tools_condition

from langchain_community.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain_community.document_loaders import (
    ImageCaptionLoader,
    YoutubeLoader,
    UnstructuredExcelLoader,
    AssemblyAIAudioTranscriptLoader,
)
from langchain_community.utilities import (
    WikipediaAPIWrapper,
    DuckDuckGoSearchAPIWrapper,
    GoogleSerperAPIWrapper,
)
from langchain.text_splitter import CharacterTextSplitter

import requests
import time
import yt_dlp
import pandas as pd
from pathlib import Path
from bs4 import BeautifulSoup

load_dotenv()

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

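# Keys expected in the .env file loaded above (read later in this module via os.getenv):
# GEMINI_API_KEY, SERPER_API_KEY and AssemblyAI_API_KEY.

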
@tool
def duckduck_websearch(query: str) -> str:
    """Allows search through DuckDuckGo.

    Args:
        query: what you want to search
    """
    try:
        search = DuckDuckGoSearchAPIWrapper(max_results=5)
        results = search.run(query)
        if not results or results.strip() == "":
            return "No search results found."
        return results
    except Exception as e:
        print(str(e))
        print("Falling back to a plain HTTP request against DuckDuckGo.")
        base_url = "https://html.duckduckgo.com/html"
        params = {"q": query}
        response = requests.get(base_url, params=params, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")

        results = []
        for result in soup.find_all("div", {"class": "result"}):
            title = result.find("a", {"class": "result__a"})
            snippet = result.find("a", {"class": "result__snippet"})
            if title and snippet:
                results.append({
                    "title": title.get_text(),
                    "snippet": snippet.get_text(),
                    "url": title.get("href"),
                })

        formatted_results = []
        for r in results[:10]:
            formatted_results.append(f"[{r['title']}]({r['url']})\n{r['snippet']}\n")

        return "## Search Results\n\n" + "\n".join(formatted_results)


@tool
def serper_websearch(query: str) -> str:
    """Allows search through Serper.

    Args:
        query: what you want to search
    """
    search = GoogleSerperAPIWrapper(serper_api_key=os.getenv("SERPER_API_KEY"))
    results = search.run(query)
    return results


@tool
def visit_webpage(url: str) -> str:
    """Fetches raw HTML content of a web page.

    Args:
        url: the webpage url
    """
    try:
        response = requests.get(url, timeout=5)
        return response.text[:5000]
    except Exception as e:
        return f"[ERROR fetching {url}]: {str(e)}"


@tool
def wiki_search(query: str) -> str:
    """Wikipedia search tool.

    Args:
        query: what you want to look up on Wikipedia
    """
    api_wrapper = WikipediaAPIWrapper(top_k_results=1, doc_content_chars_max=100)
    wikipediatool = WikipediaQueryRun(api_wrapper=api_wrapper)
    return wikipediatool.run({"query": query})


@tool
def text_splitter(text: str) -> List[str]:
    """Splits text into chunks using LangChain's CharacterTextSplitter.

    Args:
        text: A string of text to split.
    """
    splitter = CharacterTextSplitter(chunk_size=450, chunk_overlap=10)
    return splitter.split_text(text)


@tool
def youtube_transcript(video_url: str) -> str:
    """Fetch the transcript of a YouTube video.

    Args:
        video_url: YouTube video url
    """
    try:
        loader = YoutubeLoader.from_youtube_url(video_url)
        docs = loader.load()
        return "\n".join(doc.page_content for doc in docs)
    except Exception as e:
        return f"Error fetching transcript: {str(e)}"


@tool
def read_file(task_id: str) -> str:
    """First download the file, then read its content.

    Args:
        task_id: the task_id of the attached file
    """
    file_url = f'{DEFAULT_API_URL}/files/{task_id}'
    r = requests.get(file_url, timeout=15, allow_redirects=True)
    with open('temp', "wb") as fp:
        fp.write(r.content)
    with open('temp') as f:
        return f.read()


@tool
def excel_read(task_id: str) -> str:
    """First download the excel file, then read its content.

    Args:
        task_id: the task_id of the attached file
    """
    try:
        file_url = f'{DEFAULT_API_URL}/files/{task_id}'
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        with open('temp.xlsx', "wb") as fp:
            fp.write(r.content)

        df = pd.read_excel('temp.xlsx')

        result = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        result += f"Columns: {', '.join(df.columns)}\n\n"

        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


@tool
def csv_read(task_id: str) -> str:
    """First download the csv file, then read its content.

    Args:
        task_id: the task_id of the attached file
    """
    try:
        file_url = f'{DEFAULT_API_URL}/files/{task_id}'
        r = requests.get(file_url, timeout=15, allow_redirects=True)
        with open('temp.csv', "wb") as fp:
            fp.write(r.content)

        df = pd.read_csv('temp.csv')

        result = (
            f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        result += f"Columns: {', '.join(df.columns)}\n\n"

        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"


@tool
def mp3_listen(task_id: str) -> str:
    """First download the mp3 file, then transcribe its content.

    Args:
        task_id: the task_id of the attached file
    """
    file_url = f'{DEFAULT_API_URL}/files/{task_id}'
    r = requests.get(file_url, timeout=15, allow_redirects=True)
    with open('temp.mp3', "wb") as fp:
        fp.write(r.content)
    loader = AssemblyAIAudioTranscriptLoader(file_path="temp.mp3", api_key=os.getenv("AssemblyAI_API_KEY"))
    docs = loader.load()
    contents = [doc.page_content for doc in docs]
    return "\n".join(contents)


@tool
def image_caption(dir: str) -> str:
    """Understand the content of the provided image.

    Args:
        dir: the image url link
    """
    loader = ImageCaptionLoader(images=[dir])
    metadata = loader.load()
    return metadata[0].page_content


from langchain_experimental.tools import PythonREPLTool


@tool
def run_python(code: str):
    """Run the given python code.

    Args:
        code: the python code
    """
    return PythonREPLTool().run(code)


@tool
def multiply(a: float, b: float) -> float:
    """Multiply two numbers.

    Args:
        a: first float
        b: second float
    """
    return a * b


@tool
def add(a: float, b: float) -> float:
    """Add two numbers.

    Args:
        a: first float
        b: second float
    """
    return a + b


@tool
def subtract(a: float, b: float) -> float:
    """Subtract two numbers.

    Args:
        a: first float
        b: second float
    """
    return a - b


@tool
def divide(a: float, b: float) -> float:
    """Divide two numbers.

    Args:
        a: first float
        b: second float
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b

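# Illustrative note (not part of the original flow): each @tool above is a LangChain
# structured tool and can be exercised on its own while debugging, e.g.
#   print(multiply.invoke({"a": 6.0, "b": 7.0}))         # -> 42.0
#   print(wiki_search.invoke({"query": "Alan Turing"}))
# The .invoke(dict) call assumes a recent langchain_core; older releases expose .run instead.

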
class BasicAgent:
    def __init__(self):
        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash-lite",
            temperature=0,
            max_tokens=1024,
            candidate_count=1,
            google_api_key=os.getenv("GEMINI_API_KEY"),
        )

        self.sys_prompt = """
        You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template:
        FINAL ANSWER: [YOUR FINAL ANSWER].
        YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
        If you are asked for a number, don't use commas to write your number and don't use units such as $ or percent signs unless specified otherwise.
        If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise.
        If you are asked for a comma separated list, apply the above rules depending on whether the element to put in the list is a number or a string.

        You have access to the following tools:
        - serper_websearch: web search the content of the query by passing the query as input with the Serper search engine
        - duckduck_websearch: web search the content of the query by passing the query as input with the DuckDuckGo search engine
        - visit_webpage: visit the given webpage url by passing the url as input
        - wiki_search: search Wikipedia for the query by passing the query as input, if the question asks for a wiki search
        - text_splitter: split text into chunks
        - youtube_transcript: fetch the transcript of a YouTube video by passing the video url as input, if the question asks for watching a YouTube video
        - read_file: read the content of the attached file by passing the TASK-ID as input
        - excel_read: read the content of the attached excel file by passing the TASK-ID as input
        - csv_read: read the content of the attached csv file by passing the TASK-ID as input
        - mp3_listen: listen to the content of the attached mp3 file by passing the TASK-ID as input
        - image_caption: understand the visual content of the attached image by passing the TASK-ID as input
        - run_python: run the python code

        If a Task ID is included in the question, remember to call the relevant read tools [i.e. read_file, excel_read, csv_read, mp3_listen, image_caption].
        Note: run_python is called when the question mentions the term "Python" or any math calculation.
        """

        self.tools = [
            Tool(
                name="duckduck_websearch",
                func=duckduck_websearch,
                description="Search the web for information with DuckDuckGo"
            ),
            Tool(
                name="serper_websearch",
                func=serper_websearch,
                description="Search the web for information with Serper"
            ),
            Tool(
                name="visit_webpage",
                func=visit_webpage,
                description="Directly visit the webpage"
            ),
            Tool(
                name="wiki_search",
                func=wiki_search,
                description="Search the information on Wikipedia"
            ),
            Tool(
                name="text_splitter",
                func=text_splitter,
                description="Split text into chunks"
            ),
            Tool(
                name="analyze_video",
                func=self._analyze_video,
                description="Analyze YouTube video content directly"
            ),
            Tool(
                name="youtube_transcript",
                func=youtube_transcript,
                description="Fetch the transcript of YouTube video"
            ),
            Tool(
                name="read_file",
                func=read_file,
                description="Read the file content"
            ),
            Tool(
                name="excel_read",
                func=excel_read,
                description="Read the content of Excel file"
            ),
            Tool(
                name="csv_read",
                func=csv_read,
                description="Read the content of CSV file"
            ),
            Tool(
                name='mp3_listen',
                func=mp3_listen,
                description="Listen to the MP3 file"
            ),
            Tool(
                name="image_caption",
                func=image_caption,
                description="Understand the image content"
            ),
            Tool(
                name="run_python",
                func=run_python,
                description="Run Python code"
            )
        ]

        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.agent = self.__setup_agent__()

        print("BasicAgent initialized.")

    def __call__(self, task: dict) -> str:
        task_id, question, file_name = task["task_id"], task["question"], task["file_name"]
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        # If a file is attached to the task, surface its task_id so the agent can call the read tools.
        if file_name:
            question = f"{question} with TASK-ID: {task_id}"

        fixed_answer = "This is a default answer."

        max_retries = 5
        base_sleep = 1
        for attempt in range(max_retries):
            try:
                fixed_answer = self.agent.run(question)
                print(f"Agent returning fixed answer: {fixed_answer}")
                time.sleep(60)  # crude pause to stay under the LLM rate limit between tasks
                return fixed_answer
            except Exception as e:
                sleep_time = base_sleep * (attempt + 1)
                if attempt < max_retries - 1:
                    print(str(e))
                    print(f"Attempt {attempt + 1} failed. Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                    continue
                return f"Error processing query after {max_retries} attempts: {str(e)}"
        return fixed_answer

    def _analyze_video(self, url: str) -> str:
        """Analyze video content using Gemini's video understanding capabilities."""
        try:
            parsed_url = urlparse(url)
            if not all([parsed_url.scheme, parsed_url.netloc]):
                return "Please provide a valid video URL with http:// or https:// prefix."

            if 'youtube.com' not in url and 'youtu.be' not in url:
                return "Only YouTube videos are supported at this time."

            try:
                ydl_opts = {
                    'quiet': True,
                    'no_warnings': True,
                    'extract_flat': True,
                    'no_playlist': True,
                    'youtube_include_dash_manifest': False
                }

                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    try:
                        info = ydl.extract_info(url, download=False, process=False)
                        if not info:
                            return "Could not extract video information."

                        title = info.get('title', 'Unknown')
                        description = info.get('description', '')

                        prompt = f"""Please analyze this YouTube video:
Title: {title}
URL: {url}
Description: {description}
Please provide a detailed analysis focusing on:
1. Main topic and key points from the title and description
2. Expected visual elements and scenes
3. Overall message or purpose
4. Target audience"""

                        messages = [HumanMessage(content=prompt)]
                        response = self.model.invoke(messages)
                        return response.content if hasattr(response, 'content') else str(response)

                    except Exception as e:
                        if 'Sign in to confirm' in str(e):
                            return "This video requires age verification or sign-in. Please provide a different video URL."
                        return f"Error accessing video: {str(e)}"

            except Exception as e:
                return f"Error extracting video info: {str(e)}"

        except Exception as e:
            return f"Error analyzing video: {str(e)}"

    def __setup_agent__(self) -> AgentExecutor:
        PREFIX = """
        You are a general AI assistant that can use various tools to answer questions. I will ask you a question. Report your thoughts, and finish your answer with the following template:
        FINAL ANSWER: [YOUR FINAL ANSWER].
        YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
        If you are asked for a number, don't use commas to write your number and don't use units such as $ or percent signs unless specified otherwise.
        If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise.
        If you are asked for a comma separated list, apply the above rules depending on whether the element to put in the list is a number or a string.

        NOTE:
        - If a Task ID is included in the question, remember to call the relevant read tools [i.e. read_file, excel_read, csv_read, mp3_listen, image_caption]
        - run_python is called when the question mentions the term "Python" or any math calculation.
        """
        FORMAT_INSTRUCTIONS = """
        To use a tool, use the following format:

        Thought: Do I need to use a tool? Yes
        Action: the action to take, should be one of [{tool_names}]
        Action Input: the input to the action
        Observation: the result of the action

        When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:

        Thought: Do I need to use a tool? No
        Final Answer: [your response here]

        Begin! Remember to ALWAYS include 'Thought:', 'Action:', 'Action Input:', and 'Final Answer:' in your responses.
        """
        SUFFIX = """
        Previous conversation history:
        {chat_history}

        New question: {input}
        {agent_scratchpad}
        """

        agent = ConversationalAgent.from_llm_and_tools(
            llm=self.model,
            tools=self.tools,
            prefix=PREFIX,
            format_instructions=FORMAT_INSTRUCTIONS,
            suffix=SUFFIX,
            input_variables=["input", "chat_history", "agent_scratchpad"],
        )
        return AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            max_iterations=30,
            verbose=True,
            handle_parsing_errors=True,
        )
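

# A minimal smoke-test sketch (an assumption, not part of the original script): it shows
# that BasicAgent.__call__ expects a task dict with "task_id", "question" and "file_name".
# The task_id and question below are made up for illustration only.
if __name__ == "__main__":
    agent = BasicAgent()
    sample_task = {
        "task_id": "demo-task-id",   # hypothetical id; real ids come from the scoring API
        "question": "What is 6 multiplied by 7?",
        "file_name": "",             # empty -> no TASK-ID is appended to the question
    }
    print(agent(sample_task))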