Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| """ | |
| For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference | |
| """ | |
| client = InferenceClient("microsoft/phi-2") | |
| #client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
| def respond( | |
| message, | |
| history: list[tuple[str, str]], | |
| system_message, | |
| max_tokens, | |
| temperature, | |
| top_p, | |
| ): | |
| messages = [{"role": "system", "content": system_message}] | |
| for val in history: | |
| if val[0]: | |
| messages.append({"role": "user", "content": val[0]}) | |
| if val[1]: | |
| messages.append({"role": "assistant", "content": val[1]}) | |
| messages.append({"role": "user", "content": message}) | |
| response = "" | |
| for message in messages: | |
| print(message) | |
| for message in client.chat_completion( | |
| messages, | |
| max_tokens=max_tokens, | |
| stream=True, | |
| temperature=temperature, | |
| top_p=top_p, | |
| ): | |
| token = message.choices[0].delta.content | |
| response += token | |
| yield response | |
| """ | |
| For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface | |
| """ | |
| demo = gr.ChatInterface( | |
| respond, | |
| additional_inputs=[ | |
| gr.Textbox(value="You are a friendly Chatbot.", label="System message"), | |
| gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"), | |
| gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"), | |
| gr.Slider( | |
| minimum=0.1, | |
| maximum=1.0, | |
| value=0.95, | |
| step=0.05, | |
| label="Top-p (nucleus sampling)", | |
| ), | |
| ], | |
| ) | |
| from typing import Annotated, Sequence, TypedDict | |
| import operator | |
| import functools | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_experimental.tools import PythonREPLTool | |
| from langchain.agents import create_openai_tools_agent | |
| from langchain_huggingface import HuggingFacePipeline | |
| from langgraph.graph import StateGraph, END | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| # SETUP: HuggingFace Model and Pipeline | |
| #name = "meta-llama/Llama-3.2-1B" | |
| #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" | |
| #name="deepseek-ai/deepseek-llm-7b-chat" | |
| #name="openai-community/gpt2" | |
| #name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B" | |
| #name="microsoft/Phi-3.5-mini-instruct" | |
| name="Qwen/Qwen2.5-7B-Instruct-1M" | |
| tokenizer = AutoTokenizer.from_pretrained(name,truncation=True) | |
| tokenizer.pad_token = tokenizer.eos_token | |
| model = AutoModelForCausalLM.from_pretrained(name) | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| device_map="auto", | |
| max_new_tokens=500, # text to generate for outputs | |
| ) | |
| print ("pipeline is created") | |
| # Wrap in LangChain's HuggingFacePipeline | |
| llm = HuggingFacePipeline(pipeline=pipe) | |
| # Members and Final Options | |
| members = ["Researcher", "Coder"] | |
| options = ["FINISH"] + members | |
| # Supervisor prompt | |
| system_prompt = ( | |
| "You are a supervisor tasked with managing a conversation between the following workers: {members}." | |
| " Given the following user request, respond with the workers to act next. Each worker will perform a task" | |
| " and respond with their results and status. When all workers are finished, respond with FINISH." | |
| ) | |
| # Prompt template required for the workflow | |
| prompt = ChatPromptTemplate.from_messages( | |
| [ | |
| ("system", system_prompt), | |
| MessagesPlaceholder(variable_name="messages"), | |
| ("system", "Given the conversation above, who should act next? Or Should we FINISH? Select one of: {options}"), | |
| ] | |
| ).partial(options=str(options), members=", ".join(members)) | |
| print ("Prompt Template created") | |
| # Supervisor routing logic | |
| def route_tool_response(llm_response: str) -> str: | |
| """ | |
| Parse the LLM response to determine the next step based on routing logic. | |
| Handles unexpected or poorly structured responses gracefully. | |
| """ | |
| # Normalize the LLM response | |
| #llm_response = llm_response.strip().lower() # Strip whitespace and make lowercase | |
| # Remove any prefixes like "Assistant:" or "System:" | |
| # if ":" in llm_response: | |
| # llm_response = llm_response.split(":")[-1].strip() | |
| # Check for "finish" or worker names in the response | |
| for member in members: | |
| #if member.lower() in llm_response: | |
| if member in llm_response: | |
| return member | |
| if "finish" in llm_response: | |
| return "FINISH" | |
| # If no valid response is found, return a fallback error | |
| return "Invalid" | |
| def supervisor_chain(state): | |
| """ | |
| Supervisor logic to interact with HuggingFacePipeline and decide the next worker. | |
| """ | |
| messages = state.get("messages", []) | |
| try: | |
| # Construct prompt for the supervisor | |
| user_prompt = prompt.format(messages=messages) | |
| # Generate the LLM's response | |
| llm_response = pipe(user_prompt, max_new_tokens=100)[0]["generated_text"] | |
| print(f"[DEBUG] LLM Response: {llm_response.strip()}") # Log LLM raw output | |
| # Route the response to determine the next action | |
| next_action = route_tool_response(llm_response) | |
| # Validate the next action | |
| if next_action not in options: | |
| raise ValueError(f"Invalid next action: '{next_action}'. Expected one of {options}.") | |
| # # Initialize intermediate_steps if not already present | |
| # if "intermediate_steps" not in state: | |
| # state["intermediate_steps"] = [] | |
| # # Append the supervisor decision to intermediate_steps | |
| # state["intermediate_steps"].append( | |
| # {"supervisor": "decision", "next_action": next_action} | |
| # ) | |
| print(f"[DEBUG] Next action decided: {next_action}") # Log decision | |
| return {"next": next_action, "messages": messages} | |
| # return {"next": next_action, "messages": messages, "intermediate_steps": state["intermediate_steps"]} | |
| except Exception as e: | |
| print(f"[ERROR] Supervisor chain failed: {e}") | |
| raise RuntimeError(f"Supervisor logic error: {str(e)}") | |
| # AgentState definition | |
| class AgentState(TypedDict): | |
| messages: Annotated[Sequence[BaseMessage], operator.add] | |
| next: str | |
| # Create tools | |
| tavily_tool = TavilySearchResults(max_results=5) | |
| python_repl_tool = PythonREPLTool() | |
| # Create agents with their respective prompts | |
| research_agent = create_openai_tools_agent( | |
| llm=llm, | |
| tools=[tavily_tool], | |
| prompt=ChatPromptTemplate.from_messages( | |
| [ | |
| SystemMessage(content="You are a web researcher."), | |
| MessagesPlaceholder(variable_name="messages"), | |
| MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder | |
| ] | |
| ), | |
| ) | |
| print ("Created agents with their respective prompts") | |
| code_agent = create_openai_tools_agent( | |
| llm=llm, | |
| tools=[python_repl_tool], | |
| prompt=ChatPromptTemplate.from_messages( | |
| [ | |
| SystemMessage(content="You may generate safe Python code for analysis."), | |
| MessagesPlaceholder(variable_name="messages"), | |
| MessagesPlaceholder(variable_name="agent_scratchpad"), # Add required placeholder | |
| ] | |
| ), | |
| ) | |
| print ("create_openai_tools_agent") | |
| # Create the workflow | |
| workflow = StateGraph(AgentState) | |
| # Nodes | |
| workflow.add_node("Researcher", research_agent) # Pass the agent directly (no .run required) | |
| workflow.add_node("Coder", code_agent) # Pass the agent directly | |
| workflow.add_node("supervisor", supervisor_chain) | |
| # Add edges for workflow transitions | |
| for member in members: | |
| workflow.add_edge(member, "supervisor") | |
| #workflow.add_conditional_edges( | |
| # "supervisor", | |
| # lambda x: x["next"], | |
| # {k: k for k in members} | {"FINISH": END} # Dynamically map workers to their actions | |
| #) | |
| workflow.add_conditional_edges( | |
| "supervisor", | |
| lambda x: x["next"], | |
| {"Researcher":"Researcher","Coder":"Coder","FINISH": END} | |
| ) | |
| print("[DEBUG] Workflow edges added: supervisor -> members/FINISH based on 'next'") | |
| # Define entry point | |
| workflow.set_entry_point("supervisor") | |
| print(workflow) | |
| # Compile the workflow | |
| graph = workflow.compile() | |
| from IPython.display import display, Image | |
| display(Image(graph.get_graph().draw_mermaid_png())) | |
| # Properly formatted initial state | |
| initial_state = { | |
| "messages": [ | |
| #HumanMessage(content="Code hello world and print it to the terminal.") # Correct format for user input | |
| HumanMessage(content="Write Code for printing \"hello world\" in Python. Keep it precise.") # Correct format for user input | |
| ] | |
| # , | |
| # "intermediate_steps": [] # Add this to track progress if needed | |
| } | |
| # Properly formatted second test state | |
| second_test = { | |
| "messages": [ | |
| HumanMessage(content="How is the weather in Sanfrancisco and Bangalore? Give research results") # Correct format for user input | |
| ] | |
| # , | |
| # "intermediate_steps": [] # Add this to track progress if needed | |
| } | |
| if __name__ == "__main__": | |
| #demo.launch() | |
| # Execute the workflow | |
| try: | |
| #print(f"[TRACE] Initial workflow state: {initial_state}") | |
| #result = graph.invoke(initial_state) | |
| #print("[INFO] Workflow Execution Complete.") | |
| #print(f"[TRACE] Workflow Result: {result}") # Final workflow result | |
| print(f"[TRACE] Initial workflow state: {second_test}") | |
| result2 = graph.invoke(second_test) | |
| print("[INFO] Workflow Execution Complete.") | |
| print(f"[TRACE] Workflow Result: {result2}") # Final workflow result | |
| except Exception as e: | |
| print(f"[ERROR] Workflow execution failed: {e}") | |