import os
from dotenv import load_dotenv
from typing import AsyncGenerator, List, Dict, Any, Tuple, Optional
import json
import time
import asyncio
import gradio as gr
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.loguru_logger import initialize_logger
import re
import csv # Import the csv module for csv parsing
from swarms.utils.litellm_wrapper import LiteLLM
from litellm import models_by_provider
from dotenv import set_key, find_dotenv
import logging # Import the logging module
import litellm # Import litellm exception
# Load variables from a .env file (via python-dotenv) before anything below reads the environment
load_dotenv()
# Create the module-wide logger; "swarm_ui" is the log folder passed to initialize_logger
logger = initialize_logger(log_folder="swarm_ui")
# agent_prompts.json is looked up in the same directory as this source file
PROMPT_JSON_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json"
)
logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}")
# Prompts are loaded first so that they are available to create_app
def _default_agent_prompts() -> Dict[str, str]:
    """Return a fresh copy of the built-in fallback prompts."""
    return {
        "Agent-Data_Extractor": "You are a data extraction agent...",
        "Agent-Summarizer": "You are a summarization agent...",
        "Agent-Onboarding_Agent": "You are an onboarding agent...",
    }


def _read_prompt_file(path: str) -> Dict[str, str]:
    """Parse the prompt file at *path*; raise if it yields no usable prompt."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError("top-level JSON value is not an object")
    # Entries that are not dicts or lack "system_prompt" are skipped, not fatal.
    prompts = {
        agent_name: details["system_prompt"]
        for agent_name, details in data.items()
        if isinstance(details, dict) and "system_prompt" in details
    }
    if not prompts:
        raise ValueError("no entry provides a 'system_prompt'")
    return prompts


def load_prompts_from_json() -> Dict[str, str]:
    """Load agent system prompts from ``PROMPT_JSON_PATH``.

    The file is expected to hold a JSON object mapping an agent name to an
    object with a ``"system_prompt"`` key.

    Returns:
        A mapping of agent name to system prompt. When the file is missing,
        unreadable, not valid JSON, not a JSON object, or contains no usable
        entry, the built-in default prompts are returned instead.

    This function never raises because of a bad prompt file; the reason for
    falling back is logged as a warning.
    """
    try:
        return _read_prompt_file(PROMPT_JSON_PATH)
    except Exception as e:
        # Best-effort by design: the UI must still start without a prompt file.
        logger.warning(
            f"Using default agent prompts; could not load {PROMPT_JSON_PATH}:"
            f" {type(e).__name__}: {e}"
        )
        return _default_agent_prompts()
# Prompt registry (agent name -> system prompt), loaded once at import time;
# update_agent_dropdown rebinds it after a new prompt has been saved.
AGENT_PROMPTS = load_prompts_from_json()
# API keys held in memory only, keyed by provider name (filled by run_task_wrapper)
api_keys = {}
# Environment variable that carries the API key for each supported provider.
_PROVIDER_API_KEY_ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "cohere": "COHERE_API_KEY",
    "gemini": "GEMINI_API_KEY",
    "mistral": "MISTRAL_API_KEY",
    "groq": "GROQ_API_KEY",
    "perplexity": "PERPLEXITY_API_KEY",
    # Add other providers and their environment variable names as needed
}


def initialize_agents(
    dynamic_temp: float,
    agent_keys: List[str],
    model_name: str,
    provider: str,
    api_key: str,
    temperature: float,
    max_tokens: int,
) -> List[Agent]:
    """Create one ``Agent`` per selected prompt key.

    Args:
        dynamic_temp: Value passed to each ``Agent`` as ``temperature``.
        agent_keys: Keys into ``AGENT_PROMPTS``. A key may appear more than
            once; repeats get a ``_<n>`` suffix so agent names stay unique.
        model_name: Model name handed to ``LiteLLM``.
        provider: Provider name used to pick the API-key environment variable.
            Unknown providers leave the environment untouched.
        api_key: Key exported through that environment variable. A falsy
            value is not exported.
        temperature: Value passed to ``LiteLLM`` as ``temperature``.
        max_tokens: Value passed to ``LiteLLM`` as ``max_tokens``.

    Returns:
        The created agents, in the order of ``agent_keys``.

    Raises:
        ValueError: If a key is not in ``AGENT_PROMPTS``. All keys are checked
            before any agent is created.
        Exception: Anything raised while building an agent is logged and
            re-raised unchanged.
    """
    logger.info("Initializing agents...")
    agents = []
    seen_names = set()
    try:
        # Fail fast so an invalid key does not leave half of the agents built.
        for agent_key in agent_keys:
            if agent_key not in AGENT_PROMPTS:
                raise ValueError(f"Invalid agent key: {agent_key}")

        # The key is the same for every agent of this call, so export it once.
        # NOTE: this stays set in os.environ for the rest of the process.
        env_var = _PROVIDER_API_KEY_ENV_VARS.get(provider)
        if env_var and api_key:
            os.environ[env_var] = api_key

        for agent_key in agent_keys:
            agent_prompt = AGENT_PROMPTS[agent_key]

            # Ensure unique agent names
            agent_name = agent_key
            counter = 1
            while agent_name in seen_names:
                agent_name = f"{agent_key}_{counter}"
                counter += 1
            seen_names.add(agent_name)

            # No key is passed here; LiteLLM is expected to pick it up from
            # the environment variable exported above.
            llm = LiteLLM(
                model_name=model_name,
                system_prompt=agent_prompt,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            agent = Agent(
                agent_name=agent_name,
                system_prompt=agent_prompt,
                llm=llm,
                max_loops=1,
                autosave=True,
                verbose=True,
                dynamic_temperature_enabled=True,
                saved_state_path=f"agent_{agent_name}.json",
                user_name="pe_firm",
                retry_attempts=1,
                context_length=200000,
                output_type="string",  # agents return plain strings
                temperature=dynamic_temp,
            )
            print(f"Agent created: {agent.agent_name}")
            agents.append(agent)
        logger.info(f"Agents initialized successfully: {[agent.agent_name for agent in agents]}")
        return agents
    except Exception as e:
        logger.error(f"Error initializing agents: {e}", exc_info=True)
        raise
def validate_flow(flow, agents_dict):
    """Check that every agent named in *flow* is a key of *agents_dict*.

    *flow* is split on ``"->"`` and each piece is stripped of surrounding
    whitespace before the lookup.

    Raises:
        ValueError: For the first name that is not found in *agents_dict*.
    """
    logger.info(f"Validating flow: {flow}")
    for raw_name in flow.split("->"):
        name = raw_name.strip()
        if name in agents_dict:
            continue
        logger.error(f"Agent '{name}' specified in the flow does not exist.")
        raise ValueError(
            f"Agent '{name}' specified in the flow does not exist."
        )
    logger.info(f"Flow validated successfully: {flow}")
class TaskExecutionError(Exception):
    """Error raised for a failed task execution.

    The text given at construction is kept on ``message``; ``str()`` of the
    exception is that text prefixed with ``"TaskExecutionError: "``.
    """

    def __init__(self, message: str):
        super().__init__(message)
        self.message = message

    def __str__(self):
        return "TaskExecutionError: {}".format(self.message)
def _error_event(message: str) -> Tuple[str, Any, str]:
    """Build the tuple ``execute_task`` yields when a run fails.

    The message stays in the first slot and is also placed in the third
    (error) slot, so a consumer that checks the error slot can tell a failure
    from a result instead of trying to parse the message as swarm output.
    """
    return message, gr.update(visible=True), message


def _uniquify_flow(flow: str) -> str:
    """Rewrite an ``a -> b -> a`` flow so repeated names get ``_<n>`` suffixes."""
    unique_names = []
    used_names = set()
    for raw_name in flow.split("->"):
        base_name = raw_name.strip()
        name = base_name
        count = 1
        while name in used_names:
            name = f"{base_name}_{count}"
            count += 1
        used_names.add(name)
        unique_names.append(name)
    return " -> ".join(unique_names)


def _build_swarm_report(swarm_id: str, name: str, agents: List[Any], outputs) -> str:
    """Serialize ``(agent_name, response)`` pairs into the JSON report envelope."""
    return json.dumps(
        {
            "input": {
                "swarm_id": swarm_id,
                "name": name,
                "flow": "->".join(agent.agent_name for agent in agents),
            },
            "time": time.time(),
            "outputs": [
                {
                    "agent_name": agent_name,
                    "steps": [{"role": "assistant", "content": response}],
                }
                for agent_name, response in outputs
            ],
        }
    )


async def execute_task(
    task: str,
    max_loops: int,
    dynamic_temp: float,
    swarm_type: str,
    agent_keys: List[str],
    flow: Optional[str] = None,
    model_name: str = "gpt-4o",
    provider: str = "openai",
    api_key: Optional[str] = None,
    temperature: float = 0.5,
    max_tokens: int = 4000,
    agents: Optional[dict] = None,
    log_display=None,
    error_display=None,
) -> AsyncGenerator[Tuple[Any, Optional["SwarmRouter"], str], None]:
    """Run *task* on the selected swarm and yield a single outcome tuple.

    Args:
        task: Task description; must be non-empty.
        max_loops: Passed to the router as ``max_loops``.
        dynamic_temp, model_name, provider, api_key, temperature, max_tokens:
            Forwarded to ``initialize_agents`` when *agents* is not supplied.
            *agent_keys*, *provider*, *model_name* and *api_key* must be
            non-empty in every case.
        swarm_type: ``"AgentRearrange"``, ``"ConcurrentWorkflow"`` and
            ``"auto"`` have dedicated handling; any other value is passed to
            ``SwarmRouter`` as-is.
        agent_keys: Prompt keys of the agents to create.
        flow: ``"a -> b"`` execution order; required for ``AgentRearrange``.
        agents: Pre-built agents as a name -> agent mapping. A list of agents
            is accepted too and keyed by ``agent_name``.
        log_display, error_display: Accepted for the caller's convenience;
            not used by this function.

    Yields:
        Exactly one tuple and then stops:
        ``(result, None, "")`` on success, or
        ``(message, gr.update(visible=True), message)`` on failure.

    Runs are executed in worker threads with a timeout of 450 seconds
    (900 for ``SpreadSheetSwarm``) so the event loop is never blocked.
    No exception escapes; every failure is logged and reported by a yield.
    """
    logger.info(f"Executing task: {task} with swarm type: {swarm_type}")
    try:
        required_inputs = (
            (task, "Task description is missing.", "Please provide a task description."),
            (agent_keys, "No agents selected.", "Please select at least one agent."),
            (provider, "Provider is missing.", "Please select a provider."),
            (model_name, "Model is missing.", "Please select a model."),
            (api_key, "API Key is missing.", "Please enter an API Key."),
        )
        for value, log_message, user_message in required_inputs:
            if not value:
                logger.error(log_message)
                yield _error_event(user_message)
                return

        # Initialize agents
        try:
            if not agents:
                agents = initialize_agents(
                    dynamic_temp,
                    agent_keys,
                    model_name,
                    provider,
                    api_key,
                    temperature,
                    max_tokens,
                )
        except Exception as e:
            logger.error(f"Error initializing agents: {e}", exc_info=True)
            yield _error_event(f"Error initializing agents: {e}")
            return

        # initialize_agents returns a list, while callers pass a mapping;
        # normalize so the rest of the function can rely on a mapping.
        if not isinstance(agents, dict):
            agents = {agent.agent_name: agent for agent in agents}
        agent_list = list(agents.values())

        if swarm_type == "AgentRearrange":
            if not flow:
                logger.error("Flow configuration is missing for AgentRearrange.")
                yield _error_event("Flow configuration is required for AgentRearrange")
                return
            # Repeated names get the same "_<n>" suffixes initialize_agents
            # gives to repeated agents.
            flow = _uniquify_flow(flow)
            logger.info(f"Updated Flow string: {flow}")
        elif swarm_type in ("MixtureOfAgents", "SequentialWorkflow") and len(agents) < 2:
            logger.error(f"{swarm_type} requires at least 2 agents.")
            yield _error_event(f"{swarm_type} requires at least 2 agents")
            return

        # SpreadSheetSwarm gets twice the time of the other swarm types.
        timeout = 450 if swarm_type != "SpreadSheetSwarm" else 900

        # Create and execute the swarm
        try:
            if swarm_type == "AgentRearrange":
                from swarms.structs.rearrange import AgentRearrange

                router = AgentRearrange(
                    agents=agent_list,
                    flow=flow,
                    max_loops=max_loops,
                    name="multi-agent-workflow",
                    description=f"Executing {swarm_type} workflow",
                    return_json=True,
                    output_type="string",
                )
                # The router is invoked by calling it; do so off the event loop.
                result = await asyncio.wait_for(
                    asyncio.to_thread(router, task), timeout=timeout
                )
                logger.info("AgentRearrange task executed successfully.")
                yield result, None, ""
                return

            # Every other swarm type goes through SwarmRouter. It is built
            # before the ConcurrentWorkflow branch (which does not use it) so
            # that construction errors surface exactly as they did before.
            router = SwarmRouter(
                name="multi-agent-workflow",
                description=f"Executing {swarm_type} workflow",
                max_loops=max_loops,
                agents=agent_list,
                autosave=True,
                return_json=True,
                output_type="string",
                swarm_type=swarm_type,
            )

            if swarm_type == "ConcurrentWorkflow":
                # agent.run is a blocking call: one worker thread per agent is
                # what actually makes the agents run at the same time.
                responses = await asyncio.wait_for(
                    asyncio.gather(
                        *(asyncio.to_thread(agent.run, task) for agent in agent_list)
                    ),
                    timeout=timeout,
                )
                responses_by_agent = {
                    agent.agent_name: response
                    for agent, response in zip(agent_list, responses)
                }
                result = _build_swarm_report(
                    "concurrent_workflow_swarm_id",
                    "ConcurrentWorkflow",
                    agent_list,
                    responses_by_agent.items(),
                )
                logger.info("ConcurrentWorkflow task executed successfully.")
                yield result, None, ""
                return

            result = await asyncio.wait_for(
                asyncio.to_thread(router.run, task), timeout=timeout
            )

            if swarm_type == "auto":
                if isinstance(result, dict):
                    # Keys may be agent names or agent objects; accept both.
                    outputs = [
                        (getattr(key, "agent_name", key), response)
                        for key, response in result.items()
                    ]
                elif isinstance(result, str):
                    outputs = [("auto", result)]
                else:
                    logger.error("Auto Swarm returned an unexpected type")
                    yield _error_event("Error : Auto Swarm returned an unexpected type")
                    return
                result = _build_swarm_report(
                    "auto_swarm_id", "AutoSwarm", agent_list, outputs
                )
                logger.info("Auto task executed successfully.")
            else:
                logger.info(f"{swarm_type} task executed successfully.")
            yield result, None, ""
            return
        except asyncio.TimeoutError:
            logger.error(f"Task execution timed out after {timeout} seconds", exc_info=True)
            yield _error_event(f"Task execution timed out after {timeout} seconds")
            return
        except litellm.exceptions.APIError as e:
            logger.error(f"LiteLLM API Error: {e}", exc_info=True)
            yield _error_event(f"LiteLLM API Error: {e}")
            return
        except litellm.exceptions.AuthenticationError as e:
            logger.error(f"LiteLLM Authentication Error: {e}", exc_info=True)
            yield _error_event(f"LiteLLM Authentication Error: {e}")
            return
        except Exception as e:
            logger.error(f"Error executing task: {e}", exc_info=True)
            yield _error_event(f"Error executing task: {e}")
            return
    except TaskExecutionError as e:
        logger.error(f"Task execution error: {e}")
        yield _error_event(str(e))
        return
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}", exc_info=True)
        yield _error_event(f"An unexpected error occurred: {e}")
        return
    finally:
        logger.info(f"Task execution finished for: {task} with swarm type: {swarm_type}")
def format_output(data: Optional[str], swarm_type: str, error_display=None) -> str:
    """Pick the parser matching *swarm_type* and return the formatted text.

    ``None`` data and unknown swarm types produce a fixed message. For
    ``SpreadSheetSwarm`` the data is parsed only when it names an existing
    path; otherwise it is returned unchanged.
    """
    if data is None:
        return "Error : No output from the swarm."

    if swarm_type == "AgentRearrange":
        return parse_agent_rearrange_output(data, error_display)

    if swarm_type == "MixtureOfAgents":
        return parse_mixture_of_agents_output(data, error_display)

    if swarm_type == "SequentialWorkflow" or swarm_type == "ConcurrentWorkflow":
        return parse_sequential_workflow_output(data, error_display)

    if swarm_type == "SpreadSheetSwarm":
        if not os.path.exists(data):
            return data  # Directly return JSON response
        return parse_spreadsheet_swarm_output(data, error_display)

    if swarm_type == "auto":
        return parse_auto_swarm_output(data, error_display)

    return "Unsupported swarm type."
def parse_mixture_of_agents_data(data: dict, error_display=None) -> str:
    """Format already-decoded MixtureOfAgents output for display.

    Args:
        data: Decoded output. The keys read are ``"InputConfig"`` (name,
            description, agents), ``"normal_agent_outputs"`` (a list of
            per-agent ``steps``) and ``"aggregator_agent_summary"``; each
            section is emitted only when its key is present.
        error_display: Unused; accepted for signature parity with the other
            parsers.

    Returns:
        The formatted text, or ``"Error during parsing: ..."`` if the data has
        an unexpected shape. Runs and steps are numbered from 1 by their
        position in the input; steps whose role is ``"System:"`` are omitted
        but still count towards the step number.
    """
    logger.info("Parsing MixtureOfAgents data within Auto Swarm output...")
    try:
        sections = []
        if "InputConfig" in data and isinstance(data["InputConfig"], dict):
            input_config = data["InputConfig"]
            sections.append("Mixture of Agents Workflow Details\n\n")
            sections.append(f"Name: `{input_config.get('name', 'N/A')}`\n")
            sections.append(
                f"Description: `{input_config.get('description', 'N/A')}`\n\n---\n"
            )
            sections.append("Agent Task Execution\n\n")
            for agent in input_config.get("agents", []):
                sections.append(f"Agent: `{agent.get('agent_name', 'N/A')}`\n")

        if "normal_agent_outputs" in data and isinstance(
            data["normal_agent_outputs"], list
        ):
            for run_number, agent_output in enumerate(
                data["normal_agent_outputs"], start=1
            ):
                agent_name = agent_output.get("agent_name", "N/A")
                sections.append(f"Run {run_number} (Agent: `{agent_name}`)\n\n")
                # "steps" may be present but null; treat that as no steps.
                steps = agent_output.get("steps") or []
                for step_number, step in enumerate(steps, start=1):
                    if (
                        isinstance(step, dict)
                        and "role" in step
                        and "content" in step
                        and step["role"].strip() != "System:"  # skip system prompts
                    ):
                        sections.append(f"Step {step_number}: \n")
                        sections.append(f"Response:\n {step['content']}\n\n")

        if "aggregator_agent_summary" in data:
            sections.append(
                f"\nAggregated Summary :\n"
                f"{data['aggregator_agent_summary']}\n{'=' * 50}\n"
            )
        logger.info("MixtureOfAgents data parsed successfully within Auto Swarm.")
        return "".join(sections)
    except Exception as e:
        logger.error(
            f"Error during parsing MixtureOfAgents data within Auto Swarm: {e}",
            exc_info=True,
        )
        return f"Error during parsing: {str(e)}"
def _render_agent_runs(agent_outputs: list) -> Tuple[str, List[str]]:
    """Format each agent's non-system steps; return the text and any problems found."""
    parts = []
    problems = []
    for index, agent_output in enumerate(agent_outputs):
        if not isinstance(agent_output, dict):
            problems.append(f"Error: Agent output at index {index} is not a dictionary")
            continue
        if "agent_name" not in agent_output:
            problems.append(f"Error: 'agent_name' key is missing at index {index}")
            continue
        if "steps" not in agent_output:
            problems.append(f"Error: 'steps' key is missing at index {index}")
            continue
        steps = agent_output["steps"]
        if steps is None:
            problems.append(f"Error: 'steps' data is None at index {index}")
            continue
        if not isinstance(steps, list):
            problems.append(f"Error: 'steps' data is not a list at index {index}")
            continue

        # Messages above use the 0-based index; labels shown to the user are 1-based.
        parts.append(f"Run {index + 1} (Agent: `{agent_output['agent_name']}`)\n\n")
        for step_index, step in enumerate(steps):
            if not isinstance(step, dict):
                problems.append(
                    f"Error: step at index {step_index} is not a dictionary at {index} agent output."
                )
                continue
            if "role" not in step:
                problems.append(
                    f"Error: 'role' key missing at step {step_index} at {index} agent output."
                )
                continue
            if "content" not in step:
                problems.append(
                    f"Error: 'content' key missing at step {step_index} at {index} agent output."
                )
                continue
            if step["role"].strip() != "System:":  # Filter out system prompts
                parts.append(f"Step {step_index + 1}:\n")
                parts.append(f"Response : {step['content']}\n\n")
    return "".join(parts), problems


