Spaces:
Runtime error
Runtime error
import gradio as gr | |
from huggingface_hub import InferenceClient | |
""" | |
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference | |
""" | |
client = InferenceClient("microsoft/phi-2") | |
#client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
def respond( | |
message, | |
history: list[tuple[str, str]], | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
): | |
messages = [{"role": "system", "content": system_message}] | |
for val in history: | |
if val[0]: | |
messages.append({"role": "user", "content": val[0]}) | |
if val[1]: | |
messages.append({"role": "assistant", "content": val[1]}) | |
messages.append({"role": "user", "content": message}) | |
response = "" | |
for message in messages: | |
print(message) | |
for message in client.chat_completion( | |
messages, | |
max_tokens=max_tokens, | |
stream=True, | |
temperature=temperature, | |
top_p=top_p, | |
): | |
token = message.choices[0].delta.content | |
response += token | |
yield response | |
""" | |
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface | |
""" | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Textbox(value="You are a friendly Chatbot.", label="System message"), | |
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), | |
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), | |
gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-p (nucleus sampling)", | |
), | |
], | |
) | |
from typing import Annotated, Sequence, TypedDict | |
import operator | |
import functools | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_experimental.tools import PythonREPLTool | |
from langchain.agents import create_openai_tools_agent | |
from langchain_huggingface import HuggingFacePipeline | |
from langgraph.graph import StateGraph, END | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
# SETUP: HuggingFace Model and Pipeline | |
#name = "meta-llama/Llama-3.2-1B" | |
#name="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" | |
#name="deepseek-ai/deepseek-llm-7b-chat" | |
#name="openai-community/gpt2" | |
#name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" | |
#name="microsoft/Phi-3.5-mini-instruct" | |
name="Qwen/Qwen2.5-7B-Instruct-1M" | |
tokenizer = AutoTokenizer.from_pretrained(name,truncation=True) | |
tokenizer.pad_token = tokenizer.eos_token | |
model = AutoModelForCausalLM.from_pretrained(name) | |
pipe = pipeline( | |
"text-generation", | |
model=model, | |
tokenizer=tokenizer, | |
device_map="auto", | |
max_new_tokens=500, # text to generate for outputs | |
) | |
print ("pipeline is created") | |
# Wrap in LangChain's HuggingFacePipeline | |
llm = HuggingFacePipeline(pipeline=pipe) | |
# Members and Final Options | |
members = ["Researcher", "Coder"] | |
options = ["FINISH"] + members | |
# Supervisor prompt | |
system_prompt = ( | |
"You are a supervisor tasked with managing a conversation between the following workers: {members}." | |
" Given the following user request, respond with the workers to act next. Each worker will perform a task" | |
" and respond with their results and status. When all workers are finished, respond with FINISH." | |
) | |
# Prompt template required for the workflow | |
prompt = ChatPromptTemplate.from_messages( | |
[ | |
("system", system_prompt), | |
MessagesPlaceholder(variable_name="messages"), | |
("system", "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"), | |
] | |
).partial(options=str(options), members=", ".join(members)) | |
print ("Prompt Template created") | |
# Supervisor routing logic | |
def route_tool_response(llm_response: str) -> str: | |
""" | |
Parse the LLM response to determine the next step based on routing logic. | |
Handles unexpected or poorly structured responses gracefully. | |
""" | |
# Normalize the LLM response | |
#llm_response = llm_response.strip().lower() # Strip whitespace and make lowercase | |
# Remove any prefixes like "Assistant:" or "System:" | |
# if ":" in llm_response: | |
# llm_response = llm_response.split(":")[-1].strip() | |
# Check for "finish" or worker names in the response | |
for member in members: | |
#if member.lower() in llm_response: | |
if member in llm_response: | |
return member | |
if "finish" in llm_response: | |
return "FINISH" | |
# If no valid response is found, return a fallback error | |
return "Invalid" | |
def supervisor_chain(state): | |
""" | |
Supervisor logic to interact with HuggingFacePipeline and decide the next worker. | |
""" | |
messages = state.get("messages", []) | |
try: | |
# Construct prompt for the supervisor | |
user_prompt = prompt.format(messages=messages) | |
# Generate the LLM's response | |
llm_response = pipe(user_prompt, max_new_tokens=100)[0]["generated_text"] | |
print(f"[DEBUG] LLM Response: {llm_response.strip()}") # Log LLM raw output | |
# Route the response to determine the next action | |
next_action = route_tool_response(llm_response) | |
# Validate the next action | |
if next_action not in options: | |
raise ValueError(f"Invalid next action: '{next_action}'. Expected one of {options}.") | |
# # Initialize intermediate_steps if not already present | |
# if "intermediate_steps" not in state: | |
# state["intermediate_steps"] = [] | |
# # Append the supervisor decision to intermediate_steps | |
# state["intermediate_steps"].append( | |
# {"supervisor": "decision", "next_action": next_action} | |
# ) | |
print(f"[DEBUG] Next action decided: {next_action}") # Log decision | |
return {"next": next_action, "messages": messages} | |
# return {"next": next_action, "messages": messages, "intermediate_steps": state["intermediate_steps"]} | |
except Exception as e: | |
print(f"[ERROR] Supervisor chain failed: {e}") | |
raise RuntimeError(f"Supervisor logic error: {str(e)}") | |
# AgentState definition | |
class AgentState(TypedDict): | |
messages: Annotated[Sequence[BaseMessage], operator.add] | |
next: str | |
# Create tools | |
tavily_tool = TavilySearchResults(max_results=5) | |
python_repl_tool = PythonREPLTool() | |
# Create agents with their respective prompts | |
research_agent = create_openai_tools_agent( | |
llm=llm, | |
tools=[tavily_tool], | |
prompt=ChatPromptTemplate.from_messages( | |
[ | |
SystemMessage(content="You are a web researcher."), | |
MessagesPlaceholder(variable_name="messages"), | |
MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder | |
] | |
), | |
) | |
print ("Created agents with their respective prompts") | |
code_agent = create_openai_tools_agent( | |
llm=llm, | |
tools=[python_repl_tool], | |
prompt=ChatPromptTemplate.from_messages( | |
[ | |
SystemMessage(content="You may generate safe Python code for analysis."), | |
MessagesPlaceholder(variable_name="messages"), | |
MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder | |
] | |
), | |
) | |
print ("create_openai_tools_agent") | |
# Create the workflow | |
workflow = StateGraph(AgentState) | |
# Nodes | |
workflow.add_node("Researcher", research_agent) # Pass the agent directly (no .run required) | |
workflow.add_node("Coder", code_agent) # Pass the agent directly | |
workflow.add_node("supervisor", supervisor_chain) | |
# Add edges for workflow transitions | |
for member in members: | |
workflow.add_edge(member, "supervisor") | |
#workflow.add_conditional_edges( | |
# "supervisor", | |
# lambda x: x["next"], | |
# {k: k for k in members} | {"FINISH": END} # Dynamically map workers to their actions | |
#) | |
workflow.add_conditional_edges( | |
"supervisor", | |
lambda x: x["next"], | |
{"Researcher":"Researcher","Coder":"Coder","FINISH": END} | |
) | |
print("[DEBUG] Workflow edges added: supervisor -> members/FINISH based on 'next'") | |
# Define entry point | |
workflow.set_entry_point("supervisor") | |
print(workflow) | |
# Compile the workflow | |
graph = workflow.compile() | |
from IPython.display import display, Image | |
display(Image(graph.get_graph().draw_mermaid_png())) | |
# Properly formatted initial state | |
initial_state = { | |
"messages": [ | |
#HumanMessage(content="Code hello world and print it to the terminal.") # Correct format for user input | |
HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise.") # Correct format for user input | |
] | |
# , | |
# "intermediate_steps": [] # Add this to track progress if needed | |
} | |
# Properly formatted second test state | |
second_test = { | |
"messages": [ | |
HumanMessage(content="How is the weather in Sanfrancisco and Bangalore? Give research results") # Correct format for user input | |
] | |
# , | |
# "intermediate_steps": [] # Add this to track progress if needed | |
} | |
if __name__ == "__main__": | |
#demo.launch() | |
# Execute the workflow | |
try: | |
#print(f"[TRACE] Initial workflow state: {initial_state}") | |
#result = graph.invoke(initial_state) | |
#print("[INFO] Workflow Execution Complete.") | |
#print(f"[TRACE] Workflow Result: {result}") # Final workflow result | |
print(f"[TRACE] Initial workflow state: {second_test}") | |
result2 = graph.invoke(second_test) | |
print("[INFO] Workflow Execution Complete.") | |
print(f"[TRACE] Workflow Result: {result2}") # Final workflow result | |
except Exception as e: | |
print(f"[ERROR] Workflow execution failed: {e}") | |