Spaces:
Restarting
Restarting
File size: 11,965 Bytes
425ca9e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
import base64
import mimetypes
import os
import tempfile
import time
from io import BytesIO

import requests
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.agent.workflow import AgentStream
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.tools import QueryEngineTool
from llama_index.llms.openai import OpenAI
from openai import OpenAI as OpenAIClient
# Config
# Populate os.environ from a .env file (python-dotenv) before the required
# variables are read below; a missing variable raises KeyError at import time.
from dotenv import load_dotenv
load_dotenv()
# NOTE(review): USERNAME and AGENT_CODE_URL are not referenced by the Agent
# class; presumably they are imported by another module — verify before removing.
USERNAME = os.environ["USERNAME"]
AGENT_CODE_URL = os.environ["AGENT_CODE_URL"]
# Base URL of the GAIA scoring service; task attachments live under /files/<task_id>.
GAIA_BASE_URL = "https://agents-course-unit4-scoring.hf.space"
# The key is read straight from os.environ, so it is already present there for
# any client that looks it up from the environment; it is also passed
# explicitly to the llama_index OpenAI LLM.
open_ai_api_key = os.environ["OPENAI_API_KEY"]
class Agent:
    """Answer a single GAIA benchmark task with a llama_index ReAct agent.

    The task's attached file is downloaded from the GAIA scoring service and
    indexed for retrieval. The agent can then query that index, ask a vision
    model about an image, transcribe and answer about audio, or run code
    through the OpenAI Assistants Code Interpreter.
    """

    def __init__(self, task: dict):
        """Store the task and create the LLM / OpenAI clients.

        :param task: GAIA task dict; must contain ``task_id`` and ``question``
            and may contain ``file_name`` (possibly None).
        """
        self.task = task
        self.task_id = task["task_id"]
        self.question = task["question"]
        # ``file_name`` may be absent *or* explicitly None; normalise to "" so
        # path handling never turns None into a file literally named "None".
        self.file_name = task.get("file_name") or ""
        self.llm = OpenAI(model="gpt-4o", api_key=open_ai_api_key)
        # Raw OpenAI SDK client (constructed without an explicit key) used for
        # vision, transcription, answer validation and Code Interpreter.
        self.client = OpenAIClient()
        self.file_bytes = None  # filled in by run_task() after downloading
        # NOTE(review): not assigned anywhere else in this class; kept in case
        # external callers read them.
        self.query_tool = None
        self.agent = None

    def download_file(self, task_id: str, timeout: float = 60.0) -> bytes:
        """
        Download the file associated with a GAIA task ID.
        :param task_id: The task ID for which to download the file
        :param timeout: Seconds to wait for the server before giving up; without
            a timeout ``requests`` may block indefinitely.
        :return: File content as bytes, or b"" if the download fails
        """
        url = f"{GAIA_BASE_URL}/files/{task_id}"
        try:
            resp = requests.get(url, timeout=timeout)
            resp.raise_for_status()
        except requests.RequestException as e:
            print(f"❌ Error downloading file for task {task_id}: {e}")
            return b""
        return resp.content

    def _attachment_name(self, default: str) -> str:
        """Return the final path component of ``file_name``, or *default* if unusable."""
        # file_name comes from the remote task payload: basename() keeps it
        # from pointing outside the target directory.
        name = os.path.basename(self.file_name or "")
        return default if name in ("", ".", "..") else name

    def save_file_to_temp(self, directory=None) -> str:
        """Write the downloaded attachment into a directory and return that directory.

        :param directory: Existing directory to write into. When omitted, a new
            directory is created with ``tempfile.mkdtemp()`` and the caller is
            responsible for removing it.
        :return: Path of the directory that now contains the file.
        """
        target_dir = directory if directory is not None else tempfile.mkdtemp()
        # An empty name would make the path the directory itself, so fall
        # back to a fixed name in that case.
        file_path = os.path.join(target_dir, self._attachment_name("attachment"))
        with open(file_path, "wb") as f:
            f.write(self.file_bytes)
        return target_dir

    def index_from_directory(self, directory_path: str):
        """Load every file in *directory_path* and build an in-memory vector index over it."""
        documents = SimpleDirectoryReader(directory_path).load_data()
        index = VectorStoreIndex.from_documents(documents)
        return index

    def encode_image_bytes(self, image_bytes: bytes, mime_type: "str | None" = None) -> str:
        """Return *image_bytes* as a base64 ``data:`` URL.

        :param image_bytes: Raw image content.
        :param mime_type: MIME type to embed. When omitted, it is guessed from
            the extension of ``file_name`` (e.g. ``.png`` -> ``image/png``).
            If no ``image/*`` type can be guessed, ``image/jpeg`` is used.
        """
        if mime_type is None:
            guessed, _ = mimetypes.guess_type(getattr(self, "file_name", "") or "")
            mime_type = guessed if guessed and guessed.startswith("image/") else "image/jpeg"
        encoded = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime_type};base64,{encoded}"

    def process_image(self, query: str) -> str:
        """
        Process image and reply to the question.

        Sends the attached image plus *query* to gpt-4o and returns the model's
        text answer, or "" if encoding or the API call fails.
        """
        try:
            base64_image = self.encode_image_bytes(self.file_bytes)
            response = self.client.responses.create(
                model="gpt-4o",
                input=[{
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": f"Answer the question based on the image: {query}."},
                        {
                            "type": "input_image",
                            "image_url": base64_image,
                        },
                    ],
                }],
            )
            return response.output_text
        except Exception as e:
            print(f"❌ Error extracting the data from image: {e}")
            return ""

    def _audio_upload_name(self) -> str:
        """Return the attachment's name if it has an extension, else ``audio.mp3``."""
        name = self._attachment_name("audio.mp3")
        return name if os.path.splitext(name)[1] else "audio.mp3"

    def process_audio(self, query: str) -> str:
        """
        Transcribe the attached audio and answer the question from the transcript.

        :return: The model's answer, or "" if transcription or answering fails.
        """
        audio_stream = BytesIO(self.file_bytes)
        # The upload needs a file name with an audio extension (presumably
        # how the API identifies the format), so keep the real attachment
        # name when it has one.
        audio_stream.name = self._audio_upload_name()
        try:
            transcription = self.client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=audio_stream,
                response_format="text"
            )
            response = self.client.responses.create(
                model="gpt-4o",
                input=(
                    "You're an AI assistant whose task is to answer the following question based on the provided text. "
                    f"The question is: {query} "
                    f"The text is: {transcription} "
                    "Do not provide any additional information or explanation."
                )
            )
            return response.output_text
        except Exception as e:
            print(f"❌ Error extracting the data from audio: {e}")
            return ""

    @staticmethod
    def _first_text_block(message) -> str:
        """Return the value of the first ``text`` content block of an Assistants message, or ""."""
        # Code Interpreter replies can contain non-text blocks (e.g. images),
        # so don't assume content[0] is text.
        for block in getattr(message, "content", None) or []:
            if getattr(block, "type", None) == "text":
                return block.text.value
        return ""

    def _delete_code_resources(self, uploaded_file, assistant, thread) -> None:
        """Best-effort deletion of the OpenAI objects created by ``run_code``."""
        cleanups = (
            (thread, self.client.beta.threads.delete),
            (assistant, self.client.beta.assistants.delete),
            (uploaded_file, self.client.files.delete),
        )
        for resource, delete in cleanups:
            if resource is None:
                continue
            try:
                delete(resource.id)
            except Exception as e:
                print(f"⚠️ Could not delete OpenAI resource {resource.id}: {e}")

    def run_code(self, query: str) -> str:
        """Execute the attached file with OpenAI Code Interpreter and answer *query*.

        A temporary assistant, thread and uploaded file are created for the call
        and deleted afterwards.

        :return: The assistant's text reply, a fixed failure message if the run
            did not complete, or "" on any API error.
        """
        uploaded_file = assistant = thread = None
        try:
            # Upload the code file under its real name so Code Interpreter sees
            # the script's extension (a bare BytesIO carries no name).
            uploaded_file = self.client.files.create(
                file=(self._attachment_name("upload"), self.file_bytes),
                purpose="assistants"
            )
            # Create an assistant with Code Interpreter enabled
            assistant = self.client.beta.assistants.create(
                instructions=(
                    "You are a professional programmer. When asked a technical question, "
                    "analyze and execute the uploaded code using the code interpreter tool."
                ),
                model="gpt-4o",
                tools=[{"type": "code_interpreter"}],
                tool_resources={"code_interpreter": {"file_ids": [uploaded_file.id]}}
            )
            # Create a thread and send message with the user query
            thread = self.client.beta.threads.create()
            self.client.beta.threads.messages.create(
                thread_id=thread.id,
                role="user",
                content=query,
            )
            # Run the assistant and wait for it to complete
            run = self.client.beta.threads.runs.create_and_poll(
                thread_id=thread.id,
                assistant_id=assistant.id
            )
            if run.status != "completed":
                print(f"⚠️ Run did not complete successfully. Status: {run.status}")
                return "Code execution failed or was incomplete."
            # The list endpoint returns newest messages first by default, so
            # the first assistant message is the latest reply.
            messages = self.client.beta.threads.messages.list(thread_id=thread.id)
            for message in messages.data:
                if message.role == "assistant":
                    return self._first_text_block(message)
            return ""
        except Exception as e:
            print(f"❌ Error running code via assistant: {e}")
            return ""
        finally:
            # Otherwise every call leaves an assistant, thread and file behind
            # on the OpenAI account.
            self._delete_code_resources(uploaded_file, assistant, thread)

    def validate_query_tool_output(self, query: str, output: str) -> str:
        """
        Validate the output of the query against the expected format.

        Asks gpt-4o to return *output* reformatted to what *query* expects
        (or unchanged if already correct). On API failure *output* is
        returned as-is.
        """
        try:
            response = self.client.responses.create(
                model="gpt-4o",
                input=(
                    "You're an AI assistant that validates the output of a query against the expected format. "
                    f"The query is: {query}. The output is: {output}. Validate the output and if the output is not correctly formatted as per the query, provide the correct output. "
                    "The output should be concise. Examples: (1) if you need to provide a move in a chess game, then the output should contain only the move `Qd1+` without any additional details. "
                    "(2) If the output should be a list of items, provide them without any additional details like `Salt, pepper, chilli`. "
                    "If the output is already correct, then just return the output. "
                    "Do not provide any additional information or explanation."
                )
            )
            return response.output_text
        except Exception as e:
            print(f"❌ Error validating query output: {e}")
            print("Returning an original output ...")
            return output

    def buld_tools(self, query_engine):
        """Build the agent's tools (name misspelling kept for existing callers).

        The tool names must match the names used in ``run_task``'s prompts.
        """
        query_engine_tool = QueryEngineTool.from_defaults(
            query_engine=query_engine,
            name="query_tool_task",
            description="Query the indexed content from the GAIA file.",
            return_direct=True,
        )
        image_question_tool = FunctionTool.from_defaults(
            self.process_image,
            name="image_question_tool",
            description="Answer a question based on an image and its contents."
        )
        audio_question_tool = FunctionTool.from_defaults(
            self.process_audio,
            name="audio_question_tool",
            description="Answer a question based on an audio and its contents."
        )
        code_execution_tool = FunctionTool.from_defaults(
            self.run_code,
            # Must equal the name the system prompt and user message tell the
            # agent to call.
            name="code_execution_tool",
            description="Loads the full content of a script and executes it to answer the question.",
        )
        return [
            query_engine_tool,
            image_question_tool,
            audio_question_tool,
            code_execution_tool
        ]

    async def run_task(self):
        """Download, index and answer this task's question with a ReAct agent.

        :return: The validated answer string, or None when the task has no
            downloadable file, the file cannot be indexed, or the agent fails.
        """
        task_id = self.task_id
        question = self.question
        self.file_bytes = self.download_file(task_id)
        if not self.file_bytes:
            print(f"⚠️ No file found for task {task_id}")
            return None
        # The file only has to exist on disk while SimpleDirectoryReader loads
        # it into Documents, so the temp directory is removed right after indexing.
        try:
            with tempfile.TemporaryDirectory() as directory_path:
                self.save_file_to_temp(directory_path)
                index = self.index_from_directory(directory_path)
        except Exception as e:
            print(f"❌ Could not index task {task_id}: {e}")
            return None
        if not index:
            print(f"❌ Could not index task {task_id}")
            return None
        query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)
        # Create the task-specific tools
        tools = self.buld_tools(query_engine)
        # Create a one-off agent for this task
        rag_agent = ReActAgent(
            name=f"agent_task_{task_id}",
            description="Parses and answers the question using indexed content.",
            llm=self.llm,
            tools=tools,
            system_prompt=(
                "You are an agent designed to answer a GAIA benchmark question using the attached file.\n"
                "You must always start by choosing the correct tool:\n"
                "- Use `query_tool_task` for parsing and searching documents (text, tables, PDFs, etc.).\n"
                "- Use `image_question_tool` if the file is an image and cannot be parsed as text.\n"
                "- Use `audio_question_tool` if the file is an audio and cannot be parsed as text.\n"
                "- Use `code_execution_tool` if the file is a code and cannot be parsed as text.\n"
                "Do not explain or comment on your answer. the output should be formatted as per the query."
            )
        )
        user_msg = (
            f"GAIA Question:\n{question}\n\n"
            "Choose the correct tool based on the file type (document or image).\n"
            "Use `query_tool_task`, `image_question_tool`, `audio_question_tool` or `code_execution_tool` to extract the answer."
        )
        try:
            handler = rag_agent.run(user_msg=user_msg)
            # Show the live reasoning / thought process as it streams
            print(f"\n🧠 ReAct Reasoning for question {question}:\n")
            async for event in handler.stream_events():
                if isinstance(event, AgentStream):
                    print(event.delta, end="", flush=True)
            # Final response
            response = await handler
            print(f"\n✅ Final Answer:\n{response}\n")
            # Optional: print tool call history
            if response.tool_calls:
                print("🛠️ Tool Calls:")
                for call in response.tool_calls:
                    tool_name = getattr(call, "tool_name", "unknown")
                    kwargs = getattr(call, "tool_kwargs", {})
                    print(f"- Tool: {tool_name} | Input: {kwargs}")
            # str() matches what the prompt's f-string interpolation produced and
            # keeps the validator's fallback return value a string.
            validated_result = self.validate_query_tool_output(question, str(response))
            print("====================================")
            print(f"✅ Validated Answer:\n{validated_result}\n")
            print("====================================")
            return validated_result
        except Exception as e:
            print(f"❌ Error for task {task_id}: {e}")
            return None