Preps

In [87]:
from tools import describe_audio_tool

In [88]:
from globals import *
from global_functions import *
from tools import *
from IPython.display import Image, display
import datasets
import base64
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
# describe_image_tool
import subprocess
from langchain_community.document_loaders import UnstructuredExcelLoader
import yt_dlp
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

In [89]:
# ------------------------------------------------------ #
# MODELS
# ------------------------------------------------------ #
# init_chat_llm = ChatOllama(model=model_name)
# Base chat model (no tools bound yet). The API key is looked up in the
# TOGETHER_API_KEY environment variable and is None when that is not set.
init_chat_llm = ChatTogether(
    model="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
    api_key=os.environ.get("TOGETHER_API_KEY"),
)


In [90]:
# ------------------------------------------------------ #
# FUNCTIONS FOR TOOLS
# ------------------------------------------------------ #
def read_mp3(f, normalized=False):
    """Decode an MP3 file into its frame rate and a NumPy sample array.

    For two-channel audio the flat sample array is reshaped to two columns
    and only column index 1 is kept; any other channel count is returned
    exactly as decoded.

    Args:
        f: Whatever ``pydub.AudioSegment.from_mp3`` accepts as a source.
        normalized: When true, the samples are converted to ``float32`` and
            divided by ``2**15`` (assumes 16-bit samples — TODO confirm).

    Returns:
        A ``(frame_rate, samples)`` tuple.
    """
    segment = pydub.AudioSegment.from_mp3(f)
    samples = np.array(segment.get_array_of_samples())
    if segment.channels == 2:
        # Two columns, one per channel; keep the second one only.
        samples = samples.reshape((-1, 2))[:, 1]
    if not normalized:
        return segment.frame_rate, samples
    return segment.frame_rate, np.float32(samples) / 2**15

In [91]:
# ------------------------------------------------------ #
# TOOLS
# ------------------------------------------------------ #
# mp3
def describe_audio_tool(file_name: str) -> str:
    """
    This tool receives a file name of an audio, uploads the audio and returns a detailed description of the audio.
    Inputs: file_name as str
    Outputs: audio detailed description as str
    """
    # NOTE(review): the docstring is left word-for-word on purpose — it is
    # presumably what the LLM sees as the tool description once this function
    # is passed to bind_tools; confirm before rewording it.
    target_sr = 16000
    file_dir = f'files/{file_name}'
    print(f"{file_dir=}")

    # Decode the MP3 and bring it to the sampling rate given to the processor.
    source_sr, waveform_np = read_mp3(file_dir)
    waveform = torch.tensor(waveform_np, dtype=torch.float32)
    resample = T.Resample(source_sr, target_sr, dtype=waveform.dtype)
    waveform_resampled = resample(waveform).numpy()

    features = processor(
        waveform_resampled,
        sampling_rate=target_sr,
        return_tensors="pt",
        padding=True,
    )
    with torch.no_grad():  # inference only, no gradients needed
        logits = model(**features).logits

    # Arg-max over the last logits dimension, then decode the first sequence.
    token_ids = torch.argmax(logits, dim=-1)
    return processor.decode(token_ids[0])