def parse_auto_swarm_output(data: Optional[str], error_display=None) -> str:
    """Parse the JSON report of an "auto" swarm run and format it for display.

    Args:
        data: JSON text with an ``"input"`` object (``swarm_id``, ``name``,
            ``flow``), a ``"time"`` value and an ``"outputs"`` list of
            ``{"agent_name": ..., "steps": [{"role", "content"}, ...]}``.
        error_display: Unused; accepted for signature parity with the other
            parsers.

    Returns:
        The formatted report. If the input is missing, is not valid JSON or
        has structural problems, the return value is the error text instead
        (several problems are joined with newlines).

    When the first output belongs to the agent named ``"auto"``, the content
    of its first step is decoded as nested MixtureOfAgents JSON. Content that
    is not a JSON object is a plain-text answer and is shown verbatim.
    Runs and steps are numbered from 1 by their position in the input.
    """
    logger.info("Parsing Auto Swarm output...")
    if data is None:
        logger.error("No data provided for parsing Auto Swarm output.")
        return "Error: No data provided for parsing."
    print(f"Raw data received for parsing:\n{data}")  # Debug: Print raw data
    try:
        parsed_data = json.loads(data)
        errors = []

        # Basic structure validation
        if "input" not in parsed_data or not isinstance(
            parsed_data.get("input"), dict
        ):
            errors.append("Error: 'input' data is missing or not a dictionary.")
        else:
            for key in ("swarm_id", "name", "flow"):
                if key not in parsed_data["input"]:
                    errors.append(f"Error: '{key}' key is missing in the 'input'.")
        if "time" not in parsed_data:
            errors.append("Error: 'time' key is missing.")
        if errors:
            logger.error(f"Errors found while parsing Auto Swarm output: {errors}")
            return "\n".join(errors)

        swarm_id = parsed_data["input"]["swarm_id"]
        swarm_name = parsed_data["input"]["name"]
        agent_flow = parsed_data["input"]["flow"]
        overall_time = parsed_data["time"]

        output = "Workflow Execution Details\n\n"
        output += f"Swarm ID: `{swarm_id}`\n"
        output += f"Swarm Name: `{swarm_name}`\n"
        output += f"Agent Flow: `{agent_flow}`\n\n---\n"
        output += "Agent Task Execution\n\n"

        outputs = parsed_data.get("outputs")
        if not (
            isinstance(outputs, list)
            and outputs
            and isinstance(outputs[0], dict)
        ):
            logger.error("Error: 'outputs' data is not in the expected format.")
            return "Error: 'outputs' data is not in the expected format."

        if outputs[0].get("agent_name") == "auto":
            steps = outputs[0].get("steps", [])
            if steps and isinstance(steps[0], dict) and "content" in steps[0]:
                content = steps[0]["content"]
                try:
                    nested = json.loads(content)
                except (json.JSONDecodeError, TypeError):
                    nested = None
                if isinstance(nested, dict):
                    # Nested MixtureOfAgents report
                    output += parse_mixture_of_agents_data(nested)
                else:
                    # A plain-text answer is a valid result, not a parse error.
                    logger.info("Auto Swarm content is not a JSON object; showing it verbatim.")
                    output += "Run 1 (Agent: `auto`)\n\n"
                    output += "Step 1:\n"
                    output += f"Response : {content}\n\n"
        else:
            rendered, errors = _render_agent_runs(outputs)
            output += rendered

        output += f"Overall Completion Time: `{overall_time}`"
        if errors:
            logger.error(f"Errors found while parsing Auto Swarm output: {errors}")
            return "\n".join(errors)
        logger.info("Auto Swarm output parsed successfully.")
        return output
    except json.JSONDecodeError as e:
        logger.error(f"Error during parsing Auto Swarm output: {e}", exc_info=True)
        return f"Error during parsing json.JSONDecodeError: {e}"
    except Exception as e:
        logger.error(f"Error during parsing Auto Swarm output: {e}", exc_info=True)
        return f"Error during parsing: {str(e)}"
