import gradio as gr
import requests
import json
import torch
import pandas as pd
import inspect
import os
import gc
import transformers

print(f"PyTorch: {torch.__version__}")
print(f"Transformers: {transformers.__version__}")

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
try:
    from smolagents import CodeAgent, DuckDuckGoSearchTool, TransformersModel
    SMOLAGENTS_AVAILABLE = True
except ImportError:
    print("Warning: smolagents not available, using fallback implementation")
    SMOLAGENTS_AVAILABLE = False
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

CURRENT_PROMPT = """You are a helpful assistant that can use tools including web search and writing Python code to answer questions.
Question: {question}
To answer this question:
1. If you need current information or facts you're unsure about, use the search tool
2. If you need to transcribe audio, import smolagents and instantiate 'transcriber = smolagents.Transcriber()'
3. Write Python code to solve the problem
4. Make sure your final answer is clear and direct
Available tools:
- web_search(query: str) → str
- visit_webpage(url: str) → str
- transcriber(audio_file_path: str) → str
- python(code: str) → Any
- final_answer(answer: str) → None
Please provide a complete solution that ends with the correct final answer."""
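# Note: the {question} placeholder above is filled in via str.format() inside
# BasicAgent._run_smolagents() below, e.g.:
#   CURRENT_PROMPT.format(question="What is 2 + 2?")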
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
    """
    Basic agent using a smolagents CodeAgent with DuckDuckGoSearchTool.
    """
    def __init__(self):
        print("BasicAgent initialized.")
        # Check GPU availability
        print("🔍 GPU Check:")
        print(f"  - CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"  - CUDA device count: {torch.cuda.device_count()}")
            print(f"  - Current device: {torch.cuda.current_device()}")
            print(f"  - Device name: {torch.cuda.get_device_name()}")
            print(f"  - Device memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        else:
            print("  - No CUDA devices found, will use CPU")
        if SMOLAGENTS_AVAILABLE:
            try:
                # Initialize the model
                print("🤖 Initializing TransformersModel...")
                self.model = TransformersModel(
                    model_id="Qwen/Qwen2.5-Coder-14B",
                    torch_dtype=torch.bfloat16,
                    device_map="auto",
                )
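                # device_map="auto" lets accelerate place layers across the
                # available GPU(s) and spill to CPU if needed; bfloat16 halves
                # memory use versus float32 while keeping float32's exponent
                # range, which suits a 14B model on a single Space GPU.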
                if hasattr(self.model, 'tokenizer') and self.model.tokenizer is not None:
                    # Set left padding for better batching with causal models
                    self.model.tokenizer.padding_side = "left"
                    # Ensure a pad token is set
                    if self.model.tokenizer.pad_token is None:
                        self.model.tokenizer.pad_token = self.model.tokenizer.eos_token
                    # Set default padding behavior for FlashAttention
                    self.model.tokenizer.pad_to_multiple_of = 64
                    print("✅ Applied tokenizer padding fix for FlashAttention alignment")
                # If the model has a processor with a tokenizer, fix that too
                if hasattr(self.model, 'processor') and hasattr(self.model.processor, 'tokenizer'):
                    self.model.processor.tokenizer.padding_side = "left"
                    if self.model.processor.tokenizer.pad_token is None:
                        self.model.processor.tokenizer.pad_token = self.model.processor.tokenizer.eos_token
                    self.model.processor.tokenizer.pad_to_multiple_of = 64
                    print("✅ Applied processor tokenizer padding fix")
                # Verify where the model actually loaded
                if hasattr(self.model, 'device'):
                    print(f"✅ Model loaded on device: {self.model.device}")
                elif hasattr(self.model, 'model') and hasattr(self.model.model, 'device'):
                    print(f"✅ Model loaded on device: {self.model.model.device}")
                else:
                    print("✅ Model loaded (device info not directly accessible)")
                # Create the CodeAgent; tools=[] here because add_base_tools=True
                # below supplies the default toolbox (including DuckDuckGoSearchTool)
                self.agent = CodeAgent(
                    tools=[],
                    model=self.model,
                    max_steps=24,
                    additional_authorized_imports=[
                        'math', 'statistics', 're',     # Basic computation
                        'requests', 'json',             # Web requests and JSON
                        'pandas', 'numpy', 'openpyxl',  # Data analysis
                        'zipfile', 'os',                # File processing
                        'datetime', 'time',             # Date/time operations
                        'smolagents'
                    ],
                    add_base_tools=True,
                )
                self.tools_available = True
                print("✅ Smolagents CodeAgent initialized with base tools (incl. DuckDuckGoSearchTool)")
            except Exception as e:
                print(f"⚠️ Error initializing smolagents: {e}")
                import traceback
                traceback.print_exc()
                self.tools_available = False
        else:
            self.tools_available = False

        if not self.tools_available:
            print("⚠️ Using fallback implementation without smolagents")
    def _run_smolagents(self, question):
        """Run a question through the smolagents CodeAgent with enhanced prompting."""
        try:
            # Use the global CURRENT_PROMPT template
            formatted_question = CURRENT_PROMPT.format(question=question)
            print(f"📝 Processing question: {question}")
            # agent.tools is a dict mapping tool names to tool instances
            print(f"🔧 Available tools: {list(self.agent.tools.keys())}")
            # Run the agent (inference only, so gradients are not needed)
            with torch.no_grad():
                result = self.agent.run(formatted_question)
            print(f"Raw result: {result}")
            # Clean up the result (remove any remaining prefixes)
            if isinstance(result, str):
                result = result.strip()
                # Remove common prefixes
                prefixes_to_remove = ["The answer is ", "Answer: ", "Final answer: "]
                for prefix in prefixes_to_remove:
                    if result.startswith(prefix):
                        result = result[len(prefix):].strip()
            return result
        except Exception as e:
            import traceback
            return f"Agent error: {e}\n{traceback.format_exc()}"

    def _fallback_implementation(self, question):
        """Fallback when smolagents is not available."""
        return f"Smolagents not available. Question received: {question}"

    def __call__(self, question):
        """Process a question using the smolagents CodeAgent or the fallback."""
        if self.tools_available:
            return self._run_smolagents(question)
        else:
            return self._fallback_implementation(question)
def cleanup_memory():
    """Centralized memory cleanup function."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        import time
        time.sleep(0.1)
        torch.cuda.empty_cache()
    gc.collect()
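# torch.cuda.empty_cache() releases cached allocator blocks back to the CUDA
# driver, while gc.collect() drops unreachable Python objects that may still
# pin GPU tensors; running both between questions helps keep the 14B model
# from accumulating memory fragmentation across the evaluation loop.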
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending a link to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please log in to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # For an app running as a Hugging Face Space, this link points to your
    # codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
        finally:
            cleanup_memory()

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, tools, required packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the submit button, it can take quite some time (this is the time the agent needs to work through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long-running submit step, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously (see the sketch after this block).
        """
    )
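    # A hedged sketch of the caching idea mentioned in the disclaimer above
    # (assumption: run_only and submit_cached are hypothetical helpers that
    # split answering from submission, with answers parked in a gr.State):
    #
    #   cached_answers = gr.State([])
    #   run_only_button = gr.Button("Run Only (cache answers)")
    #   submit_cached_button = gr.Button("Submit Cached Answers")
    #   run_only_button.click(fn=run_only, outputs=[status_output, cached_answers])
    #   submit_cached_button.click(fn=submit_cached, inputs=[cached_answers],
    #                              outputs=[status_output])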
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Removed max_rows=10 from the DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
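    # No inputs are declared above: because run_and_submit_all annotates its
    # parameter as gr.OAuthProfile | None, Gradio injects the logged-in
    # profile from gr.LoginButton() automatically (or None when logged out).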
if __name__ == "__main__": | |
print("\n" + "-"*30 + " App Starting " + "-"*30) | |
# Check for SPACE_HOST and SPACE_ID at startup for information | |
space_host_startup = os.getenv("SPACE_HOST") | |
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup | |
if space_host_startup: | |
print(f"β SPACE_HOST found: {space_host_startup}") | |
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
else: | |
print("βΉοΈ SPACE_HOST environment variable not found (running locally?).") | |
if space_id_startup: # Print repo URLs if SPACE_ID is found | |
print(f"β SPACE_ID found: {space_id_startup}") | |
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
else: | |
print("βΉοΈ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
print("-"*(60 + len(" App Starting ")) + "\n") | |
print("Launching Gradio Interface for Basic Agent Evaluation...") | |
demo.launch(debug=True, share=False) |