# py
def python_repl_tool(file_name: str) -> str:
    """
    This tool receives a file name of a python code and executes it. Then, it returns the output of the code.
    If the code fails, the error output is returned together with anything it printed before failing.
    Inputs: file_name as str
    Outputs: code's output as str
    """
    # Local import so the block does not depend on a new file-level import.
    import sys

    file_dir = f'files/{file_name}'
    print(f"{file_dir=}")
    # sys.executable runs the script with the interpreter (and installed
    # packages) of the current process; a bare "python" may be absent from
    # PATH or resolve to a different installation.
    result = subprocess.run(
        [sys.executable, file_dir],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Surface the failure instead of returning an empty/partial stdout,
        # so the caller can tell a crash from a script that prints nothing.
        return (
            f"{result.stdout}\n"
            f"ERROR (exit code {result.returncode}):\n{result.stderr}"
        )
    return result.stdout

# xlsx
def excel_repl_tool(file_name: str) -> str:
    """
    This tool receives a file name of an Excel file and reads it. Then, it returns a string of the content of the file.
    Inputs: file_name as str
    Outputs: file's content as str
    """
    file_dir = f'files/{file_name}'
    print(f"{file_dir=}")
    docs = UnstructuredExcelLoader(file_dir, mode="elements").load()

    # Collect every element instead of only docs[0]: with mode="elements" the
    # loader returns one document per element, so reading just the first one
    # drops any further tables/sheets and raises IndexError when nothing was
    # extracted. Elements that carry no HTML rendering fall back to their
    # plain text instead of raising KeyError.
    fragments = []
    for doc in docs:
        fragment = doc.metadata.get('text_as_html') or doc.page_content
        if fragment:
            fragments.append(fragment)
    return "\n".join(fragments)


# youtube
def youtube_extractor_tool(url: str) -> str:
    """
    This tool receives a url of the youtube video and reads it. Then, it returns a string of the content of the video.
    Inputs: url as str
    Outputs: video's content as str
    """
    # NOTE(review): docstring kept verbatim — presumably it doubles as the
    # tool description handed to the LLM via bind_tools.
    file_name = 'my_audio_file'

    # Post-processing step that turns the downloaded stream into an MP3.
    mp3_postprocessor = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    ydl_opts = dict(
        format='bestaudio/best',
        # Fixed output name under files/, extension filled in by yt_dlp.
        outtmpl=f'files/{file_name}.%(ext)s',
        postprocessors=[mp3_postprocessor],
    )

    with yt_dlp.YoutubeDL(ydl_opts) as downloader:
        downloader.download([url])

    # Hand the resulting MP3 to the audio tool defined in this file.
    return describe_audio_tool(file_name=f'{file_name}.mp3')


# wiki
def wikipedia_tool(query: str) -> str:
    """
    This tool receives a query to search inside the Wikipedia website, reads the page and returns the relevant information as a string.
    Inputs: query as str
    Outputs: Wikipedia's relevant content as str
    """
    # NOTE(review): docstring kept verbatim — presumably it doubles as the
    # tool description handed to the LLM via bind_tools.
    print(f"[wiki tool] {query=}")
    # A fresh query runner (default API wrapper settings) is built per call.
    return WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()).run(query)

In [92]:
# ------------------------------------------------------ #
# BINDING TOOLS
# ------------------------------------------------------ #
# Callables exposed to the model; the same list is reused for the tool node.
tools = [
    search_tool,
    describe_image_tool,
    describe_audio_tool,
    python_repl_tool,
    excel_repl_tool,
    youtube_extractor_tool,
    wikipedia_tool,
]
# Tool-aware variant of the base chat model, used by the assistant node.
chat_llm = init_chat_llm.bind_tools(tools)

In [93]:
# ------------------------------------------------------ #
# STATE
# ------------------------------------------------------ #
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Conversation so far. No reducer is attached to this key; the nodes in
    # this file return the complete list rather than only the new messages.
    messages: list[AnyMessage]
    # Name of the file attached to the task ('' when there is none); the tools
    # in this file look it up under the files/ directory.
    file_name: str
    # Set by checker_final_answer: True once the last message contains the
    # "FINAL ANSWER: " marker.
    final_output_is_good: bool

In [94]:
# ------------------------------------------------------ #
# HELP FUNCTIONS
# ------------------------------------------------------ #
def step_print(state: AgentState | None, step_label: str):
    """Print a banner announcing that the node ``step_label`` is being entered.

    The square brackets show the number of messages currently in ``state``;
    they stay empty when ``state`` is ``None`` (or otherwise falsy).
    """
    message_count = len(state["messages"]) if state else ''
    print(f'<<--- [{message_count}] Entering ``{step_label}`` Node... --->>')


def messages_print(messages_to_print: List[AnyMessage]):
 """Print a header line, then the type, name and content of the given messages."""
 print('--- Message/s ---')
 for m in messages_to_print:
 # One entry per message: "<type> (<name>): " followed by the content on a new line.
 print(f'{m.type} ({m.name}): \n{m.content}')
 # NOTE(review): the notebook export flattened the indentation, so it is not
 # visible here whether this closing marker is printed once after the loop or
 # once per message — confirm against the original notebook.
 print(f'<<--- *** --->>')

