|
import math |
|
from groq import Groq |
|
|
|
import smolagents |
|
|
|
|
|
import os |
|
import re |
|
import requests |
|
import gradio as gr |
|
from langchain_community.chat_models import ChatHuggingFace |
|
from langchain_community.llms import HuggingFaceEndpoint |
|
from langchain_community.tools import DuckDuckGoSearchRun |
|
from langchain_community.utilities import WikipediaAPIWrapper |
|
from langchain_community.vectorstores import Chroma |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from langchain.agents import Tool, AgentExecutor, initialize_agent |
|
from langchain.agents import AgentType |
|
from langchain_core.prompts import ChatPromptTemplate |
|
from langchain_core.messages import SystemMessage |
|
from langchain.memory import ConversationBufferWindowMemory |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
import pytesseract |
|
import cv2 |
|
import pandas as pd |
|
from langchain.tools import tool |
|
from huggingface_hub import InferenceClient |
|
|
|
import json |
|
import inspect |
|
from typing import List, Dict, Optional, Callable, Any |
|
import cv2 |
|
import pytesseract |
|
import pandas as pd |
|
from langchain_community.utilities import WikipediaAPIWrapper, DuckDuckGoSearchAPIWrapper |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from langchain_community.vectorstores import Chroma |
|
|
|
|
|
import os |
|
import gradio as gr |
|
import requests |
|
import inspect |
|
import pandas as pd |
|
|
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
client = Groq(api_key=os.getenv("GROQ_API_KEY")) |
|
''' |
|
# --- Basic Agent Definition --- |
|
# ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------ |
|
class BasicAgent: |
|
def __init__(self): |
|
print("BasicAgent initialized.") |
|
def __call__(self, question: str) -> str: |
|
print(f"Agent received question (first 50 chars): {question[:50]}...") |
|
fixed_answer = "This is a default answer." |
|
print(f"Agent returning fixed answer: {fixed_answer}") |
|
return fixed_answer |
|
''' |
|
|
|
|
|
|
|
''' |
|
class CodeAgent: |
|
def __init__( |
|
self, |
|
client, |
|
model: str = "llama-3.3-70b-versatile", |
|
system_prompt: str = "", |
|
tools: Optional[List[Dict]] = None, |
|
tool_functions: Optional[Dict[str, Callable]] = None |
|
): |
|
self.client = client |
|
self.model = model |
|
self.conversation_history = [ |
|
{ |
|
"role": "system", |
|
"content": f"{system_prompt}\n\nIMPORTANT: Always respond with complete, natural " |
|
"language answers. Never show raw function calls to the user." |
|
} |
|
] |
|
self.tools = tools or [] |
|
self.tool_functions = tool_functions or {} |
|
|
|
def add_tool(self, tool_definition: Dict, tool_function: Callable) -> None: |
|
self.tools.append(tool_definition) |
|
self.tool_functions[tool_definition["function"]["name"]] = tool_function |
|
|
|
def add_langchain_tools(self, tools: List[Callable]) -> None: |
|
for tool_func in tools: |
|
tool_def = self._convert_tool_to_definition(tool_func) |
|
self.add_tool(tool_def, tool_func) |
|
|
|
def _convert_tool_to_definition(self, tool_func: Callable) -> Dict: |
|
docstring = inspect.getdoc(tool_func) or "" |
|
description = docstring.split("\n")[0] if docstring else tool_func.__name__ |
|
|
|
sig = inspect.signature(tool_func) |
|
parameters = {"type": "object", "properties": {}, "required": []} |
|
|
|
for name, param in sig.parameters.items(): |
|
if name == "self": |
|
continue |
|
|
|
param_info = {"type": "string", "description": ""} |
|
for line in docstring.split("\n")[1:]: |
|
line = line.strip() |
|
if line.startswith(f"{name}:"): |
|
param_info["description"] = line[len(f"{name}:"):].strip() |
|
break |
|
|
|
parameters["properties"][name] = param_info |
|
parameters["required"].append(name) |
|
|
|
return { |
|
"type": "function", |
|
"function": { |
|
"name": tool_func.__name__, |
|
"description": description, |
|
"parameters": parameters |
|
} |
|
} |
|
|
|
def get_response(self, user_message: str) -> str: |
|
self.add_message("user", user_message) |
|
|
|
while True: |
|
response = self.client.chat.completions.create( |
|
messages=self.conversation_history, |
|
model=self.model, |
|
tools=self.tools if self.tools else None |
|
) |
|
|
|
message = response.choices[0].message |
|
|
|
if not hasattr(message, 'tool_calls') or not message.tool_calls: |
|
self.add_message("assistant", message.content) |
|
return message.content |
|
|
|
for tool_call in message.tool_calls: |
|
self._process_tool_call(tool_call) |
|
|
|
self._add_tool_call_message(message) |
|
|
|
def _process_tool_call(self, tool_call: Any) -> None: |
|
tool_name = tool_call.function.name |
|
try: |
|
arguments = json.loads(tool_call.function.arguments) |
|
tool_result = self._execute_tool(tool_name, arguments) |
|
|
|
# ✅ Correct format for OpenAI/Groq API |
|
self.conversation_history.append({ |
|
"role": "tool", |
|
"tool_call_id": tool_call.id, |
|
"content": str(tool_result) |
|
}) |
|
|
|
except Exception as e: |
|
self.conversation_history.append({ |
|
"role": "tool", |
|
"tool_call_id": tool_call.id, |
|
"content": f"Error: {str(e)}" |
|
}) |
|
|
|
def _add_tool_call_message(self, message: Any) -> None: |
|
self.conversation_history.append({ |
|
"role": "assistant", |
|
"content": None, |
|
"tool_calls": [{ |
|
"id": tc.id, |
|
"function": { |
|
"name": tc.function.name, |
|
"arguments": tc.function.arguments |
|
}, |
|
"type": "function" |
|
} for tc in message.tool_calls] |
|
}) |
|
|
|
def _execute_tool(self, tool_name: str, arguments: Dict) -> Any: |
|
if tool_name not in self.tool_functions: |
|
raise ValueError(f"Tool not found: {tool_name}") |
|
return self.tool_functions[tool_name](**arguments) |
|
|
|
def add_message(self, role: str, content: str) -> None: |
|
self.conversation_history.append({"role": role, "content": content}) |
|
|
|
def get_conversation_history(self) -> List[Dict]: |
|
return self.conversation_history |
|
|
|
def clear_history(self, keep_system_prompt: bool = True) -> None: |
|
if keep_system_prompt and self.conversation_history: |
|
system_msg = self.conversation_history[0] if self.conversation_history[0]["role"] == "system" else None |
|
self.conversation_history = [system_msg] if system_msg else [] |
|
else: |
|
self.conversation_history = [] |
|
|
|
''' |
|
|
|
|
|
|
|
|
|
|
|
class MyAgent: |
|
def __init__(self): |
|
self.client = Groq() |
|
self.model = "llama3-70b-8192" |
|
self.conversation_history = [] |
|
self.tools = self._define_tools() |
|
self._add_system_prompt() |
|
|
|
def _add_system_prompt(self): |
|
self.conversation_history.append({ |
|
"role": "system", |
|
"content": ( |
|
"You are a helpful assistant tasked with answering questions using a set of tools when needed. " |
|
"Always reason step-by-step internally, but only return the final answer to the user without any explanation. " |
|
"If a tool is needed, respond in JSON format like:\n" |
|
"{\n" |
|
" \"tool\": \"tool_name\",\n" |
|
" \"args\": {\"arg1\": \"value1\", ...}\n" |
|
"}\n" |
|
"Once the tool returns a result, respond with the final answer only, following these rules:\n" |
|
"- Use a number, a short string, or a comma-separated list of numbers/strings.\n" |
|
"- Do not include commas in numbers (e.g., use 1000 not 1,000).\n" |
|
"- Do not include units like \"$\" or \"%\" unless explicitly asked.\n" |
|
"- For strings, avoid articles (e.g., no 'the', 'a') and avoid abbreviations (e.g., use 'New York City' not 'NYC').\n" |
|
"- Write digits plainly (e.g., '2025' not 'two thousand twenty-five').\n" |
|
"Do not prepend any labels to the final answer." |
|
) |
|
}) |
|
|
|
def _define_tools(self): |
|
def web_search(query: str) -> str: |
|
|
|
return f"[WEB RESULT for: {query}]" |
|
|
|
def calculator(expression: str) -> str: |
|
try: |
|
return str(eval(expression)) |
|
except Exception as e: |
|
return f"Error: {e}" |
|
|
|
return [ |
|
Tool.from_function( |
|
name="web_search", |
|
func=web_search, |
|
description="Search the web for current information." |
|
), |
|
Tool.from_function( |
|
name="calculator", |
|
func=calculator, |
|
description="Evaluate a math expression." |
|
), |
|
] |
|
|
|
def _execute_tool(self, tool_name, args): |
|
for tool in self.tools: |
|
if tool.name == tool_name: |
|
return tool.func(**args) |
|
return f"Tool '{tool_name}' not found." |
|
|
|
def add_message(self, role, content): |
|
self.conversation_history.append({"role": role, "content": content}) |
|
|
|
def get_response(self, user_message: str) -> str: |
|
self.add_message("user", user_message) |
|
|
|
response = self.client.chat.completions.create( |
|
model=self.model, |
|
messages=self.conversation_history |
|
) |
|
|
|
message = response.choices[0].message |
|
assistant_reply = message.content.strip() |
|
self.add_message("assistant", assistant_reply) |
|
|
|
|
|
if assistant_reply.startswith("{") and "tool" in assistant_reply: |
|
try: |
|
tool_call = json.loads(assistant_reply) |
|
tool_name = tool_call.get("tool") |
|
args = tool_call.get("args", {}) |
|
result = self._execute_tool(tool_name, args) |
|
self.add_message("tool", result) |
|
return result |
|
except Exception as e: |
|
error_msg = f"Error executing tool: {e}" |
|
self.add_message("tool", error_msg) |
|
return error_msg |
|
|
|
return assistant_reply |
|
''' |
|
# ===== Tool Implementations ===== |
|
def wikipedia_search(query: str) -> str: |
|
"""Search Wikipedia and return summary.""" |
|
try: |
|
return WikipediaAPIWrapper().run(query) |
|
except Exception as e: |
|
return f"Wikipedia error: {str(e)}" |
|
|
|
def web_search(query: str) -> str: |
|
"""Search the web using DuckDuckGo.""" |
|
try: |
|
return DuckDuckGoSearchAPIWrapper().run(query) |
|
except Exception as e: |
|
return f"Search error: {str(e)}" |
|
|
|
def youtube_transcript(url: str) -> str: |
|
"""Extract transcript from a YouTube video URL.""" |
|
try: |
|
video_id = url.split("v=")[-1].split("&")[0] |
|
transcript = YouTubeTranscriptApi.get_transcript(video_id) |
|
return "\n".join([x["text"] for x in transcript]) |
|
except Exception as e: |
|
return f"YouTube error: {str(e)}" |
|
|
|
def image_ocr(path: str) -> str: |
|
"""Extract text from an image file.""" |
|
try: |
|
img = cv2.imread(path) |
|
if img is None: |
|
return "Error: Could not read image file" |
|
return pytesseract.image_to_string(img) |
|
except Exception as e: |
|
return f"OCR error: {str(e)}" |
|
|
|
def read_excel(path: str) -> str: |
|
"""Read contents of an Excel (.xlsx) file.""" |
|
try: |
|
df = pd.read_excel(path) |
|
return df.head(100).to_string() # Limit output size |
|
except Exception as e: |
|
return f"Excel error: {str(e)}" |
|
|
|
def math_calc(expression: str) -> str: |
|
"""Evaluate a math expression safely.""" |
|
try: |
|
allowed_chars = set('0123456789+-*/.() ') |
|
if not all(c in allowed_chars for c in expression): |
|
return "Error: Invalid characters in expression" |
|
return str(eval(expression, {"__builtins__": None}, {})) |
|
except Exception as e: |
|
return f"Math error: {str(e)}" |
|
''' |
|
''' |
|
# === Example Usage === |
|
if __name__ == "__main__": |
|
from groq import Groq |
|
import os |
|
|
|
# Initialize client |
|
#client = Groq(api_key=userdata.get('GROQ_API_KEY')) |
|
client = Groq(api_key=os.getenv("GROQ_API_KEY")) |
|
# Create agent |
|
agent = CodeAgent(client=client,model="llama-3.3-70b-versatile",system_prompt="You are a helpful AI assistant with access to tools.") |
|
|
|
# Add tools |
|
tools = [wikipedia_search,web_search,youtube_transcript,math_calc] |
|
agent.add_langchain_tools(tools) |
|
|
|
# Example queries |
|
print(agent.get_response("What is machine learning?")) |
|
print(agent.get_response("Calculate 2 + 2 * 3")) |
|
#print(agent.get_response("Search for latest AI news")) |
|
''' |
|
|
|
def run_and_submit_all( profile: gr.OAuthProfile | None): |
|
""" |
|
Fetches all questions, runs the BasicAgent on them, submits all answers, |
|
and displays the results. |
|
""" |
|
|
|
space_id = os.getenv("SPACE_ID") |
|
|
|
if profile: |
|
username= f"{profile.username}" |
|
print(f"User logged in: {username}") |
|
else: |
|
print("User not logged in.") |
|
return "Please Login to Hugging Face with the button.", None |
|
|
|
api_url = DEFAULT_API_URL |
|
questions_url = f"{api_url}/questions" |
|
submit_url = f"{api_url}/submit" |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
agent = MyAgent() |
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
print(f"Error instantiating agent: {e}") |
|
return f"Error initializing agent: {e}", None |
|
|
|
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
|
print(agent_code) |
|
|
|
|
|
print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=15) |
|
response.raise_for_status() |
|
questions_data = response.json() |
|
if not questions_data: |
|
print("Fetched questions list is empty.") |
|
return "Fetched questions list is empty or invalid format.", None |
|
print(f"Fetched {len(questions_data)} questions.") |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching questions: {e}") |
|
return f"Error fetching questions: {e}", None |
|
except requests.exceptions.JSONDecodeError as e: |
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
print(f"Response text: {response.text[:500]}") |
|
return f"Error decoding server response for questions: {e}", None |
|
except Exception as e: |
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
return f"An unexpected error occurred fetching questions: {e}", None |
|
|
|
|
|
results_log = [] |
|
answers_payload = [] |
|
print(f"Running agent on {len(questions_data)} questions...") |
|
for item in questions_data: |
|
task_id = item.get("task_id") |
|
question_text = item.get("question") |
|
if not task_id or question_text is None: |
|
print(f"Skipping item with missing task_id or question: {item}") |
|
continue |
|
try: |
|
submitted_answer = agent.get_response(question_text) |
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
|
except Exception as e: |
|
print(f"Error running agent on task {task_id}: {e}") |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
if not answers_payload: |
|
print("Agent did not produce any answers to submit.") |
|
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." |
|
print(status_update) |
|
|
|
|
|
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") |
|
try: |
|
response = requests.post(submit_url, json=submission_data, timeout=60) |
|
response.raise_for_status() |
|
result_data = response.json() |
|
final_status = ( |
|
f"Submission Successful!\n" |
|
f"User: {result_data.get('username')}\n" |
|
f"Overall Score: {result_data.get('score', 'N/A')}% " |
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
|
f"Message: {result_data.get('message', 'No message received.')}" |
|
) |
|
print("Submission successful.") |
|
results_df = pd.DataFrame(results_log) |
|
return final_status, results_df |
|
except requests.exceptions.HTTPError as e: |
|
error_detail = f"Server responded with status {e.response.status_code}." |
|
try: |
|
error_json = e.response.json() |
|
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" |
|
except requests.exceptions.JSONDecodeError: |
|
error_detail += f" Response: {e.response.text[:500]}" |
|
status_message = f"Submission Failed: {error_detail}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.Timeout: |
|
status_message = "Submission Failed: The request timed out." |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.RequestException as e: |
|
status_message = f"Submission Failed: Network error - {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except Exception as e: |
|
status_message = f"An unexpected error occurred during submission: {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
|
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# Basic Agent Evaluation Runner") |
|
gr.Markdown( |
|
""" |
|
**Instructions:** |
|
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... |
|
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. |
|
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
|
--- |
|
**Disclaimers:** |
|
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions). |
|
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async. |
|
""" |
|
) |
|
|
|
gr.LoginButton() |
|
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
|
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
|
run_button.click( |
|
fn=run_and_submit_all, |
|
outputs=[status_output, results_table] |
|
) |
|
|
|
if __name__ == "__main__": |
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
if space_host_startup: |
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
else: |
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
if space_id_startup: |
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") |
|
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") |
|
else: |
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
print("Launching Gradio Interface for Basic Agent Evaluation...") |
|
demo.launch(debug=True, share=False) |
|
'''prompt = """ |
|
You are a helpful assistant tasked with answering questions using a set of tools through tool calls in JSON format. |
|
For each question, think step-by-step. If a tool is required, call it using a properly structured tool_call in JSON format including 'tool_call_id'. |
|
|
|
After using tools, return the final answer in the following format: |
|
- A single number, or |
|
- A single word or phrase, or |
|
- A comma-separated list of numbers and/or strings. |
|
|
|
Formatting guidelines for answers: |
|
- Do not include units such as $, %, kg, etc., unless explicitly asked. |
|
- Do not use commas in numbers (write 1000000, not 1,000,000). |
|
- Do not use abbreviations (e.g., use 'los angeles' instead of 'la'). |
|
- Do not use articles (e.g., use 'banana' instead of 'a banana'). |
|
- Numbers must be in digits. |
|
- Strings must be lowercase and space-separated if needed. |
|
|
|
Your output must directly start with the answer — do not prepend anything like 'Answer:', 'The answer is:', or similar. |
|
|
|
Example tool call structure (when using a tool): |
|
{ |
|
"tool_call_id": "example-id", |
|
"function": { |
|
"name": "web_search", |
|
"arguments": { |
|
"query": "current population of india" |
|
} |
|
} |
|
} |
|
"""''' |