Spaces:
Sleeping
Sleeping
import os | |
from dotenv import load_dotenv | |
from typing import AsyncGenerator, List, Dict, Any, Tuple, Optional | |
import json | |
import time | |
import asyncio | |
import gradio as gr | |
from swarms.structs.agent import Agent | |
from swarms.structs.swarm_router import SwarmRouter | |
from swarms.utils.loguru_logger import initialize_logger | |
import re | |
import csv # Import the csv module for csv parsing | |
from swarms.utils.litellm_wrapper import LiteLLM | |
from litellm import models_by_provider | |
from dotenv import set_key, find_dotenv | |
import logging # Import the logging module | |
import litellm # Import litellm exception | |
# Initialize logger | |
load_dotenv() | |
# Initialize logger | |
logger = initialize_logger(log_folder="swarm_ui") | |
# Define the path to agent_prompts.json | |
PROMPT_JSON_PATH = os.path.join( | |
os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json" | |
) | |
logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}") | |
# Load prompts first so its available for create_app | |
def load_prompts_from_json() -> Dict[str, str]: | |
try: | |
if not os.path.exists(PROMPT_JSON_PATH): | |
# Load default prompts | |
return { | |
"Agent-Data_Extractor": "You are a data extraction agent...", | |
"Agent-Summarizer": "You are a summarization agent...", | |
"Agent-Onboarding_Agent": "You are an onboarding agent...", | |
} | |
with open(PROMPT_JSON_PATH, "r", encoding="utf-8") as f: | |
try: | |
data = json.load(f) | |
except json.JSONDecodeError: | |
# Load default prompts | |
return { | |
"Agent-Data_Extractor": "You are a data extraction agent...", | |
"Agent-Summarizer": "You are a summarization agent...", | |
"Agent-Onboarding_Agent": "You are an onboarding agent...", | |
} | |
if not isinstance(data, dict): | |
# Load default prompts | |
return { | |
"Agent-Data_Extractor": "You are a data extraction agent...", | |
"Agent-Summarizer": "You are a summarization agent...", | |
"Agent-Onboarding_Agent": "You are an onboarding agent...", | |
} | |
prompts = {} | |
for agent_name, details in data.items(): | |
if ( | |
not isinstance(details, dict) | |
or "system_prompt" not in details | |
): | |
continue | |
prompts[agent_name] = details["system_prompt"] | |
if not prompts: | |
# Load default prompts | |
return { | |
"Agent-Data_Extractor": "You are a data extraction agent...", | |
"Agent-Summarizer": "You are a summarization agent...", | |
"Agent-Onboarding_Agent": "You are an onboarding agent...", | |
} | |
return prompts | |
except Exception: | |
# Load default prompts | |
return { | |
"Agent-Data_Extractor": "You are a data extraction agent...", | |
"Agent-Summarizer": "You are a summarization agent...", | |
"Agent-Onboarding_Agent": "You are an onboarding agent...", | |
} | |
AGENT_PROMPTS = load_prompts_from_json() | |
api_keys = {} | |
def initialize_agents( | |
dynamic_temp: float, | |
agent_keys: List[str], | |
model_name: str, | |
provider: str, | |
api_key: str, | |
temperature: float, | |
max_tokens: int, | |
) -> List[Agent]: | |
logger.info("Initializing agents...") | |
agents = [] | |
seen_names = set() | |
try: | |
for agent_key in agent_keys: | |
if agent_key not in AGENT_PROMPTS: | |
raise ValueError(f"Invalid agent key: {agent_key}") | |
agent_prompt = AGENT_PROMPTS[agent_key] | |
agent_name = agent_key | |
# Ensure unique agent names | |
base_name = agent_name | |
counter = 1 | |
while agent_name in seen_names: | |
agent_name = f"{base_name}_{counter}" | |
counter += 1 | |
seen_names.add(agent_name) | |
# Set API key using os.environ temporarily | |
if provider == "openai": | |
os.environ["OPENAI_API_KEY"] = api_key | |
elif provider == "anthropic": | |
os.environ["ANTHROPIC_API_KEY"] = api_key | |
elif provider == "cohere": | |
os.environ["COHERE_API_KEY"] = api_key | |
elif provider == "gemini": | |
os.environ["GEMINI_API_KEY"] = api_key | |
elif provider == "mistral": | |
os.environ["MISTRAL_API_KEY"] = api_key | |
elif provider == "groq": | |
os.environ["GROQ_API_KEY"] = api_key | |
elif provider == "perplexity": | |
os.environ["PERPLEXITY_API_KEY"] = api_key | |
# Add other providers and their environment variable names as needed | |
# Create LiteLLM instance (Now it will read from os.environ) | |
llm = LiteLLM( | |
model_name=model_name, | |
system_prompt=agent_prompt, | |
temperature=temperature, | |
max_tokens=max_tokens, | |
) | |
agent = Agent( | |
agent_name=agent_name, | |
system_prompt=agent_prompt, | |
llm=llm, | |
max_loops=1, | |
autosave=True, | |
verbose=True, | |
dynamic_temperature_enabled=True, | |
saved_state_path=f"agent_{agent_name}.json", | |
user_name="pe_firm", | |
retry_attempts=1, | |
context_length=200000, | |
output_type="string", # here is the output type which is string | |
temperature=dynamic_temp, | |
) | |
print(f"Agent created: {agent.agent_name}") | |
agents.append(agent) | |
logger.info(f"Agents initialized successfully: {[agent.agent_name for agent in agents]}") | |
return agents | |
except Exception as e: | |
logger.error(f"Error initializing agents: {e}", exc_info=True) | |
raise | |
def validate_flow(flow, agents_dict): | |
logger.info(f"Validating flow: {flow}") | |
agent_names = flow.split("->") | |
for agent in agent_names: | |
agent = agent.strip() | |
if agent not in agents_dict: | |
logger.error(f"Agent '{agent}' specified in the flow does not exist.") | |
raise ValueError( | |
f"Agent '{agent}' specified in the flow does not exist." | |
) | |
logger.info(f"Flow validated successfully: {flow}") | |
class TaskExecutionError(Exception): | |
"""Custom exception for task execution errors.""" | |
def __init__(self, message: str): | |
self.message = message | |
super().__init__(self.message) | |
def __str__(self): | |
return f"TaskExecutionError: {self.message}" | |
async def execute_task( | |
task: str, | |
max_loops: int, | |
dynamic_temp: float, | |
swarm_type: str, | |
agent_keys: List[str], | |
flow: str = None, | |
model_name: str = "gpt-4o", | |
provider: str = "openai", | |
api_key: str = None, | |
temperature: float = 0.5, | |
max_tokens: int = 4000, | |
agents: dict = None, | |
log_display=None, | |
error_display=None | |
) -> AsyncGenerator[Tuple[Any, Optional["SwarmRouter"], str], None]: # Changed the return type here | |
logger.info(f"Executing task: {task} with swarm type: {swarm_type}") | |
try: | |
if not task: | |
logger.error("Task description is missing.") | |
yield "Please provide a task description.", gr.update(visible=True), "" | |
return | |
if not agent_keys: | |
logger.error("No agents selected.") | |
yield "Please select at least one agent.", gr.update(visible=True), "" | |
return | |
if not provider: | |
logger.error("Provider is missing.") | |
yield "Please select a provider.", gr.update(visible=True), "" | |
return | |
if not model_name: | |
logger.error("Model is missing.") | |
yield "Please select a model.", gr.update(visible=True), "" | |
return | |
if not api_key: | |
logger.error("API Key is missing.") | |
yield "Please enter an API Key.", gr.update(visible=True), "" | |
return | |
# Initialize agents | |
try: | |
if not agents: | |
agents = initialize_agents( | |
dynamic_temp, | |
agent_keys, | |
model_name, | |
provider, | |
api_key, | |
temperature, | |
max_tokens, | |
) | |
except Exception as e: | |
logger.error(f"Error initializing agents: {e}", exc_info=True) | |
yield f"Error initializing agents: {e}", gr.update(visible=True), "" | |
return | |
# Swarm-specific configurations | |
router_kwargs = { | |
"name": "multi-agent-workflow", | |
"description": f"Executing {swarm_type} workflow", | |
"max_loops": max_loops, | |
"agents": list(agents.values()), | |
"autosave": True, | |
"return_json": True, | |
"output_type": "string", # Default output type | |
"swarm_type": swarm_type, # Pass swarm_type here | |
} | |
if swarm_type == "AgentRearrange": | |
if not flow: | |
logger.error("Flow configuration is missing for AgentRearrange.") | |
yield "Flow configuration is required for AgentRearrange", gr.update(visible=True), "" | |
return | |
# Generate unique agent names in the flow | |
flow_agents = [] | |
used_agent_names = set() | |
for agent_key in flow.split("->"): | |
agent_key = agent_key.strip() | |
base_agent_name = agent_key | |
count = 1 | |
while agent_key in used_agent_names: | |
agent_key = f"{base_agent_name}_{count}" | |
count += 1 | |
used_agent_names.add(agent_key) | |
flow_agents.append(agent_key) | |
# Update the flow string with unique names | |
flow = " -> ".join(flow_agents) | |
logger.info(f"Updated Flow string: {flow}") | |
router_kwargs["flow"] = flow | |
router_kwargs["output_type"] = "string" # Changed output type here | |
if swarm_type == "MixtureOfAgents": | |
if len(agents) < 2: | |
logger.error("MixtureOfAgents requires at least 2 agents.") | |
yield "MixtureOfAgents requires at least 2 agents", gr.update(visible=True), "" | |
return | |
if swarm_type == "SequentialWorkflow": | |
if len(agents) < 2: | |
logger.error("SequentialWorkflow requires at least 2 agents.") | |
yield "SequentialWorkflow requires at least 2 agents", gr.update(visible=True), "" | |
return | |
if swarm_type == "ConcurrentWorkflow": | |
pass | |
if swarm_type == "SpreadSheetSwarm": | |
pass | |
if swarm_type == "auto": | |
pass | |
# Create and execute SwarmRouter | |
try: | |
timeout = ( | |
450 if swarm_type != "SpreadSheetSwarm" else 900 | |
) # SpreadSheetSwarm will have different timeout. | |
if swarm_type == "AgentRearrange": | |
from swarms.structs.rearrange import AgentRearrange | |
router = AgentRearrange( | |
agents=list(agents.values()), | |
flow=flow, | |
max_loops=max_loops, | |
name="multi-agent-workflow", | |
description=f"Executing {swarm_type} workflow", | |
# autosave=True, | |
return_json=True, | |
output_type="string", # Changed output type according to agent rearrange | |
) | |
result = router(task) # Changed run method | |
logger.info(f"AgentRearrange task executed successfully.") | |
yield result, None, "" | |
return | |
# For other swarm types use the SwarmRouter and its run method | |
router = SwarmRouter(**router_kwargs) # Initialize SwarmRouter | |
if swarm_type == "ConcurrentWorkflow": | |
async def run_agent_task(agent, task_): | |
return agent.run(task_) | |
tasks = [ | |
run_agent_task(agent, task) | |
for agent in list(agents.values()) | |
] | |
responses = await asyncio.gather(*tasks) | |
result = {} | |
for agent, response in zip(list(agents.values()), responses): | |
result[agent.agent_name] = response | |
# Convert the result to JSON string for parsing | |
result = json.dumps( | |
{ | |
"input" : { | |
"swarm_id" : "concurrent_workflow_swarm_id", | |
"name" : "ConcurrentWorkflow", | |
"flow" : "->".join([agent.agent_name for agent in list(agents.values())]) | |
}, | |
"time" : time.time(), | |
"outputs" : [ | |
{ | |
"agent_name": agent_name, | |
"steps" : [{"role":"assistant", "content":response}] | |
} for agent_name, response in result.items() | |
] | |
} | |
) | |
logger.info(f"ConcurrentWorkflow task executed successfully.") | |
yield result, None, "" | |
return | |
elif swarm_type == "auto": | |
result = await asyncio.wait_for( | |
asyncio.to_thread(router.run, task), | |
timeout=timeout | |
) | |
if isinstance(result,dict): | |
result = json.dumps( | |
{ | |
"input" : { | |
"swarm_id" : "auto_swarm_id", | |
"name" : "AutoSwarm", | |
"flow" : "->".join([agent.agent_name for agent in list(agents.values())]) | |
}, | |
"time" : time.time(), | |
"outputs" : [ | |
{ | |
"agent_name": agent.agent_name, | |
"steps" : [{"role":"assistant", "content":response}] | |
} for agent, response in result.items() | |
] | |
} | |
) | |
elif isinstance(result, str): | |
result = json.dumps( | |
{ | |
"input" : { | |
"swarm_id" : "auto_swarm_id", | |
"name" : "AutoSwarm", | |
"flow" : "->".join([agent.agent_name for agent in list(agents.values())]) | |
}, | |
"time" : time.time(), | |
"outputs" : [ | |
{ | |
"agent_name": "auto", | |
"steps" : [{"role":"assistant", "content":result}] | |
} | |
] | |
} | |
) | |
else : | |
logger.error("Auto Swarm returned an unexpected type") | |
yield "Error : Auto Swarm returned an unexpected type", gr.update(visible=True), "" | |
return | |
logger.info(f"Auto task executed successfully.") | |
yield result, None, "" | |
return | |
else: | |
result = await asyncio.wait_for( | |
asyncio.to_thread(router.run, task), | |
timeout=timeout | |
) | |
logger.info(f"{swarm_type} task executed successfully.") | |
yield result, None, "" | |
return | |
except asyncio.TimeoutError as e: | |
logger.error(f"Task execution timed out after {timeout} seconds", exc_info=True) | |
yield f"Task execution timed out after {timeout} seconds", gr.update(visible=True), "" | |
return | |
except litellm.exceptions.APIError as e: # Catch litellm APIError | |
logger.error(f"LiteLLM API Error: {e}", exc_info=True) | |
yield f"LiteLLM API Error: {e}", gr.update(visible=True), "" | |
return | |
except litellm.exceptions.AuthenticationError as e: # Catch litellm AuthenticationError | |
logger.error(f"LiteLLM Authentication Error: {e}", exc_info=True) | |
yield f"LiteLLM Authentication Error: {e}", gr.update(visible=True), "" | |
return | |
except Exception as e: | |
logger.error(f"Error executing task: {e}", exc_info=True) | |
yield f"Error executing task: {e}", gr.update(visible=True), "" | |
return | |
except TaskExecutionError as e: | |
logger.error(f"Task execution error: {e}") | |
yield str(e), gr.update(visible=True), "" | |
return | |
except Exception as e: | |
logger.error(f"An unexpected error occurred: {e}", exc_info=True) | |
yield f"An unexpected error occurred: {e}", gr.update(visible=True), "" | |
return | |
finally: | |
logger.info(f"Task execution finished for: {task} with swarm type: {swarm_type}") | |
def format_output(data:Optional[str], swarm_type:str, error_display=None) -> str: | |
if data is None: | |
return "Error : No output from the swarm." | |
if swarm_type == "AgentRearrange": | |
return parse_agent_rearrange_output(data, error_display) | |
elif swarm_type == "MixtureOfAgents": | |
return parse_mixture_of_agents_output(data, error_display) | |
elif swarm_type in ["SequentialWorkflow", "ConcurrentWorkflow"]: | |
return parse_sequential_workflow_output(data, error_display) | |
elif swarm_type == "SpreadSheetSwarm": | |
if os.path.exists(data): | |
return parse_spreadsheet_swarm_output(data, error_display) | |
else: | |
return data # Directly return JSON response | |
elif swarm_type == "auto": | |
return parse_auto_swarm_output(data, error_display) | |
else: | |
return "Unsupported swarm type." | |
def parse_mixture_of_agents_data(data: dict, error_display=None) -> str: | |
"""Parses the MixtureOfAgents output data and formats it for display.""" | |
logger.info("Parsing MixtureOfAgents data within Auto Swarm output...") | |
try: | |
output = "" | |
if "InputConfig" in data and isinstance(data["InputConfig"], dict): | |
input_config = data["InputConfig"] | |
output += f"Mixture of Agents Workflow Details\n\n" | |
output += f"Name: `{input_config.get('name', 'N/A')}`\n" | |
output += ( | |
f"Description:" | |
f" `{input_config.get('description', 'N/A')}`\n\n---\n" | |
) | |
output += f"Agent Task Execution\n\n" | |
for agent in input_config.get("agents", []): | |
output += ( | |
f"Agent: `{agent.get('agent_name', 'N/A')}`\n" | |
) | |
if "normal_agent_outputs" in data and isinstance( | |
data["normal_agent_outputs"], list | |
): | |
for i, agent_output in enumerate( | |
data["normal_agent_outputs"], start=3 | |
): | |
agent_name = agent_output.get("agent_name", "N/A") | |
output += f"Run {(3 - i)} (Agent: `{agent_name}`)\n\n" | |
for j, step in enumerate( | |
agent_output.get("steps", []), start=3 | |
): | |
if ( | |
isinstance(step, dict) | |
and "role" in step | |
and "content" in step | |
and step["role"].strip() != "System:" | |
): | |
content = step["content"] | |
output += f"Step {(3 - j)}: \n" | |
output += f"Response:\n {content}\n\n" | |
if "aggregator_agent_summary" in data: | |
output += ( | |
f"\nAggregated Summary :\n" | |
f"{data['aggregator_agent_summary']}\n{'=' * 50}\n" | |
) | |
logger.info("MixtureOfAgents data parsed successfully within Auto Swarm.") | |
return output | |
except Exception as e: | |
logger.error( | |
f"Error during parsing MixtureOfAgents data within Auto Swarm: {e}", | |
exc_info=True, | |
) | |
return f"Error during parsing: {str(e)}" | |
def parse_auto_swarm_output(data: Optional[str], error_display=None) -> str: | |
"""Parses the auto swarm output string and formats it for display.""" | |
logger.info("Parsing Auto Swarm output...") | |
if data is None: | |
logger.error("No data provided for parsing Auto Swarm output.") | |
return "Error: No data provided for parsing." | |
print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data | |
try: | |
parsed_data = json.loads(data) | |
errors = [] | |
# Basic structure validation | |
if ( | |
"input" not in parsed_data | |
or not isinstance(parsed_data.get("input"), dict) | |
): | |
errors.append( | |
"Error: 'input' data is missing or not a dictionary." | |
) | |
else: | |
if "swarm_id" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'swarm_id' key is missing in the 'input'." | |
) | |
if "name" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'name' key is missing in the 'input'." | |
) | |
if "flow" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'flow' key is missing in the 'input'." | |
) | |
if "time" not in parsed_data: | |
errors.append("Error: 'time' key is missing.") | |
if errors: | |
logger.error( | |
f"Errors found while parsing Auto Swarm output: {errors}" | |
) | |
return "\n".join(errors) | |
swarm_id = parsed_data["input"]["swarm_id"] | |
swarm_name = parsed_data["input"]["name"] | |
agent_flow = parsed_data["input"]["flow"] | |
overall_time = parsed_data["time"] | |
output = f"Workflow Execution Details\n\n" | |
output += f"Swarm ID: `{swarm_id}`\n" | |
output += f"Swarm Name: `{swarm_name}`\n" | |
output += f"Agent Flow: `{agent_flow}`\n\n---\n" | |
output += f"Agent Task Execution\n\n" | |
# Handle nested MixtureOfAgents data or other swarm type data | |
if ( | |
"outputs" in parsed_data | |
and isinstance(parsed_data["outputs"], list) | |
and parsed_data["outputs"] | |
and isinstance(parsed_data["outputs"][0], dict) | |
): | |
if parsed_data["outputs"][0].get("agent_name") == "auto": | |
mixture_data = parsed_data["outputs"][0].get("steps", []) | |
if mixture_data and isinstance(mixture_data[0], dict) and "content" in mixture_data[0]: | |
try: | |
mixture_content = json.loads(mixture_data[0]["content"]) | |
output += parse_mixture_of_agents_data(mixture_content) | |
except json.JSONDecodeError as e: | |
logger.error(f"Error decoding nested MixtureOfAgents data: {e}", exc_info=True) | |
return f"Error decoding nested MixtureOfAgents data: {e}" | |
else: | |
for i, agent_output in enumerate(parsed_data["outputs"], start=3): | |
if not isinstance(agent_output, dict): | |
errors.append(f"Error: Agent output at index {i} is not a dictionary") | |
continue | |
if "agent_name" not in agent_output: | |
errors.append(f"Error: 'agent_name' key is missing at index {i}") | |
continue | |
if "steps" not in agent_output: | |
errors.append(f"Error: 'steps' key is missing at index {i}") | |
continue | |
if agent_output["steps"] is None: | |
errors.append(f"Error: 'steps' data is None at index {i}") | |
continue | |
if not isinstance(agent_output["steps"], list): | |
errors.append(f"Error: 'steps' data is not a list at index {i}") | |
continue | |
agent_name = agent_output["agent_name"] | |
output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" | |
# Iterate over steps | |
for j, step in enumerate(agent_output["steps"], start=3): | |
if not isinstance(step, dict): | |
errors.append(f"Error: step at index {j} is not a dictionary at {i} agent output.") | |
continue | |
if step is None: | |
errors.append(f"Error: step at index {j} is None at {i} agent output") | |
continue | |
if "role" not in step: | |
errors.append(f"Error: 'role' key missing at step {j} at {i} agent output.") | |
continue | |
if "content" not in step: | |
errors.append(f"Error: 'content' key missing at step {j} at {i} agent output.") | |
continue | |
if step["role"].strip() != "System:": # Filter out system prompts | |
content = step["content"] | |
output += f"Step {(3-j)}:\n" | |
output += f"Response : {content}\n\n" | |
else: | |
logger.error("Error: 'outputs' data is not in the expected format.") | |
return "Error: 'outputs' data is not in the expected format." | |
output += f"Overall Completion Time: `{overall_time}`" | |
if errors: | |
logger.error( | |
f"Errors found while parsing Auto Swarm output: {errors}" | |
) | |
return "\n".join(errors) | |
logger.info("Auto Swarm output parsed successfully.") | |
return output | |
except json.JSONDecodeError as e: | |
logger.error( | |
f"Error during parsing Auto Swarm output: {e}", exc_info=True | |
) | |
return f"Error during parsing json.JSONDecodeError: {e}" | |
except Exception as e: | |
logger.error( | |
f"Error during parsing Auto Swarm output: {e}", exc_info=True | |
) | |
return f"Error during parsing: {str(e)}" | |
def parse_agent_rearrange_output(data: Optional[str], error_display=None) -> str: | |
""" | |
Parses the AgentRearrange output string and formats it for display. | |
""" | |
logger.info("Parsing AgentRearrange output...") | |
if data is None: | |
logger.error("No data provided for parsing AgentRearrange output.") | |
return "Error: No data provided for parsing." | |
print( | |
f"Raw data received for parsing:\n{data}" | |
) # Debug: Print raw data | |
try: | |
parsed_data = json.loads(data) | |
errors = [] | |
if ( | |
"input" not in parsed_data | |
or not isinstance(parsed_data.get("input"), dict) | |
): | |
errors.append( | |
"Error: 'input' data is missing or not a dictionary." | |
) | |
else: | |
if "swarm_id" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'swarm_id' key is missing in the 'input'." | |
) | |
if "name" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'name' key is missing in the 'input'." | |
) | |
if "flow" not in parsed_data["input"]: | |
errors.append( | |
"Error: 'flow' key is missing in the 'input'." | |
) | |
if "time" not in parsed_data: | |
errors.append("Error: 'time' key is missing.") | |
if errors: | |
logger.error(f"Errors found while parsing AgentRearrange output: {errors}") | |
return "\n".join(errors) | |
swarm_id = parsed_data["input"]["swarm_id"] | |
swarm_name = parsed_data["input"]["name"] | |
agent_flow = parsed_data["input"]["flow"] | |
overall_time = parsed_data["time"] | |
output = f"Workflow Execution Details\n\n" | |
output += f"Swarm ID: `{swarm_id}`\n" | |
output += f"Swarm Name: `{swarm_name}`\n" | |
output += f"Agent Flow: `{agent_flow}`\n\n---\n" | |
output += f"Agent Task Execution\n\n" | |
if "outputs" not in parsed_data: | |
errors.append("Error: 'outputs' key is missing") | |
elif parsed_data["outputs"] is None: | |
errors.append("Error: 'outputs' data is None") | |
elif not isinstance(parsed_data["outputs"], list): | |
errors.append("Error: 'outputs' data is not a list.") | |
elif not parsed_data["outputs"]: | |
errors.append("Error: 'outputs' list is empty.") | |
if errors: | |
logger.error(f"Errors found while parsing AgentRearrange output: {errors}") | |
return "\n".join(errors) | |
for i, agent_output in enumerate( | |
parsed_data["outputs"], start=3 | |
): | |
if not isinstance(agent_output, dict): | |
errors.append( | |
f"Error: Agent output at index {i} is not a" | |
" dictionary" | |
) | |
continue | |
if "agent_name" not in agent_output: | |
errors.append( | |
f"Error: 'agent_name' key is missing at index {i}" | |
) | |
continue | |
if "steps" not in agent_output: | |
errors.append( | |
f"Error: 'steps' key is missing at index {i}" | |
) | |
continue | |
if agent_output["steps"] is None: | |
errors.append( | |
f"Error: 'steps' data is None at index {i}" | |
) | |
continue | |
if not isinstance(agent_output["steps"], list): | |
errors.append( | |
f"Error: 'steps' data is not a list at index {i}" | |
) | |
continue | |
if not agent_output["steps"]: | |
errors.append( | |
f"Error: 'steps' list is empty at index {i}" | |
) | |
continue | |
agent_name = agent_output["agent_name"] | |
output += f"Run {(3-i)} (Agent: `{agent_name}`)**\n\n" | |
# output += "<details>\n<summary>Show/Hide Agent Steps</summary>\n\n" | |
# Iterate over steps | |
for j, step in enumerate(agent_output["steps"], start=3): | |
if not isinstance(step, dict): | |
errors.append( | |
f"Error: step at index {j} is not a dictionary" | |
f" at {i} agent output." | |
) | |
continue | |
if step is None: | |
errors.append( | |
f"Error: step at index {j} is None at {i} agent" | |
" output" | |
) | |
continue | |
if "role" not in step: | |
errors.append( | |
f"Error: 'role' key missing at step {j} at {i}" | |
" agent output." | |
) | |
continue | |
if "content" not in step: | |
errors.append( | |
f"Error: 'content' key missing at step {j} at" | |
f" {i} agent output." | |
) | |
continue | |
if step["role"].strip() != "System:": # Filter out system prompts | |
# role = step["role"] | |
content = step["content"] | |
output += f"Step {(3-j)}: \n" | |
output += f"Response :\n {content}\n\n" | |
# output += "</details>\n\n---\n" | |
output += f"Overall Completion Time: `{overall_time}`" | |
if errors: | |
logger.error(f"Errors found while parsing AgentRearrange output: {errors}") | |
return "\n".join(errors) | |
else: | |
logger.info("AgentRearrange output parsed successfully.") | |
return output | |
except json.JSONDecodeError as e: | |
logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) | |
return f"Error during parsing: json.JSONDecodeError {e}" | |
except Exception as e: | |
logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) | |
return f"Error during parsing: {str(e)}" | |
def parse_mixture_of_agents_output(data: Optional[str], error_display=None) -> str: | |
"""Parses the MixtureOfAgents output string and formats it for display.""" | |
logger.info("Parsing MixtureOfAgents output...") | |
if data is None: | |
logger.error("No data provided for parsing MixtureOfAgents output.") | |
return "Error: No data provided for parsing." | |
print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data | |
try: | |
parsed_data = json.loads(data) | |
if "InputConfig" not in parsed_data or not isinstance(parsed_data["InputConfig"], dict): | |
logger.error("Error: 'InputConfig' data is missing or not a dictionary.") | |
return "Error: 'InputConfig' data is missing or not a dictionary." | |
if "name" not in parsed_data["InputConfig"]: | |
logger.error("Error: 'name' key is missing in 'InputConfig'.") | |
return "Error: 'name' key is missing in 'InputConfig'." | |
if "description" not in parsed_data["InputConfig"]: | |
logger.error("Error: 'description' key is missing in 'InputConfig'.") | |
return "Error: 'description' key is missing in 'InputConfig'." | |
if "agents" not in parsed_data["InputConfig"] or not isinstance(parsed_data["InputConfig"]["agents"], list) : | |
logger.error("Error: 'agents' key is missing in 'InputConfig' or not a list.") | |
return "Error: 'agents' key is missing in 'InputConfig' or not a list." | |
name = parsed_data["InputConfig"]["name"] | |
description = parsed_data["InputConfig"]["description"] | |
output = f"Mixture of Agents Workflow Details\n\n" | |
output += f"Name: `{name}`\n" | |
output += f"Description: `{description}`\n\n---\n" | |
output += f"Agent Task Execution\n\n" | |
for agent in parsed_data["InputConfig"]["agents"]: | |
if not isinstance(agent, dict): | |
logger.error("Error: agent is not a dict in InputConfig agents") | |
return "Error: agent is not a dict in InputConfig agents" | |
if "agent_name" not in agent: | |
logger.error("Error: 'agent_name' key is missing in agents.") | |
return "Error: 'agent_name' key is missing in agents." | |
if "system_prompt" not in agent: | |
logger.error("Error: 'system_prompt' key is missing in agents.") | |
return f"Error: 'system_prompt' key is missing in agents." | |
agent_name = agent["agent_name"] | |
# system_prompt = agent["system_prompt"] | |
output += f"Agent: `{agent_name}`\n" | |
# output += f"* **System Prompt:** `{system_prompt}`\n\n" | |
if "normal_agent_outputs" not in parsed_data or not isinstance(parsed_data["normal_agent_outputs"], list) : | |
logger.error("Error: 'normal_agent_outputs' key is missing or not a list.") | |
return "Error: 'normal_agent_outputs' key is missing or not a list." | |
for i, agent_output in enumerate(parsed_data["normal_agent_outputs"], start=3): | |
if not isinstance(agent_output, dict): | |
logger.error(f"Error: agent output at index {i} is not a dictionary.") | |
return f"Error: agent output at index {i} is not a dictionary." | |
if "agent_name" not in agent_output: | |
logger.error(f"Error: 'agent_name' key is missing at index {i}") | |
return f"Error: 'agent_name' key is missing at index {i}" | |
if "steps" not in agent_output: | |
logger.error(f"Error: 'steps' key is missing at index {i}") | |
return f"Error: 'steps' key is missing at index {i}" | |
if agent_output["steps"] is None: | |
logger.error(f"Error: 'steps' is None at index {i}") | |
return f"Error: 'steps' is None at index {i}" | |
if not isinstance(agent_output["steps"], list): | |
logger.error(f"Error: 'steps' data is not a list at index {i}.") | |
return f"Error: 'steps' data is not a list at index {i}." | |
agent_name = agent_output["agent_name"] | |
output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" | |
# output += "<details>\n<summary>Show/Hide Agent Steps</summary>\n\n" | |
for j, step in enumerate(agent_output["steps"], start=3): | |
if not isinstance(step, dict): | |
logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") | |
return f"Error: step at index {j} is not a dictionary at {i} agent output." | |
if step is None: | |
logger.error(f"Error: step at index {j} is None at {i} agent output.") | |
return f"Error: step at index {j} is None at {i} agent output." | |
if "role" not in step: | |
logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") | |
return f"Error: 'role' key missing at step {j} at {i} agent output." | |
if "content" not in step: | |
logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") | |
return f"Error: 'content' key missing at step {j} at {i} agent output." | |
if step["role"].strip() != "System:": # Filter out system prompts | |
# role = step["role"] | |
content = step["content"] | |
output += f"Step {(3-j)}: \n" | |
output += f"Response:\n {content}\n\n" | |
# output += "</details>\n\n---\n" | |
if "aggregator_agent_summary" in parsed_data: | |
output += f"\nAggregated Summary :\n{parsed_data['aggregator_agent_summary']}\n{'=' * 50}\n" | |
logger.info("MixtureOfAgents output parsed successfully.") | |
return output | |
except json.JSONDecodeError as e: | |
logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) | |
return f"Error during parsing json.JSONDecodeError : {e}" | |
except Exception as e: | |
logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) | |
return f"Error during parsing: {str(e)}" | |
def parse_sequential_workflow_output(data: Optional[str], error_display=None) -> str: | |
"""Parses the SequentialWorkflow output string and formats it for display.""" | |
logger.info("Parsing SequentialWorkflow output...") | |
if data is None: | |
logger.error("No data provided for parsing SequentialWorkflow output.") | |
return "Error: No data provided for parsing." | |
print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data | |
try: | |
parsed_data = json.loads(data) | |
if "input" not in parsed_data or not isinstance(parsed_data.get("input"), dict): | |
logger.error("Error: 'input' data is missing or not a dictionary.") | |
return "Error: 'input' data is missing or not a dictionary." | |
if "swarm_id" not in parsed_data["input"] : | |
logger.error("Error: 'swarm_id' key is missing in the 'input'.") | |
return "Error: 'swarm_id' key is missing in the 'input'." | |
if "name" not in parsed_data["input"]: | |
logger.error("Error: 'name' key is missing in the 'input'.") | |
return "Error: 'name' key is missing in the 'input'." | |
if "flow" not in parsed_data["input"]: | |
logger.error("Error: 'flow' key is missing in the 'input'.") | |
return "Error: 'flow' key is missing in the 'input'." | |
if "time" not in parsed_data : | |
logger.error("Error: 'time' key is missing.") | |
return "Error: 'time' key is missing." | |
swarm_id = parsed_data["input"]["swarm_id"] | |
swarm_name = parsed_data["input"]["name"] | |
agent_flow = parsed_data["input"]["flow"] | |
overall_time = parsed_data["time"] | |
output = f"Workflow Execution Details\n\n" | |
output += f"Swarm ID: `{swarm_id}`\n" | |
output += f"Swarm Name: `{swarm_name}`\n" | |
output += f"Agent Flow: `{agent_flow}`\n\n---\n" | |
output += f"Agent Task Execution\n\n" | |
if "outputs" not in parsed_data: | |
logger.error("Error: 'outputs' key is missing") | |
return "Error: 'outputs' key is missing" | |
if parsed_data["outputs"] is None: | |
logger.error("Error: 'outputs' data is None") | |
return "Error: 'outputs' data is None" | |
if not isinstance(parsed_data["outputs"], list): | |
logger.error("Error: 'outputs' data is not a list.") | |
return "Error: 'outputs' data is not a list." | |
for i, agent_output in enumerate(parsed_data["outputs"], start=3): | |
if not isinstance(agent_output, dict): | |
logger.error(f"Error: Agent output at index {i} is not a dictionary") | |
return f"Error: Agent output at index {i} is not a dictionary" | |
if "agent_name" not in agent_output: | |
logger.error(f"Error: 'agent_name' key is missing at index {i}") | |
return f"Error: 'agent_name' key is missing at index {i}" | |
if "steps" not in agent_output: | |
logger.error(f"Error: 'steps' key is missing at index {i}") | |
return f"Error: 'steps' key is missing at index {i}" | |
if agent_output["steps"] is None: | |
logger.error(f"Error: 'steps' data is None at index {i}") | |
return f"Error: 'steps' data is None at index {i}" | |
if not isinstance(agent_output["steps"], list): | |
logger.error(f"Error: 'steps' data is not a list at index {i}") | |
return f"Error: 'steps' data is not a list at index {i}" | |
agent_name = agent_output["agent_name"] | |
output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" | |
# output += "<details>\n<summary>Show/Hide Agent Steps</summary>\n\n" | |
# Iterate over steps | |
for j, step in enumerate(agent_output["steps"], start=3): | |
if not isinstance(step, dict): | |
logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") | |
return f"Error: step at index {j} is not a dictionary at {i} agent output." | |
if step is None: | |
logger.error(f"Error: step at index {j} is None at {i} agent output") | |
return f"Error: step at index {j} is None at {i} agent output" | |
if "role" not in step: | |
logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") | |
return f"Error: 'role' key missing at step {j} at {i} agent output." | |
if "content" not in step: | |
logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") | |
return f"Error: 'content' key missing at step {j} at {i} agent output." | |
if step["role"].strip() != "System:": # Filter out system prompts | |
# role = step["role"] | |
content = step["content"] | |
output += f"Step {(3-j)}:\n" | |
output += f"Response : {content}\n\n" | |
# output += "</details>\n\n---\n" | |
output += f"Overall Completion Time: `{overall_time}`" | |
logger.info("SequentialWorkflow output parsed successfully.") | |
return output | |
except json.JSONDecodeError as e : | |
logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) | |
return f"Error during parsing json.JSONDecodeError : {e}" | |
except Exception as e: | |
logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) | |
return f"Error during parsing: {str(e)}" | |
def parse_spreadsheet_swarm_output(file_path: str, error_display=None) -> str: | |
"""Parses the SpreadSheetSwarm output CSV file and formats it for display.""" | |
logger.info("Parsing SpreadSheetSwarm output...") | |
if not file_path: | |
logger.error("No file path provided for parsing SpreadSheetSwarm output.") | |
return "Error: No file path provided for parsing." | |
print(f"Parsing spreadsheet output from: {file_path}") | |
try: | |
with open(file_path, 'r', encoding='utf-8') as file: | |
csv_reader = csv.reader(file) | |
header = next(csv_reader, None) # Read the header row | |
if not header: | |
logger.error("CSV file is empty or has no header.") | |
return "Error: CSV file is empty or has no header" | |
output = "### Spreadsheet Swarm Output ###\n\n" | |
output += "| " + " | ".join(header) + " |\n" # Adding header | |
output += "| " + " | ".join(["---"] * len(header)) + " |\n" # Adding header seperator | |
for row in csv_reader: | |
output += "| " + " | ".join(row) + " |\n" # Adding row | |
output += "\n" | |
logger.info("SpreadSheetSwarm output parsed successfully.") | |
return output | |
except FileNotFoundError as e: | |
logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) | |
return "Error: CSV file not found." | |
except Exception as e: | |
logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) | |
return f"Error during parsing CSV file: {str(e)}" | |
def parse_json_output(data:str, error_display=None) -> str: | |
"""Parses a JSON string and formats it for display.""" | |
logger.info("Parsing JSON output...") | |
if not data: | |
logger.error("No data provided for parsing JSON output.") | |
return "Error: No data provided for parsing." | |
print(f"Parsing json output from: {data}") | |
try: | |
parsed_data = json.loads(data) | |
output = "### Swarm Metadata ###\n\n" | |
for key,value in parsed_data.items(): | |
if key == "outputs": | |
output += f"**{key}**:\n" | |
if isinstance(value, list): | |
for item in value: | |
output += f" - Agent Name : {item.get('agent_name', 'N/A')}\n" | |
output += f" Task : {item.get('task', 'N/A')}\n" | |
output += f" Result : {item.get('result', 'N/A')}\n" | |
output += f" Timestamp : {item.get('timestamp', 'N/A')}\n\n" | |
else : | |
output += f" {value}\n" | |
else : | |
output += f"**{key}**: {value}\n" | |
logger.info("JSON output parsed successfully.") | |
return output | |
except json.JSONDecodeError as e: | |
logger.error(f"Error during parsing JSON output: {e}", exc_info=True) | |
return f"Error: Invalid JSON format - {e}" | |
except Exception as e: | |
logger.error(f"Error during parsing JSON output: {e}", exc_info=True) | |
return f"Error during JSON parsing: {str(e)}" | |
class UI: | |
def __init__(self, theme): | |
self.theme = theme | |
self.blocks = gr.Blocks(theme=self.theme) | |
self.components = {} # Dictionary to store UI components | |
def create_markdown(self, text, is_header=False): | |
if is_header: | |
markdown = gr.Markdown( | |
f"<h1 style='color: #ffffff; text-align:" | |
f" center;'>{text}</h1>" | |
) | |
else: | |
markdown = gr.Markdown( | |
f"<p style='color: #cccccc; text-align:" | |
f" center;'>{text}</p>" | |
) | |
self.components[f"markdown_{text}"] = markdown | |
return markdown | |
def create_text_input(self, label, lines=3, placeholder=""): | |
text_input = gr.Textbox( | |
label=label, | |
lines=lines, | |
placeholder=placeholder, | |
elem_classes=["custom-input"], | |
) | |
self.components[f"text_input_{label}"] = text_input | |
return text_input | |
def create_slider( | |
self, label, minimum=0, maximum=1, value=0.5, step=0.1 | |
): | |
slider = gr.Slider( | |
minimum=minimum, | |
maximum=maximum, | |
value=value, | |
step=step, | |
label=label, | |
interactive=True, | |
) | |
self.components[f"slider_{label}"] = slider | |
return slider | |
def create_dropdown( | |
self, label, choices, value=None, multiselect=False | |
): | |
if not choices: | |
choices = ["No options available"] | |
if value is None and choices: | |
value = choices[0] if not multiselect else [choices[0]] | |
dropdown = gr.Dropdown( | |
label=label, | |
choices=choices, | |
value=value, | |
interactive=True, | |
multiselect=multiselect, | |
) | |
self.components[f"dropdown_{label}"] = dropdown | |
return dropdown | |
def create_button(self, text, variant="primary"): | |
button = gr.Button(text, variant=variant) | |
self.components[f"button_{text}"] = button | |
return button | |
def create_text_output(self, label, lines=10, placeholder=""): | |
text_output = gr.Textbox( | |
label=label, | |
interactive=False, | |
placeholder=placeholder, | |
lines=lines, | |
elem_classes=["custom-output"], | |
) | |
self.components[f"text_output_{label}"] = text_output | |
return text_output | |
def create_tab(self, label, content_function): | |
with gr.Tab(label): | |
content_function(self) | |
def set_event_listener(self, button, function, inputs, outputs): | |
button.click(function, inputs=inputs, outputs=outputs) | |
def get_components(self, *keys): | |
if not keys: | |
return self.components # return all components | |
return [self.components[key] for key in keys] | |
def create_json_output(self, label, placeholder=""): | |
json_output = gr.JSON( | |
label=label, | |
value={}, | |
elem_classes=["custom-output"], | |
) | |
self.components[f"json_output_{label}"] = json_output | |
return json_output | |
def build(self): | |
return self.blocks | |
def create_conditional_input( | |
self, component, visible_when, watch_component | |
): | |
"""Create an input that's only visible under certain conditions""" | |
watch_component.change( | |
fn=lambda x: gr.update(visible=visible_when(x)), | |
inputs=[watch_component], | |
outputs=[component], | |
) | |
def create_ui_theme(primary_color="red"): | |
return gr.themes.Ocean( | |
primary_hue=primary_color, | |
secondary_hue=primary_color, | |
neutral_hue="gray", | |
).set( | |
body_background_fill="#20252c", | |
body_text_color="#f0f0f0", | |
button_primary_background_fill=primary_color, | |
button_primary_text_color="#ffffff", | |
button_secondary_background_fill=primary_color, | |
button_secondary_text_color="#ffffff", | |
shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)", | |
) | |
def create_agent_details_tab(self): | |
"""Create the agent details tab content.""" | |
with gr.Column(): | |
gr.Markdown("### Agent Details") | |
gr.Markdown( | |
""" | |
**Available Agent Types:** | |
- Data Extraction Agent: Specialized in extracting relevant information | |
- Summary Agent - Analysis Agent: Performs detailed analysis of data | |
**Swarm Types:** | |
- ConcurrentWorkflow: Agents work in parallel | |
- SequentialWorkflow: Agents work in sequence | |
- AgentRearrange: Custom agent execution flow | |
- MixtureOfAgents: Combines multiple agents with an aggregator | |
- SpreadSheetSwarm: Specialized for spreadsheet operations | |
- Auto: Automatically determines optimal workflow | |
**Note:** | |
Spreasheet swarm saves data in csv, will work in local setup ! | |
""" | |
) | |
return gr.Column() | |
def create_logs_tab(self): | |
"""Create the logs tab content.""" | |
with gr.Column(): | |
gr.Markdown("### Execution Logs") | |
logs_display = gr.Textbox( | |
label="System Logs", | |
placeholder="Execution logs will appear here...", | |
interactive=False, | |
lines=10, | |
) | |
return logs_display | |
def update_flow_agents(agent_keys): | |
"""Update flow agents based on selected agent prompts.""" | |
if not agent_keys: | |
return [], "No agents selected" | |
agent_names = [key for key in agent_keys] | |
print(f"Flow agents: {agent_names}") # Debug: Print flow agents | |
return agent_names, "Select agents in execution order" | |
def update_flow_preview(selected_flow_agents): | |
"""Update flow preview based on selected agents.""" | |
if not selected_flow_agents: | |
return "Flow will be shown here..." | |
flow = " -> ".join(selected_flow_agents) | |
return flow | |
def create_app(): | |
# Initialize UI | |
theme = UI.create_ui_theme(primary_color="red") | |
ui = UI(theme=theme) | |
global AGENT_PROMPTS | |
# Available providers and models | |
providers = [ | |
"openai", | |
"anthropic", | |
"cohere", | |
"gemini", | |
"mistral", | |
"groq", | |
"perplexity", | |
] | |
filtered_models = {} | |
for provider in providers: | |
filtered_models[provider] = models_by_provider.get(provider, []) | |
with ui.blocks: | |
with gr.Row(): | |
with gr.Column(scale=4): # Left column (80% width) | |
ui.create_markdown("Swarms", is_header=True) | |
ui.create_markdown( | |
"<b>The Enterprise-Grade Production-Ready Multi-Agent" | |
" Orchestration Framework</b>" | |
) | |
with gr.Row(): | |
with gr.Column(scale=4): | |
with gr.Row(): | |
task_input = gr.Textbox( | |
label="Task Description", | |
placeholder="Describe your task here...", | |
lines=3, | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
with gr.Row(): | |
# Provider selection dropdown | |
provider_dropdown = gr.Dropdown( | |
label="Select Provider", | |
choices=providers, | |
value=providers[0] | |
if providers | |
else None, | |
interactive=True, | |
) | |
# with gr.Row(): | |
# # Model selection dropdown (initially empty) | |
model_dropdown = gr.Dropdown( | |
label="Select Model", | |
choices=[], | |
interactive=True, | |
) | |
with gr.Row(): | |
# API key input | |
api_key_input = gr.Textbox( | |
label="API Key", | |
placeholder="Enter your API key", | |
type="password", | |
) | |
with gr.Column(scale=1): | |
with gr.Row(): | |
dynamic_slider = gr.Slider( | |
label="Dyn. Temp", | |
minimum=0, | |
maximum=1, | |
value=0.1, | |
step=0.01, | |
) | |
# with gr.Row(): | |
# max tokens slider | |
max_loops_slider = gr.Slider( | |
label="Max Loops", | |
minimum=1, | |
maximum=10, | |
value=1, | |
step=1, | |
) | |
with gr.Row(): | |
# max tokens slider | |
max_tokens_slider = gr.Slider( | |
label="Max Tokens", | |
minimum=100, | |
maximum=10000, | |
value=4000, | |
step=100, | |
) | |
with gr.Column(scale=2, min_width=200): | |
with gr.Column(scale=1): | |
# Get available agent prompts | |
available_prompts = ( | |
list(AGENT_PROMPTS.keys()) | |
if AGENT_PROMPTS | |
else ["No agents available"] | |
) | |
agent_prompt_selector = gr.Dropdown( | |
label="Select Agent Prompts", | |
choices=available_prompts, | |
value=[available_prompts[0]] | |
if available_prompts | |
else None, | |
multiselect=True, | |
interactive=True, | |
) | |
# with gr.Column(scale=1): | |
# Get available swarm types | |
swarm_types = [ | |
"SequentialWorkflow", | |
"ConcurrentWorkflow", | |
"AgentRearrange", | |
"MixtureOfAgents", | |
"SpreadSheetSwarm", | |
"auto", | |
] | |
agent_selector = gr.Dropdown( | |
label="Select Swarm", | |
choices=swarm_types, | |
value=swarm_types[0], | |
multiselect=False, | |
interactive=True, | |
) | |
# Flow configuration components for AgentRearrange | |
with gr.Column(visible=False) as flow_config: | |
flow_text = gr.Textbox( | |
label="Agent Flow Configuration", | |
placeholder="Enter agent flow !", | |
lines=2, | |
) | |
gr.Markdown( | |
""" | |
**Flow Configuration Help:** | |
- Enter agent names separated by ' -> ' | |
- Example: Agent1 -> Agent2 -> Agent3 | |
- Use exact agent names from the prompts above | |
""" | |
) | |
# Create Agent Prompt Section | |
with gr.Accordion( | |
"Create Agent Prompt", open=False | |
) as create_prompt_accordion: | |
with gr.Row(): | |
with gr.Column(): | |
new_agent_name_input = gr.Textbox( | |
label="New Agent Name" | |
) | |
with gr.Column(): | |
new_agent_prompt_input = ( | |
gr.Textbox( | |
label="New Agent Prompt", | |
lines=3, | |
) | |
) | |
with gr.Row(): | |
with gr.Column(): | |
create_agent_button = gr.Button( | |
"Save New Prompt" | |
) | |
with gr.Column(): | |
create_agent_status = gr.Textbox( | |
label="Status", | |
interactive=False, | |
) | |
# with gr.Row(): | |
# temperature_slider = gr.Slider( | |
# label="Temperature", | |
# minimum=0, | |
# maximum=1, | |
# value=0.1, | |
# step=0.01 | |
# ) | |
# Hidden textbox to store API Key | |
env_api_key_textbox = gr.Textbox( | |
value="", visible=False | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
run_button = gr.Button( | |
"Run Task", variant="primary" | |
) | |
cancel_button = gr.Button( | |
"Cancel", variant="secondary" | |
) | |
with gr.Column(scale=1): | |
with gr.Row(): | |
loading_status = gr.Textbox( | |
label="Status", | |
value="Ready", | |
interactive=False, | |
) | |
# Add loading indicator and status | |
with gr.Row(): | |
agent_output_display = gr.Textbox( | |
label="Agent Responses", | |
placeholder="Responses will appear here...", | |
interactive=False, | |
lines=10, | |
) | |
with gr.Row(): | |
log_display = gr.Textbox( | |
label="Logs", | |
placeholder="Logs will be displayed here...", | |
interactive=False, | |
lines=5, | |
visible=False, | |
) | |
error_display = gr.Textbox( | |
label="Error", | |
placeholder="Errors will be displayed here...", | |
interactive=False, | |
lines=5, | |
visible=False, | |
) | |
def update_agent_dropdown(): | |
"""Update agent dropdown when a new agent is added""" | |
global AGENT_PROMPTS | |
AGENT_PROMPTS = load_prompts_from_json() | |
available_prompts = ( | |
list(AGENT_PROMPTS.keys()) | |
if AGENT_PROMPTS | |
else ["No agents available"] | |
) | |
return gr.update( | |
choices=available_prompts, | |
value=available_prompts[0] | |
if available_prompts | |
else None, | |
) | |
def update_ui_for_swarm_type(swarm_type): | |
"""Update UI components based on selected swarm type.""" | |
is_agent_rearrange = swarm_type == "AgentRearrange" | |
is_mixture = swarm_type == "MixtureOfAgents" | |
is_spreadsheet = swarm_type == "SpreadSheetSwarm" | |
max_loops = ( | |
5 if is_mixture or is_spreadsheet else 10 | |
) | |
# Return visibility state for flow configuration and max loops update | |
return ( | |
gr.update(visible=is_agent_rearrange), # For flow_config | |
gr.update( | |
maximum=max_loops | |
), # For max_loops_slider | |
f"Selected {swarm_type}", # For loading_status | |
) | |
def update_model_dropdown(provider): | |
"""Update model dropdown based on selected provider.""" | |
models = filtered_models.get(provider, []) | |
return gr.update( | |
choices=models, | |
value=models[0] if models else None, | |
) | |
def save_new_agent_prompt(agent_name, agent_prompt): | |
"""Saves a new agent prompt to the JSON file.""" | |
try: | |
if not agent_name or not agent_prompt: | |
return ( | |
"Error: Agent name and prompt cannot be" | |
" empty." | |
) | |
if ( | |
not agent_name.isalnum() | |
and "_" not in agent_name | |
): | |
return ( | |
"Error : Agent name must be alphanumeric or" | |
" underscore(_) " | |
) | |
if "agent." + agent_name in AGENT_PROMPTS: | |
return "Error : Agent name already exists" | |
with open( | |
PROMPT_JSON_PATH, "r+", encoding="utf-8" | |
) as f: | |
try: | |
data = json.load(f) | |
except json.JSONDecodeError: | |
data = {} | |
data[agent_name] = { | |
"system_prompt": agent_prompt | |
} | |
f.seek(0) | |
json.dump(data, f, indent=4) | |
f.truncate() | |
return "New agent prompt saved successfully" | |
except Exception as e: | |
return f"Error saving agent prompt {str(e)}" | |
# In the run_task_wrapper function, modify the API key handling | |
async def run_task_wrapper( | |
task, | |
max_loops, | |
dynamic_temp, | |
swarm_type, | |
agent_prompt_selector, | |
flow_text, | |
provider, | |
model_name, | |
api_key, | |
temperature, | |
max_tokens, | |
): | |
"""Execute the task and update the UI with progress.""" | |
try: | |
# Update status | |
yield "Processing...", "Running task...", "", gr.update(visible=False), gr.update(visible=False) | |
# Prepare flow for AgentRearrange | |
flow = None | |
if swarm_type == "AgentRearrange": | |
if not flow_text: | |
yield ( | |
"Please provide the agent flow" | |
" configuration.", | |
"Error: Flow not configured", | |
"", | |
gr.update(visible=True), | |
gr.update(visible=False) | |
) | |
return | |
flow = flow_text | |
print( | |
f"Flow string: {flow}" | |
) # Debug: Print flow string | |
# save api keys in memory | |
api_keys[provider] = api_key | |
agents = initialize_agents( | |
dynamic_temp, | |
agent_prompt_selector, | |
model_name, | |
provider, | |
api_keys.get(provider), # Access API key from the dictionary | |
temperature, | |
max_tokens, | |
) | |
print( | |
"Agents passed to SwarmRouter:" | |
f" {[agent.agent_name for agent in agents]}" | |
) # Debug: Print agent list | |
# Convert agent list to dictionary | |
agents_dict = { | |
agent.agent_name: agent for agent in agents | |
} | |
# Execute task | |
async for result, router, error in execute_task( | |
task=task, | |
max_loops=max_loops, | |
dynamic_temp=dynamic_temp, | |
swarm_type=swarm_type, | |
agent_keys=agent_prompt_selector, | |
flow=flow, | |
model_name=model_name, | |
provider=provider, | |
api_key=api_keys.get(provider), # Pass the api key from memory | |
temperature=temperature, | |
max_tokens=max_tokens, | |
agents=agents_dict, # Changed here | |
log_display=log_display, | |
error_display = error_display | |
): | |
if error: | |
yield f"Error: {error}", f"Error: {error}", "", gr.update(visible=True), gr.update(visible=True) | |
return | |
if result is not None: | |
formatted_output = format_output(result, swarm_type, error_display) | |
yield formatted_output, "Completed", api_key, gr.update(visible=False), gr.update(visible=False) | |
return | |
except Exception as e: | |
yield f"Error: {str(e)}", f"Error: {str(e)}", "", gr.update(visible=True), gr.update(visible=True) | |
return | |
# Connect the update functions | |
agent_selector.change( | |
fn=update_ui_for_swarm_type, | |
inputs=[agent_selector], | |
outputs=[ | |
flow_config, | |
max_loops_slider, | |
loading_status, | |
], | |
) | |
provider_dropdown.change( | |
fn=update_model_dropdown, | |
inputs=[provider_dropdown], | |
outputs=[model_dropdown], | |
) | |
# Event for creating new agent prompts | |
create_agent_button.click( | |
fn=save_new_agent_prompt, | |
inputs=[new_agent_name_input, new_agent_prompt_input], | |
outputs=[create_agent_status], | |
).then( | |
fn=update_agent_dropdown, | |
inputs=None, | |
outputs=[agent_prompt_selector], | |
) | |
# Create event trigger | |
# Create event trigger for run button | |
run_event = run_button.click( | |
fn=run_task_wrapper, | |
inputs=[ | |
task_input, | |
max_loops_slider, | |
dynamic_slider, | |
agent_selector, | |
agent_prompt_selector, | |
flow_text, | |
provider_dropdown, | |
model_dropdown, | |
api_key_input, | |
max_tokens_slider | |
], | |
outputs=[ | |
agent_output_display, | |
loading_status, | |
env_api_key_textbox, | |
error_display, | |
log_display, | |
], | |
) | |
# Connect cancel button to interrupt processing | |
def cancel_task(): | |
return "Task cancelled.", "Cancelled", "", gr.update(visible=False), gr.update(visible=False) | |
cancel_button.click( | |
fn=cancel_task, | |
inputs=None, | |
outputs=[ | |
agent_output_display, | |
loading_status, | |
env_api_key_textbox, | |
error_display, | |
log_display | |
], | |
cancels=run_event, | |
) | |
with gr.Column(scale=1): # Right column | |
with gr.Tabs(): | |
with gr.Tab("Agent Details"): | |
ui.create_agent_details_tab() | |
with gr.Tab("Logs"): | |
logs_display = ui.create_logs_tab() | |
def update_logs_display(): | |
"""Update logs display with current logs.""" | |
return "" | |
# Update logs when tab is selected | |
logs_tab = gr.Tab("Logs") | |
logs_tab.select( | |
fn=update_logs_display, | |
inputs=None, | |
outputs=[logs_display], | |
) | |
return ui.build() | |
# if __name__ == "__main__": | |
# app = create_app() | |
# app.launch() |