In [95]:
# ------------------------------------------------------ #
# NODES
# ------------------------------------------------------ #
def preprocessing(state: AgentState):
    """Entry node: annotate the question with its file name and add the system prompt.

    When ``state['file_name']`` is not empty, a ``"\\nfile_name: <name>"`` line
    is appended to the content of the first message so the model knows which
    attachment belongs to the task. The messages are printed, and the returned
    update places ``DEFAULT_SYSTEM_PROMPT`` in front of them.

    Args:
        state: Current graph state; reads ``messages`` and ``file_name``.

    Returns:
        A state update with the new ``messages`` list.
    """
    # Local import so the block does not depend on a new file-level import.
    import copy

    step_print(None, 'Preprocessing')
    messages = list(state['messages'])
    # `and messages` guards against an empty conversation (previously IndexError).
    if state['file_name'] != '' and messages:
        # Edit a copy of the first message: changing the caller's object in
        # place would append the file name again each time the same message
        # object is passed to the graph.
        first_message = copy.copy(messages[0])
        first_message.content = (
            first_message.content + f"\nfile_name: {state['file_name']}"
        )
        messages[0] = first_message
    messages_print(messages)
    return {
        "messages": [SystemMessage(content=DEFAULT_SYSTEM_PROMPT)] + messages
    }


def assistant(state: AgentState):
    """LLM node: send the conversation to the tool-bound model and record its reply.

    Returns:
        A state update whose ``messages`` is the previous conversation with
        the model's reply added at the end.
    """
    step_print(state, 'assistant')
    history = state["messages"]
    reply = chat_llm.invoke(history)
    messages_print([reply])
    return {'messages': history + [reply]}


# Tool executor that wrapped_tool_node delegates to.
base_tool_node = ToolNode(tools)


def wrapped_tool_node(state: AgentState):
    """Tool node: run the requested tool calls and add their results to the conversation.

    Delegates to ``base_tool_node`` and prints the messages it produced.

    Returns:
        A state update whose ``messages`` is the previous conversation
        followed by the tool result messages.
    """
    step_print(state, 'Tools')
    result = base_tool_node.invoke(state)
    tool_messages = result["messages"]
    messages_print(tool_messages)
    # Build a new list rather than extending state["messages"] with `+=`:
    # the incoming state is left untouched, matching how `assistant` returns
    # its update.
    return {"messages": state["messages"] + tool_messages}


def checker_final_answer(state: AgentState):
    """Check node: accept the last message only if it contains ``"FINAL ANSWER: "``.

    Returns:
        ``{'final_output_is_good': True}`` when the marker is present;
        otherwise the unchanged ``messages`` together with
        ``'final_output_is_good': False``.
    """
    step_print(state, 'Final Check')
    last_content = state['messages'][-1].content
    if "FINAL ANSWER: " in last_content:
        return {'final_output_is_good': True}
    # Marker missing: hand the same conversation back with the flag cleared.
    return {
        'messages': state["messages"],
        'final_output_is_good': False,
    }


In [96]:
# ------------------------------------------------------ #
# CONDITIONAL FUNCTIONS
# ------------------------------------------------------ #
def condition_output(state: AgentState) -> Literal["assistant", "__end__"]:
    """Route after the final-answer check.

    Returns ``END`` when ``final_output_is_good`` is set, otherwise
    ``"assistant"`` so the model gets another turn.
    """
    return END if state['final_output_is_good'] else "assistant"


def condition_tools_or_continue(
    state: Union[list[AnyMessage], dict[str, Any], BaseModel],
    messages_key: str = "messages",
) -> Literal["tools", "checker_final_answer"]:
    """Route after the assistant node based on the last message.

    The last message is taken from ``state`` itself when it is a list, from
    ``state[messages_key]`` when it is a dict, or from the attribute named
    ``messages_key`` otherwise.

    Returns:
        ``"tools"`` when that message has a non-empty ``tool_calls``,
        otherwise ``"checker_final_answer"``.

    Raises:
        ValueError: If no messages can be found in ``state``.
    """
    if isinstance(state, list):
        ai_message = state[-1]
    else:
        # Dict lookup first; attribute lookup is the fallback, including for
        # a dict whose entry is missing or empty.
        found = state.get(messages_key, []) if isinstance(state, dict) else []
        if not found:
            found = getattr(state, messages_key, [])
        if not found:
            raise ValueError(f"No messages found in input state to tool_edge: {state}")
        ai_message = found[-1]

    wants_tools = hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0
    return "tools" if wants_tools else "checker_final_answer"