def parse_agent_rearrange_output(data: Optional[str], error_display=None) -> str:
"""
Parses the AgentRearrange output string and formats it for display.
"""
logger.info("Parsing AgentRearrange output...")
if data is None:
logger.error("No data provided for parsing AgentRearrange output.")
return "Error: No data provided for parsing."
print(
f"Raw data received for parsing:\n{data}"
) # Debug: Print raw data
try:
parsed_data = json.loads(data)
errors = []
if (
"input" not in parsed_data
or not isinstance(parsed_data.get("input"), dict)
):
errors.append(
"Error: 'input' data is missing or not a dictionary."
)
else:
if "swarm_id" not in parsed_data["input"]:
errors.append(
"Error: 'swarm_id' key is missing in the 'input'."
)
if "name" not in parsed_data["input"]:
errors.append(
"Error: 'name' key is missing in the 'input'."
)
if "flow" not in parsed_data["input"]:
errors.append(
"Error: 'flow' key is missing in the 'input'."
)
if "time" not in parsed_data:
errors.append("Error: 'time' key is missing.")
if errors:
logger.error(f"Errors found while parsing AgentRearrange output: {errors}")
return "\n".join(errors)
swarm_id = parsed_data["input"]["swarm_id"]
swarm_name = parsed_data["input"]["name"]
agent_flow = parsed_data["input"]["flow"]
overall_time = parsed_data["time"]
output = f"Workflow Execution Details\n\n"
output += f"Swarm ID: `{swarm_id}`\n"
output += f"Swarm Name: `{swarm_name}`\n"
output += f"Agent Flow: `{agent_flow}`\n\n---\n"
output += f"Agent Task Execution\n\n"
if "outputs" not in parsed_data:
errors.append("Error: 'outputs' key is missing")
elif parsed_data["outputs"] is None:
errors.append("Error: 'outputs' data is None")
elif not isinstance(parsed_data["outputs"], list):
errors.append("Error: 'outputs' data is not a list.")
elif not parsed_data["outputs"]:
errors.append("Error: 'outputs' list is empty.")
if errors:
logger.error(f"Errors found while parsing AgentRearrange output: {errors}")
return "\n".join(errors)
for i, agent_output in enumerate(
parsed_data["outputs"], start=3
):
if not isinstance(agent_output, dict):
errors.append(
f"Error: Agent output at index {i} is not a"
" dictionary"
)
continue
if "agent_name" not in agent_output:
errors.append(
f"Error: 'agent_name' key is missing at index {i}"
)
continue
if "steps" not in agent_output:
errors.append(
f"Error: 'steps' key is missing at index {i}"
)
continue
if agent_output["steps"] is None:
errors.append(
f"Error: 'steps' data is None at index {i}"
)
continue
if not isinstance(agent_output["steps"], list):
errors.append(
f"Error: 'steps' data is not a list at index {i}"
)
continue
if not agent_output["steps"]:
errors.append(
f"Error: 'steps' list is empty at index {i}"
)
continue
agent_name = agent_output["agent_name"]
output += f"Run {(3-i)} (Agent: `{agent_name}`)**\n\n"
# output += "Show/Hide Agent Steps\n\n"
# Iterate over steps
for j, step in enumerate(agent_output["steps"], start=3):
if not isinstance(step, dict):
errors.append(
f"Error: step at index {j} is not a dictionary"
f" at {i} agent output."
)
continue
if step is None:
errors.append(
f"Error: step at index {j} is None at {i} agent"
" output"
)
continue
if "role" not in step:
errors.append(
f"Error: 'role' key missing at step {j} at {i}"
" agent output."
)
continue
if "content" not in step:
errors.append(
f"Error: 'content' key missing at step {j} at"
f" {i} agent output."
)
continue
if step["role"].strip() != "System:": # Filter out system prompts
# role = step["role"]
content = step["content"]
output += f"Step {(3-j)}: \n"
output += f"Response :\n {content}\n\n"
# output += "Show/Hide Agent Steps\n\n"
for j, step in enumerate(agent_output["steps"], start=3):
if not isinstance(step, dict):
logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.")
return f"Error: step at index {j} is not a dictionary at {i} agent output."
if step is None:
logger.error(f"Error: step at index {j} is None at {i} agent output.")
return f"Error: step at index {j} is None at {i} agent output."
if "role" not in step:
logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.")
return f"Error: 'role' key missing at step {j} at {i} agent output."
if "content" not in step:
logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.")
return f"Error: 'content' key missing at step {j} at {i} agent output."
if step["role"].strip() != "System:": # Filter out system prompts
# role = step["role"]
content = step["content"]
output += f"Step {(3-j)}: \n"
output += f"Response:\n {content}\n\n"
# output += "Show/Hide Agent Steps\n\n"
# Iterate over steps
for j, step in enumerate(agent_output["steps"], start=3):
if not isinstance(step, dict):
logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.")
return f"Error: step at index {j} is not a dictionary at {i} agent output."
if step is None:
logger.error(f"Error: step at index {j} is None at {i} agent output")
return f"Error: step at index {j} is None at {i} agent output"
if "role" not in step:
logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.")
return f"Error: 'role' key missing at step {j} at {i} agent output."
if "content" not in step:
logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.")
return f"Error: 'content' key missing at step {j} at {i} agent output."
if step["role"].strip() != "System:": # Filter out system prompts
# role = step["role"]
content = step["content"]
output += f"Step {(3-j)}:\n"
output += f"Response : {content}\n\n"
# output += "
{text}
" ) self.components[f"markdown_{text}"] = markdown return markdown def create_text_input(self, label, lines=3, placeholder=""): text_input = gr.Textbox( label=label, lines=lines, placeholder=placeholder, elem_classes=["custom-input"], ) self.components[f"text_input_{label}"] = text_input return text_input def create_slider( self, label, minimum=0, maximum=1, value=0.5, step=0.1 ): slider = gr.Slider( minimum=minimum, maximum=maximum, value=value, step=step, label=label, interactive=True, ) self.components[f"slider_{label}"] = slider return slider def create_dropdown( self, label, choices, value=None, multiselect=False ): if not choices: choices = ["No options available"] if value is None and choices: value = choices[0] if not multiselect else [choices[0]] dropdown = gr.Dropdown( label=label, choices=choices, value=value, interactive=True, multiselect=multiselect, ) self.components[f"dropdown_{label}"] = dropdown return dropdown def create_button(self, text, variant="primary"): button = gr.Button(text, variant=variant) self.components[f"button_{text}"] = button return button def create_text_output(self, label, lines=10, placeholder=""): text_output = gr.Textbox( label=label, interactive=False, placeholder=placeholder, lines=lines, elem_classes=["custom-output"], ) self.components[f"text_output_{label}"] = text_output return text_output def create_tab(self, label, content_function): with gr.Tab(label): content_function(self) def set_event_listener(self, button, function, inputs, outputs): button.click(function, inputs=inputs, outputs=outputs) def get_components(self, *keys): if not keys: return self.components # return all components return [self.components[key] for key in keys] def create_json_output(self, label, placeholder=""): json_output = gr.JSON( label=label, value={}, elem_classes=["custom-output"], ) self.components[f"json_output_{label}"] = json_output return json_output def build(self): return self.blocks def create_conditional_input( self, component, 
visible_when, watch_component ): """Create an input that's only visible under certain conditions""" watch_component.change( fn=lambda x: gr.update(visible=visible_when(x)), inputs=[watch_component], outputs=[component], ) @staticmethod def create_ui_theme(primary_color="red"): return gr.themes.Ocean( primary_hue=primary_color, secondary_hue=primary_color, neutral_hue="gray", ).set( body_background_fill="#20252c", body_text_color="#f0f0f0", button_primary_background_fill=primary_color, button_primary_text_color="#ffffff", button_secondary_background_fill=primary_color, button_secondary_text_color="#ffffff", shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)", ) def create_agent_details_tab(self): """Create the agent details tab content.""" with gr.Column(): gr.Markdown("### Agent Details") gr.Markdown( """ **Available Agent Types:** - Data Extraction Agent: Specialized in extracting relevant information - Summary Agent - Analysis Agent: Performs detailed analysis of data **Swarm Types:** - ConcurrentWorkflow: Agents work in parallel - SequentialWorkflow: Agents work in sequence - AgentRearrange: Custom agent execution flow - MixtureOfAgents: Combines multiple agents with an aggregator - SpreadSheetSwarm: Specialized for spreadsheet operations - Auto: Automatically determines optimal workflow **Note:** Spreasheet swarm saves data in csv, will work in local setup ! 
""" ) return gr.Column() def create_logs_tab(self): """Create the logs tab content.""" with gr.Column(): gr.Markdown("### Execution Logs") logs_display = gr.Textbox( label="System Logs", placeholder="Execution logs will appear here...", interactive=False, lines=10, ) return logs_display def update_flow_agents(agent_keys): """Update flow agents based on selected agent prompts.""" if not agent_keys: return [], "No agents selected" agent_names = [key for key in agent_keys] print(f"Flow agents: {agent_names}") # Debug: Print flow agents return agent_names, "Select agents in execution order" def update_flow_preview(selected_flow_agents): """Update flow preview based on selected agents.""" if not selected_flow_agents: return "Flow will be shown here..." flow = " -> ".join(selected_flow_agents) return flow def create_app(): # Initialize UI theme = UI.create_ui_theme(primary_color="red") ui = UI(theme=theme) global AGENT_PROMPTS # Available providers and models providers = [ "openai", "anthropic", "cohere", "gemini", "mistral", "groq", "perplexity", ] filtered_models = {} for provider in providers: filtered_models[provider] = models_by_provider.get(provider, []) with ui.blocks: with gr.Row(): with gr.Column(scale=4): # Left column (80% width) ui.create_markdown("Swarms", is_header=True) ui.create_markdown( "The Enterprise-Grade Production-Ready Multi-Agent" " Orchestration Framework" ) with gr.Row(): with gr.Column(scale=4): with gr.Row(): task_input = gr.Textbox( label="Task Description", placeholder="Describe your task here...", lines=3, ) with gr.Row(): with gr.Column(scale=1): with gr.Row(): # Provider selection dropdown provider_dropdown = gr.Dropdown( label="Select Provider", choices=providers, value=providers[0] if providers else None, interactive=True, ) # with gr.Row(): # # Model selection dropdown (initially empty) model_dropdown = gr.Dropdown( label="Select Model", choices=[], interactive=True, ) with gr.Row(): # API key input api_key_input = gr.Textbox( 
label="API Key", placeholder="Enter your API key", type="password", ) with gr.Column(scale=1): with gr.Row(): dynamic_slider = gr.Slider( label="Dyn. Temp", minimum=0, maximum=1, value=0.1, step=0.01, ) # with gr.Row(): # max tokens slider max_loops_slider = gr.Slider( label="Max Loops", minimum=1, maximum=10, value=1, step=1, ) with gr.Row(): # max tokens slider max_tokens_slider = gr.Slider( label="Max Tokens", minimum=100, maximum=10000, value=4000, step=100, ) with gr.Column(scale=2, min_width=200): with gr.Column(scale=1): # Get available agent prompts available_prompts = ( list(AGENT_PROMPTS.keys()) if AGENT_PROMPTS else ["No agents available"] ) agent_prompt_selector = gr.Dropdown( label="Select Agent Prompts", choices=available_prompts, value=[available_prompts[0]] if available_prompts else None, multiselect=True, interactive=True, ) # with gr.Column(scale=1): # Get available swarm types swarm_types = [ "SequentialWorkflow", "ConcurrentWorkflow", "AgentRearrange", "MixtureOfAgents", "SpreadSheetSwarm", "auto", ] agent_selector = gr.Dropdown( label="Select Swarm", choices=swarm_types, value=swarm_types[0], multiselect=False, interactive=True, ) # Flow configuration components for AgentRearrange with gr.Column(visible=False) as flow_config: flow_text = gr.Textbox( label="Agent Flow Configuration", placeholder="Enter agent flow !", lines=2, ) gr.Markdown( """ **Flow Configuration Help:** - Enter agent names separated by ' -> ' - Example: Agent1 -> Agent2 -> Agent3 - Use exact agent names from the prompts above """ ) # Create Agent Prompt Section with gr.Accordion( "Create Agent Prompt", open=False ) as create_prompt_accordion: with gr.Row(): with gr.Column(): new_agent_name_input = gr.Textbox( label="New Agent Name" ) with gr.Column(): new_agent_prompt_input = ( gr.Textbox( label="New Agent Prompt", lines=3, ) ) with gr.Row(): with gr.Column(): create_agent_button = gr.Button( "Save New Prompt" ) with gr.Column(): create_agent_status = gr.Textbox( 
label="Status", interactive=False, ) # with gr.Row(): # temperature_slider = gr.Slider( # label="Temperature", # minimum=0, # maximum=1, # value=0.1, # step=0.01 # ) # Hidden textbox to store API Key env_api_key_textbox = gr.Textbox( value="", visible=False ) with gr.Row(): with gr.Column(scale=1): run_button = gr.Button( "Run Task", variant="primary" ) cancel_button = gr.Button( "Cancel", variant="secondary" ) with gr.Column(scale=1): with gr.Row(): loading_status = gr.Textbox( label="Status", value="Ready", interactive=False, ) # Add loading indicator and status with gr.Row(): agent_output_display = gr.Textbox( label="Agent Responses", placeholder="Responses will appear here...", interactive=False, lines=10, ) with gr.Row(): log_display = gr.Textbox( label="Logs", placeholder="Logs will be displayed here...", interactive=False, lines=5, visible=False, ) error_display = gr.Textbox( label="Error", placeholder="Errors will be displayed here...", interactive=False, lines=5, visible=False, ) def update_agent_dropdown(): """Update agent dropdown when a new agent is added""" global AGENT_PROMPTS AGENT_PROMPTS = load_prompts_from_json() available_prompts = ( list(AGENT_PROMPTS.keys()) if AGENT_PROMPTS else ["No agents available"] ) return gr.update( choices=available_prompts, value=available_prompts[0] if available_prompts else None, ) def update_ui_for_swarm_type(swarm_type): """Update UI components based on selected swarm type.""" is_agent_rearrange = swarm_type == "AgentRearrange" is_mixture = swarm_type == "MixtureOfAgents" is_spreadsheet = swarm_type == "SpreadSheetSwarm" max_loops = ( 5 if is_mixture or is_spreadsheet else 10 ) # Return visibility state for flow configuration and max loops update return ( gr.update(visible=is_agent_rearrange), # For flow_config gr.update( maximum=max_loops ), # For max_loops_slider f"Selected {swarm_type}", # For loading_status ) def update_model_dropdown(provider): """Update model dropdown based on selected provider.""" models = 
filtered_models.get(provider, []) return gr.update( choices=models, value=models[0] if models else None, ) def save_new_agent_prompt(agent_name, agent_prompt): """Saves a new agent prompt to the JSON file.""" try: if not agent_name or not agent_prompt: return ( "Error: Agent name and prompt cannot be" " empty." ) if ( not agent_name.isalnum() and "_" not in agent_name ): return ( "Error : Agent name must be alphanumeric or" " underscore(_) " ) if "agent." + agent_name in AGENT_PROMPTS: return "Error : Agent name already exists" with open( PROMPT_JSON_PATH, "r+", encoding="utf-8" ) as f: try: data = json.load(f) except json.JSONDecodeError: data = {} data[agent_name] = { "system_prompt": agent_prompt } f.seek(0) json.dump(data, f, indent=4) f.truncate() return "New agent prompt saved successfully" except Exception as e: return f"Error saving agent prompt {str(e)}" # In the run_task_wrapper function, modify the API key handling async def run_task_wrapper( task, max_loops, dynamic_temp, swarm_type, agent_prompt_selector, flow_text, provider, model_name, api_key, temperature, max_tokens, ): """Execute the task and update the UI with progress.""" try: # Update status yield "Processing...", "Running task...", "", gr.update(visible=False), gr.update(visible=False) # Prepare flow for AgentRearrange flow = None if swarm_type == "AgentRearrange": if not flow_text: yield ( "Please provide the agent flow" " configuration.", "Error: Flow not configured", "", gr.update(visible=True), gr.update(visible=False) ) return flow = flow_text print( f"Flow string: {flow}" ) # Debug: Print flow string # save api keys in memory api_keys[provider] = api_key agents = initialize_agents( dynamic_temp, agent_prompt_selector, model_name, provider, api_keys.get(provider), # Access API key from the dictionary temperature, max_tokens, ) print( "Agents passed to SwarmRouter:" f" {[agent.agent_name for agent in agents]}" ) # Debug: Print agent list # Convert agent list to dictionary agents_dict = { 
agent.agent_name: agent for agent in agents } # Execute task async for result, router, error in execute_task( task=task, max_loops=max_loops, dynamic_temp=dynamic_temp, swarm_type=swarm_type, agent_keys=agent_prompt_selector, flow=flow, model_name=model_name, provider=provider, api_key=api_keys.get(provider), # Pass the api key from memory temperature=temperature, max_tokens=max_tokens, agents=agents_dict, # Changed here log_display=log_display, error_display = error_display ): if error: yield f"Error: {error}", f"Error: {error}", "", gr.update(visible=True), gr.update(visible=True) return if result is not None: formatted_output = format_output(result, swarm_type, error_display) yield formatted_output, "Completed", api_key, gr.update(visible=False), gr.update(visible=False) return except Exception as e: yield f"Error: {str(e)}", f"Error: {str(e)}", "", gr.update(visible=True), gr.update(visible=True) return # Connect the update functions agent_selector.change( fn=update_ui_for_swarm_type, inputs=[agent_selector], outputs=[ flow_config, max_loops_slider, loading_status, ], ) provider_dropdown.change( fn=update_model_dropdown, inputs=[provider_dropdown], outputs=[model_dropdown], ) # Event for creating new agent prompts create_agent_button.click( fn=save_new_agent_prompt, inputs=[new_agent_name_input, new_agent_prompt_input], outputs=[create_agent_status], ).then( fn=update_agent_dropdown, inputs=None, outputs=[agent_prompt_selector], ) # Create event trigger # Create event trigger for run button run_event = run_button.click( fn=run_task_wrapper, inputs=[ task_input, max_loops_slider, dynamic_slider, agent_selector, agent_prompt_selector, flow_text, provider_dropdown, model_dropdown, api_key_input, max_tokens_slider ], outputs=[ agent_output_display, loading_status, env_api_key_textbox, error_display, log_display, ], ) # Connect cancel button to interrupt processing def cancel_task(): return "Task cancelled.", "Cancelled", "", gr.update(visible=False), 
gr.update(visible=False) cancel_button.click( fn=cancel_task, inputs=None, outputs=[ agent_output_display, loading_status, env_api_key_textbox, error_display, log_display ], cancels=run_event, ) with gr.Column(scale=1): # Right column with gr.Tabs(): with gr.Tab("Agent Details"): ui.create_agent_details_tab() with gr.Tab("Logs"): logs_display = ui.create_logs_tab() def update_logs_display(): """Update logs display with current logs.""" return "" # Update logs when tab is selected logs_tab = gr.Tab("Logs") logs_tab.select( fn=update_logs_display, inputs=None, outputs=[logs_display], ) return ui.build() # if __name__ == "__main__": # app = create_app() # app.launch()