In [97]:
# ------------------------------------------------------ #
# BUILDERS
# ------------------------------------------------------ #
def workflow_tools() -> Tuple[StateGraph, str]:
    """Build the (uncompiled) tool-using agent graph.

    Flow: START -> preprocessing -> assistant; the assistant routes to
    ``tools`` (which returns to the assistant) or to
    ``checker_final_answer``, which either ends the run or sends control
    back to the assistant.

    Returns:
        The graph builder and the workflow's name, ``'workflow_tools'``.
    """
    graph = StateGraph(AgentState)

    # Nodes, registered in the same order as before.
    node_table = (
        ('preprocessing', preprocessing),
        ('assistant', assistant),
        ('tools', wrapped_tool_node),
        ('checker_final_answer', checker_final_answer),
    )
    for node_name, node_fn in node_table:
        graph.add_node(node_name, node_fn)

    # Edges.
    graph.add_edge(START, 'preprocessing')
    graph.add_edge('preprocessing', 'assistant')
    graph.add_conditional_edges('assistant', condition_tools_or_continue)
    graph.add_edge('tools', 'assistant')
    graph.add_conditional_edges('checker_final_answer', condition_output)
    return graph, 'workflow_tools'

Graph

In [98]:
# print(alfred.get_graph().draw_mermaid())

In [99]:
# ------------------------------------------------------ #
# COMPILATION
# ------------------------------------------------------ #
# builder, builder_name = workflow_simple()
# Build the tool-enabled workflow; builder_name is the label it returns.
builder, builder_name = workflow_tools()
# Compile the builder into the runnable agent that the cells below invoke.
alfred = builder.compile()

In [100]:
# display(Image(alfred.get_graph().draw_mermaid_png()))

Check

In [101]:
# Download the task list, giving up after 15 seconds.
response = requests.get(questions_url, timeout=15)
# Raise on an HTTP error status instead of parsing an error body.
response.raise_for_status()
# Parsed JSON payload; iterated below as a sequence of task dicts.
questions_data = response.json()

In [102]:
# Overview: list the tasks that come with a file and those mentioning 'wiki'.
# Each task is a dict with the keys 'task_id', 'question', 'Level', 'file_name'.
for item_num, item in enumerate(questions_data):
    if item['file_name'] != '':
        print(f"Task {item_num} has file: {item['file_name']}")
    if 'wiki' in item['question']:
        print(f"Task {item_num} question: {item['question']}")

# Pick the task to run in the following cell and print a summary of it.
item_num = 0
item = questions_data[item_num]
print(
    '---',
    f"NUM: {item_num}",
    f"ID: {item['task_id']}",
    f"FILE NAME: {item['file_name']}",
    f"QUESTION: \n{item['question']}",
    '---',
    sep='\n',
)

Task 0 question: How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.
Task 3 has file: cca530fc-4052-43b2-b130-b30968d8aa44.png
Task 9 has file: 99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3.mp3
Task 11 has file: f918266a-b3e0-4914-865d-4faa564f1aef.py
Task 13 has file: 1f975693-876d-457b-a649-393859e79bf3.mp3
Task 18 has file: 7bd855d8-463d-4ed5-93ca-5fe35145f733.xlsx
---
NUM: 0
ID: 8e867cd7-cff9-4e6c-867a-ff5ddc2550be
FILE NAME: 
QUESTION: 
How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.
---


In [103]:
# Run the agent on the selected task: the question is the only message, and
# the answer flag starts out cleared.
response = alfred.invoke(
    dict(
        messages=[HumanMessage(content=item['question'])],
        file_name=item['file_name'],
        final_output_is_good=False,
    )
)

<<--- [] Entering ``Preprocessing`` Node... --->>
--- Message/s ---
human (None): 
How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.
<<--- *** --->>
<<--- [2] Entering ``assistant`` Node... --->>
--- Message/s ---
ai (None): 

<<--- *** --->>
<<--- [3] Entering ``Tools`` Node... --->>
[wiki tool] query='Mercedes Sosa discography'
--- Message/s ---
tool (wikipedia_tool): 
Page: Mercedes Sosa
Summary: Haydée Mercedes "La Negra" Sosa (Latin American Spanish: [meɾˈseðes ˈsosa]; 9 July 1935 – 4 October 2009) was an Argentine singer who was popular throughout Latin America and many countries outside the region. With her roots in Argentine folk music, Sosa became one of the preeminent exponents of El nuevo cancionero. She gave voice to songs written by many Latin American songwriters. Her music made people hail her as the "voice of the voiceless ones". She was often called "the conscience of Latin

In [93]:
# pic_loc_str = 'files/cca530fc-4052-43b2-b130-b30968d8aa44.png'
# # doc = [UnstructuredImageLoader(pic_loc_str).load()]
# dataset = datasets.Dataset.from_dict({"image": [pic_loc_str]}).cast_column("image", datasets.Image())
# dataset[0]["image"]