import os from dotenv import load_dotenv from typing import AsyncGenerator, List, Dict, Any, Tuple, Optional import json import time import asyncio import gradio as gr from swarms.structs.agent import Agent from swarms.structs.swarm_router import SwarmRouter from swarms.utils.loguru_logger import initialize_logger import re import csv # Import the csv module for csv parsing from swarms.utils.litellm_wrapper import LiteLLM from litellm import models_by_provider from dotenv import set_key, find_dotenv import logging # Import the logging module import litellm # Import litellm exception # Initialize logger load_dotenv() # Initialize logger logger = initialize_logger(log_folder="swarm_ui") # Define the path to agent_prompts.json PROMPT_JSON_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json" ) logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}") # Load prompts first so its available for create_app def load_prompts_from_json() -> Dict[str, str]: try: if not os.path.exists(PROMPT_JSON_PATH): # Load default prompts return { "Agent-Data_Extractor": "You are a data extraction agent...", "Agent-Summarizer": "You are a summarization agent...", "Agent-Onboarding_Agent": "You are an onboarding agent...", } with open(PROMPT_JSON_PATH, "r", encoding="utf-8") as f: try: data = json.load(f) except json.JSONDecodeError: # Load default prompts return { "Agent-Data_Extractor": "You are a data extraction agent...", "Agent-Summarizer": "You are a summarization agent...", "Agent-Onboarding_Agent": "You are an onboarding agent...", } if not isinstance(data, dict): # Load default prompts return { "Agent-Data_Extractor": "You are a data extraction agent...", "Agent-Summarizer": "You are a summarization agent...", "Agent-Onboarding_Agent": "You are an onboarding agent...", } prompts = {} for agent_name, details in data.items(): if ( not isinstance(details, dict) or "system_prompt" not in details ): continue prompts[agent_name] = details["system_prompt"] if not prompts: # Load default prompts return { "Agent-Data_Extractor": "You are a data extraction agent...", "Agent-Summarizer": "You are a summarization agent...", "Agent-Onboarding_Agent": "You are an onboarding agent...", } return prompts except Exception: # Load default prompts return { "Agent-Data_Extractor": "You are a data extraction agent...", "Agent-Summarizer": "You are a summarization agent...", "Agent-Onboarding_Agent": "You are an onboarding agent...", } AGENT_PROMPTS = load_prompts_from_json() api_keys = {} def initialize_agents( dynamic_temp: float, agent_keys: List[str], model_name: str, provider: str, api_key: str, temperature: float, max_tokens: int, ) -> List[Agent]: logger.info("Initializing agents...") agents = [] seen_names = set() try: for agent_key in agent_keys: if agent_key not in AGENT_PROMPTS: raise ValueError(f"Invalid agent key: {agent_key}") agent_prompt = AGENT_PROMPTS[agent_key] agent_name = agent_key # Ensure unique agent names base_name = agent_name counter = 1 while agent_name in seen_names: agent_name = f"{base_name}_{counter}" counter += 1 seen_names.add(agent_name) # Set API key using os.environ temporarily if provider == "openai": os.environ["OPENAI_API_KEY"] = api_key elif provider == "anthropic": os.environ["ANTHROPIC_API_KEY"] = api_key elif provider == "cohere": os.environ["COHERE_API_KEY"] = api_key elif provider == "gemini": os.environ["GEMINI_API_KEY"] = api_key elif provider == "mistral": os.environ["MISTRAL_API_KEY"] = api_key elif provider == "groq": os.environ["GROQ_API_KEY"] = api_key elif provider == "perplexity": os.environ["PERPLEXITY_API_KEY"] = api_key # Add other providers and their environment variable names as needed # Create LiteLLM instance (Now it will read from os.environ) llm = LiteLLM( model_name=model_name, system_prompt=agent_prompt, temperature=temperature, max_tokens=max_tokens, ) agent = Agent( agent_name=agent_name, system_prompt=agent_prompt, llm=llm, max_loops=1, autosave=True, verbose=True, dynamic_temperature_enabled=True, saved_state_path=f"agent_{agent_name}.json", user_name="pe_firm", retry_attempts=1, context_length=200000, output_type="string", # here is the output type which is string temperature=dynamic_temp, ) print(f"Agent created: {agent.agent_name}") agents.append(agent) logger.info(f"Agents initialized successfully: {[agent.agent_name for agent in agents]}") return agents except Exception as e: logger.error(f"Error initializing agents: {e}", exc_info=True) raise def validate_flow(flow, agents_dict): logger.info(f"Validating flow: {flow}") agent_names = flow.split("->") for agent in agent_names: agent = agent.strip() if agent not in agents_dict: logger.error(f"Agent '{agent}' specified in the flow does not exist.") raise ValueError( f"Agent '{agent}' specified in the flow does not exist." ) logger.info(f"Flow validated successfully: {flow}") class TaskExecutionError(Exception): """Custom exception for task execution errors.""" def __init__(self, message: str): self.message = message super().__init__(self.message) def __str__(self): return f"TaskExecutionError: {self.message}" async def execute_task( task: str, max_loops: int, dynamic_temp: float, swarm_type: str, agent_keys: List[str], flow: str = None, model_name: str = "gpt-4o", provider: str = "openai", api_key: str = None, temperature: float = 0.5, max_tokens: int = 4000, agents: dict = None, log_display=None, error_display=None ) -> AsyncGenerator[Tuple[Any, Optional["SwarmRouter"], str], None]: # Changed the return type here logger.info(f"Executing task: {task} with swarm type: {swarm_type}") try: if not task: logger.error("Task description is missing.") yield "Please provide a task description.", gr.update(visible=True), "" return if not agent_keys: logger.error("No agents selected.") yield "Please select at least one agent.", gr.update(visible=True), "" return if not provider: logger.error("Provider is missing.") yield "Please select a provider.", gr.update(visible=True), "" return if not model_name: logger.error("Model is missing.") yield "Please select a model.", gr.update(visible=True), "" return if not api_key: logger.error("API Key is missing.") yield "Please enter an API Key.", gr.update(visible=True), "" return # Initialize agents try: if not agents: agents = initialize_agents( dynamic_temp, agent_keys, model_name, provider, api_key, temperature, max_tokens, ) except Exception as e: logger.error(f"Error initializing agents: {e}", exc_info=True) yield f"Error initializing agents: {e}", gr.update(visible=True), "" return # Swarm-specific configurations router_kwargs = { "name": "multi-agent-workflow", "description": f"Executing {swarm_type} workflow", "max_loops": max_loops, "agents": list(agents.values()), "autosave": True, "return_json": True, "output_type": "string", # Default output type "swarm_type": swarm_type, # Pass swarm_type here } if swarm_type == "AgentRearrange": if not flow: logger.error("Flow configuration is missing for AgentRearrange.") yield "Flow configuration is required for AgentRearrange", gr.update(visible=True), "" return # Generate unique agent names in the flow flow_agents = [] used_agent_names = set() for agent_key in flow.split("->"): agent_key = agent_key.strip() base_agent_name = agent_key count = 1 while agent_key in used_agent_names: agent_key = f"{base_agent_name}_{count}" count += 1 used_agent_names.add(agent_key) flow_agents.append(agent_key) # Update the flow string with unique names flow = " -> ".join(flow_agents) logger.info(f"Updated Flow string: {flow}") router_kwargs["flow"] = flow router_kwargs["output_type"] = "string" # Changed output type here if swarm_type == "MixtureOfAgents": if len(agents) < 2: logger.error("MixtureOfAgents requires at least 2 agents.") yield "MixtureOfAgents requires at least 2 agents", gr.update(visible=True), "" return if swarm_type == "SequentialWorkflow": if len(agents) < 2: logger.error("SequentialWorkflow requires at least 2 agents.") yield "SequentialWorkflow requires at least 2 agents", gr.update(visible=True), "" return if swarm_type == "ConcurrentWorkflow": pass if swarm_type == "SpreadSheetSwarm": pass if swarm_type == "auto": pass # Create and execute SwarmRouter try: timeout = ( 450 if swarm_type != "SpreadSheetSwarm" else 900 ) # SpreadSheetSwarm will have different timeout. if swarm_type == "AgentRearrange": from swarms.structs.rearrange import AgentRearrange router = AgentRearrange( agents=list(agents.values()), flow=flow, max_loops=max_loops, name="multi-agent-workflow", description=f"Executing {swarm_type} workflow", # autosave=True, return_json=True, output_type="string", # Changed output type according to agent rearrange ) result = router(task) # Changed run method logger.info(f"AgentRearrange task executed successfully.") yield result, None, "" return # For other swarm types use the SwarmRouter and its run method router = SwarmRouter(**router_kwargs) # Initialize SwarmRouter if swarm_type == "ConcurrentWorkflow": async def run_agent_task(agent, task_): return agent.run(task_) tasks = [ run_agent_task(agent, task) for agent in list(agents.values()) ] responses = await asyncio.gather(*tasks) result = {} for agent, response in zip(list(agents.values()), responses): result[agent.agent_name] = response # Convert the result to JSON string for parsing result = json.dumps( { "input" : { "swarm_id" : "concurrent_workflow_swarm_id", "name" : "ConcurrentWorkflow", "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) }, "time" : time.time(), "outputs" : [ { "agent_name": agent_name, "steps" : [{"role":"assistant", "content":response}] } for agent_name, response in result.items() ] } ) logger.info(f"ConcurrentWorkflow task executed successfully.") yield result, None, "" return elif swarm_type == "auto": result = await asyncio.wait_for( asyncio.to_thread(router.run, task), timeout=timeout ) if isinstance(result,dict): result = json.dumps( { "input" : { "swarm_id" : "auto_swarm_id", "name" : "AutoSwarm", "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) }, "time" : time.time(), "outputs" : [ { "agent_name": agent.agent_name, "steps" : [{"role":"assistant", "content":response}] } for agent, response in result.items() ] } ) elif isinstance(result, str): result = json.dumps( { "input" : { "swarm_id" : "auto_swarm_id", "name" : "AutoSwarm", "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) }, "time" : time.time(), "outputs" : [ { "agent_name": "auto", "steps" : [{"role":"assistant", "content":result}] } ] } ) else : logger.error("Auto Swarm returned an unexpected type") yield "Error : Auto Swarm returned an unexpected type", gr.update(visible=True), "" return logger.info(f"Auto task executed successfully.") yield result, None, "" return else: result = await asyncio.wait_for( asyncio.to_thread(router.run, task), timeout=timeout ) logger.info(f"{swarm_type} task executed successfully.") yield result, None, "" return except asyncio.TimeoutError as e: logger.error(f"Task execution timed out after {timeout} seconds", exc_info=True) yield f"Task execution timed out after {timeout} seconds", gr.update(visible=True), "" return except litellm.exceptions.APIError as e: # Catch litellm APIError logger.error(f"LiteLLM API Error: {e}", exc_info=True) yield f"LiteLLM API Error: {e}", gr.update(visible=True), "" return except litellm.exceptions.AuthenticationError as e: # Catch litellm AuthenticationError logger.error(f"LiteLLM Authentication Error: {e}", exc_info=True) yield f"LiteLLM Authentication Error: {e}", gr.update(visible=True), "" return except Exception as e: logger.error(f"Error executing task: {e}", exc_info=True) yield f"Error executing task: {e}", gr.update(visible=True), "" return except TaskExecutionError as e: logger.error(f"Task execution error: {e}") yield str(e), gr.update(visible=True), "" return except Exception as e: logger.error(f"An unexpected error occurred: {e}", exc_info=True) yield f"An unexpected error occurred: {e}", gr.update(visible=True), "" return finally: logger.info(f"Task execution finished for: {task} with swarm type: {swarm_type}") def format_output(data:Optional[str], swarm_type:str, error_display=None) -> str: if data is None: return "Error : No output from the swarm." if swarm_type == "AgentRearrange": return parse_agent_rearrange_output(data, error_display) elif swarm_type == "MixtureOfAgents": return parse_mixture_of_agents_output(data, error_display) elif swarm_type in ["SequentialWorkflow", "ConcurrentWorkflow"]: return parse_sequential_workflow_output(data, error_display) elif swarm_type == "SpreadSheetSwarm": if os.path.exists(data): return parse_spreadsheet_swarm_output(data, error_display) else: return data # Directly return JSON response elif swarm_type == "auto": return parse_auto_swarm_output(data, error_display) else: return "Unsupported swarm type." def parse_mixture_of_agents_data(data: dict, error_display=None) -> str: """Parses the MixtureOfAgents output data and formats it for display.""" logger.info("Parsing MixtureOfAgents data within Auto Swarm output...") try: output = "" if "InputConfig" in data and isinstance(data["InputConfig"], dict): input_config = data["InputConfig"] output += f"Mixture of Agents Workflow Details\n\n" output += f"Name: `{input_config.get('name', 'N/A')}`\n" output += ( f"Description:" f" `{input_config.get('description', 'N/A')}`\n\n---\n" ) output += f"Agent Task Execution\n\n" for agent in input_config.get("agents", []): output += ( f"Agent: `{agent.get('agent_name', 'N/A')}`\n" ) if "normal_agent_outputs" in data and isinstance( data["normal_agent_outputs"], list ): for i, agent_output in enumerate( data["normal_agent_outputs"], start=3 ): agent_name = agent_output.get("agent_name", "N/A") output += f"Run {(3 - i)} (Agent: `{agent_name}`)\n\n" for j, step in enumerate( agent_output.get("steps", []), start=3 ): if ( isinstance(step, dict) and "role" in step and "content" in step and step["role"].strip() != "System:" ): content = step["content"] output += f"Step {(3 - j)}: \n" output += f"Response:\n {content}\n\n" if "aggregator_agent_summary" in data: output += ( f"\nAggregated Summary :\n" f"{data['aggregator_agent_summary']}\n{'=' * 50}\n" ) logger.info("MixtureOfAgents data parsed successfully within Auto Swarm.") return output except Exception as e: logger.error( f"Error during parsing MixtureOfAgents data within Auto Swarm: {e}", exc_info=True, ) return f"Error during parsing: {str(e)}" def parse_auto_swarm_output(data: Optional[str], error_display=None) -> str: """Parses the auto swarm output string and formats it for display.""" logger.info("Parsing Auto Swarm output...") if data is None: logger.error("No data provided for parsing Auto Swarm output.") return "Error: No data provided for parsing." print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data try: parsed_data = json.loads(data) errors = [] # Basic structure validation if ( "input" not in parsed_data or not isinstance(parsed_data.get("input"), dict) ): errors.append( "Error: 'input' data is missing or not a dictionary." ) else: if "swarm_id" not in parsed_data["input"]: errors.append( "Error: 'swarm_id' key is missing in the 'input'." ) if "name" not in parsed_data["input"]: errors.append( "Error: 'name' key is missing in the 'input'." ) if "flow" not in parsed_data["input"]: errors.append( "Error: 'flow' key is missing in the 'input'." ) if "time" not in parsed_data: errors.append("Error: 'time' key is missing.") if errors: logger.error( f"Errors found while parsing Auto Swarm output: {errors}" ) return "\n".join(errors) swarm_id = parsed_data["input"]["swarm_id"] swarm_name = parsed_data["input"]["name"] agent_flow = parsed_data["input"]["flow"] overall_time = parsed_data["time"] output = f"Workflow Execution Details\n\n" output += f"Swarm ID: `{swarm_id}`\n" output += f"Swarm Name: `{swarm_name}`\n" output += f"Agent Flow: `{agent_flow}`\n\n---\n" output += f"Agent Task Execution\n\n" # Handle nested MixtureOfAgents data or other swarm type data if ( "outputs" in parsed_data and isinstance(parsed_data["outputs"], list) and parsed_data["outputs"] and isinstance(parsed_data["outputs"][0], dict) ): if parsed_data["outputs"][0].get("agent_name") == "auto": mixture_data = parsed_data["outputs"][0].get("steps", []) if mixture_data and isinstance(mixture_data[0], dict) and "content" in mixture_data[0]: try: mixture_content = json.loads(mixture_data[0]["content"]) output += parse_mixture_of_agents_data(mixture_content) except json.JSONDecodeError as e: logger.error(f"Error decoding nested MixtureOfAgents data: {e}", exc_info=True) return f"Error decoding nested MixtureOfAgents data: {e}" else: for i, agent_output in enumerate(parsed_data["outputs"], start=3): if not isinstance(agent_output, dict): errors.append(f"Error: Agent output at index {i} is not a dictionary") continue if "agent_name" not in agent_output: errors.append(f"Error: 'agent_name' key is missing at index {i}") continue if "steps" not in agent_output: errors.append(f"Error: 'steps' key is missing at index {i}") continue if agent_output["steps"] is None: errors.append(f"Error: 'steps' data is None at index {i}") continue if not isinstance(agent_output["steps"], list): errors.append(f"Error: 'steps' data is not a list at index {i}") continue agent_name = agent_output["agent_name"] output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" # Iterate over steps for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): errors.append(f"Error: step at index {j} is not a dictionary at {i} agent output.") continue if step is None: errors.append(f"Error: step at index {j} is None at {i} agent output") continue if "role" not in step: errors.append(f"Error: 'role' key missing at step {j} at {i} agent output.") continue if "content" not in step: errors.append(f"Error: 'content' key missing at step {j} at {i} agent output.") continue if step["role"].strip() != "System:": # Filter out system prompts content = step["content"] output += f"Step {(3-j)}:\n" output += f"Response : {content}\n\n" else: logger.error("Error: 'outputs' data is not in the expected format.") return "Error: 'outputs' data is not in the expected format." output += f"Overall Completion Time: `{overall_time}`" if errors: logger.error( f"Errors found while parsing Auto Swarm output: {errors}" ) return "\n".join(errors) logger.info("Auto Swarm output parsed successfully.") return output except json.JSONDecodeError as e: logger.error( f"Error during parsing Auto Swarm output: {e}", exc_info=True ) return f"Error during parsing json.JSONDecodeError: {e}" except Exception as e: logger.error( f"Error during parsing Auto Swarm output: {e}", exc_info=True ) return f"Error during parsing: {str(e)}" def parse_agent_rearrange_output(data: Optional[str], error_display=None) -> str: """ Parses the AgentRearrange output string and formats it for display. """ logger.info("Parsing AgentRearrange output...") if data is None: logger.error("No data provided for parsing AgentRearrange output.") return "Error: No data provided for parsing." print( f"Raw data received for parsing:\n{data}" ) # Debug: Print raw data try: parsed_data = json.loads(data) errors = [] if ( "input" not in parsed_data or not isinstance(parsed_data.get("input"), dict) ): errors.append( "Error: 'input' data is missing or not a dictionary." ) else: if "swarm_id" not in parsed_data["input"]: errors.append( "Error: 'swarm_id' key is missing in the 'input'." ) if "name" not in parsed_data["input"]: errors.append( "Error: 'name' key is missing in the 'input'." ) if "flow" not in parsed_data["input"]: errors.append( "Error: 'flow' key is missing in the 'input'." ) if "time" not in parsed_data: errors.append("Error: 'time' key is missing.") if errors: logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) swarm_id = parsed_data["input"]["swarm_id"] swarm_name = parsed_data["input"]["name"] agent_flow = parsed_data["input"]["flow"] overall_time = parsed_data["time"] output = f"Workflow Execution Details\n\n" output += f"Swarm ID: `{swarm_id}`\n" output += f"Swarm Name: `{swarm_name}`\n" output += f"Agent Flow: `{agent_flow}`\n\n---\n" output += f"Agent Task Execution\n\n" if "outputs" not in parsed_data: errors.append("Error: 'outputs' key is missing") elif parsed_data["outputs"] is None: errors.append("Error: 'outputs' data is None") elif not isinstance(parsed_data["outputs"], list): errors.append("Error: 'outputs' data is not a list.") elif not parsed_data["outputs"]: errors.append("Error: 'outputs' list is empty.") if errors: logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) for i, agent_output in enumerate( parsed_data["outputs"], start=3 ): if not isinstance(agent_output, dict): errors.append( f"Error: Agent output at index {i} is not a" " dictionary" ) continue if "agent_name" not in agent_output: errors.append( f"Error: 'agent_name' key is missing at index {i}" ) continue if "steps" not in agent_output: errors.append( f"Error: 'steps' key is missing at index {i}" ) continue if agent_output["steps"] is None: errors.append( f"Error: 'steps' data is None at index {i}" ) continue if not isinstance(agent_output["steps"], list): errors.append( f"Error: 'steps' data is not a list at index {i}" ) continue if not agent_output["steps"]: errors.append( f"Error: 'steps' list is empty at index {i}" ) continue agent_name = agent_output["agent_name"] output += f"Run {(3-i)} (Agent: `{agent_name}`)**\n\n" # output += "
\nShow/Hide Agent Steps\n\n" # Iterate over steps for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): errors.append( f"Error: step at index {j} is not a dictionary" f" at {i} agent output." ) continue if step is None: errors.append( f"Error: step at index {j} is None at {i} agent" " output" ) continue if "role" not in step: errors.append( f"Error: 'role' key missing at step {j} at {i}" " agent output." ) continue if "content" not in step: errors.append( f"Error: 'content' key missing at step {j} at" f" {i} agent output." ) continue if step["role"].strip() != "System:": # Filter out system prompts # role = step["role"] content = step["content"] output += f"Step {(3-j)}: \n" output += f"Response :\n {content}\n\n" # output += "
\n\n---\n" output += f"Overall Completion Time: `{overall_time}`" if errors: logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) else: logger.info("AgentRearrange output parsed successfully.") return output except json.JSONDecodeError as e: logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) return f"Error during parsing: json.JSONDecodeError {e}" except Exception as e: logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" def parse_mixture_of_agents_output(data: Optional[str], error_display=None) -> str: """Parses the MixtureOfAgents output string and formats it for display.""" logger.info("Parsing MixtureOfAgents output...") if data is None: logger.error("No data provided for parsing MixtureOfAgents output.") return "Error: No data provided for parsing." print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data try: parsed_data = json.loads(data) if "InputConfig" not in parsed_data or not isinstance(parsed_data["InputConfig"], dict): logger.error("Error: 'InputConfig' data is missing or not a dictionary.") return "Error: 'InputConfig' data is missing or not a dictionary." if "name" not in parsed_data["InputConfig"]: logger.error("Error: 'name' key is missing in 'InputConfig'.") return "Error: 'name' key is missing in 'InputConfig'." if "description" not in parsed_data["InputConfig"]: logger.error("Error: 'description' key is missing in 'InputConfig'.") return "Error: 'description' key is missing in 'InputConfig'." if "agents" not in parsed_data["InputConfig"] or not isinstance(parsed_data["InputConfig"]["agents"], list) : logger.error("Error: 'agents' key is missing in 'InputConfig' or not a list.") return "Error: 'agents' key is missing in 'InputConfig' or not a list." name = parsed_data["InputConfig"]["name"] description = parsed_data["InputConfig"]["description"] output = f"Mixture of Agents Workflow Details\n\n" output += f"Name: `{name}`\n" output += f"Description: `{description}`\n\n---\n" output += f"Agent Task Execution\n\n" for agent in parsed_data["InputConfig"]["agents"]: if not isinstance(agent, dict): logger.error("Error: agent is not a dict in InputConfig agents") return "Error: agent is not a dict in InputConfig agents" if "agent_name" not in agent: logger.error("Error: 'agent_name' key is missing in agents.") return "Error: 'agent_name' key is missing in agents." if "system_prompt" not in agent: logger.error("Error: 'system_prompt' key is missing in agents.") return f"Error: 'system_prompt' key is missing in agents." agent_name = agent["agent_name"] # system_prompt = agent["system_prompt"] output += f"Agent: `{agent_name}`\n" # output += f"* **System Prompt:** `{system_prompt}`\n\n" if "normal_agent_outputs" not in parsed_data or not isinstance(parsed_data["normal_agent_outputs"], list) : logger.error("Error: 'normal_agent_outputs' key is missing or not a list.") return "Error: 'normal_agent_outputs' key is missing or not a list." for i, agent_output in enumerate(parsed_data["normal_agent_outputs"], start=3): if not isinstance(agent_output, dict): logger.error(f"Error: agent output at index {i} is not a dictionary.") return f"Error: agent output at index {i} is not a dictionary." if "agent_name" not in agent_output: logger.error(f"Error: 'agent_name' key is missing at index {i}") return f"Error: 'agent_name' key is missing at index {i}" if "steps" not in agent_output: logger.error(f"Error: 'steps' key is missing at index {i}") return f"Error: 'steps' key is missing at index {i}" if agent_output["steps"] is None: logger.error(f"Error: 'steps' is None at index {i}") return f"Error: 'steps' is None at index {i}" if not isinstance(agent_output["steps"], list): logger.error(f"Error: 'steps' data is not a list at index {i}.") return f"Error: 'steps' data is not a list at index {i}." agent_name = agent_output["agent_name"] output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" # output += "
\nShow/Hide Agent Steps\n\n" for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") return f"Error: step at index {j} is not a dictionary at {i} agent output." if step is None: logger.error(f"Error: step at index {j} is None at {i} agent output.") return f"Error: step at index {j} is None at {i} agent output." if "role" not in step: logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") return f"Error: 'role' key missing at step {j} at {i} agent output." if "content" not in step: logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") return f"Error: 'content' key missing at step {j} at {i} agent output." if step["role"].strip() != "System:": # Filter out system prompts # role = step["role"] content = step["content"] output += f"Step {(3-j)}: \n" output += f"Response:\n {content}\n\n" # output += "
\n\n---\n" if "aggregator_agent_summary" in parsed_data: output += f"\nAggregated Summary :\n{parsed_data['aggregator_agent_summary']}\n{'=' * 50}\n" logger.info("MixtureOfAgents output parsed successfully.") return output except json.JSONDecodeError as e: logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) return f"Error during parsing json.JSONDecodeError : {e}" except Exception as e: logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" def parse_sequential_workflow_output(data: Optional[str], error_display=None) -> str: """Parses the SequentialWorkflow output string and formats it for display.""" logger.info("Parsing SequentialWorkflow output...") if data is None: logger.error("No data provided for parsing SequentialWorkflow output.") return "Error: No data provided for parsing." print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data try: parsed_data = json.loads(data) if "input" not in parsed_data or not isinstance(parsed_data.get("input"), dict): logger.error("Error: 'input' data is missing or not a dictionary.") return "Error: 'input' data is missing or not a dictionary." if "swarm_id" not in parsed_data["input"] : logger.error("Error: 'swarm_id' key is missing in the 'input'.") return "Error: 'swarm_id' key is missing in the 'input'." if "name" not in parsed_data["input"]: logger.error("Error: 'name' key is missing in the 'input'.") return "Error: 'name' key is missing in the 'input'." if "flow" not in parsed_data["input"]: logger.error("Error: 'flow' key is missing in the 'input'.") return "Error: 'flow' key is missing in the 'input'." if "time" not in parsed_data : logger.error("Error: 'time' key is missing.") return "Error: 'time' key is missing." swarm_id = parsed_data["input"]["swarm_id"] swarm_name = parsed_data["input"]["name"] agent_flow = parsed_data["input"]["flow"] overall_time = parsed_data["time"] output = f"Workflow Execution Details\n\n" output += f"Swarm ID: `{swarm_id}`\n" output += f"Swarm Name: `{swarm_name}`\n" output += f"Agent Flow: `{agent_flow}`\n\n---\n" output += f"Agent Task Execution\n\n" if "outputs" not in parsed_data: logger.error("Error: 'outputs' key is missing") return "Error: 'outputs' key is missing" if parsed_data["outputs"] is None: logger.error("Error: 'outputs' data is None") return "Error: 'outputs' data is None" if not isinstance(parsed_data["outputs"], list): logger.error("Error: 'outputs' data is not a list.") return "Error: 'outputs' data is not a list." for i, agent_output in enumerate(parsed_data["outputs"], start=3): if not isinstance(agent_output, dict): logger.error(f"Error: Agent output at index {i} is not a dictionary") return f"Error: Agent output at index {i} is not a dictionary" if "agent_name" not in agent_output: logger.error(f"Error: 'agent_name' key is missing at index {i}") return f"Error: 'agent_name' key is missing at index {i}" if "steps" not in agent_output: logger.error(f"Error: 'steps' key is missing at index {i}") return f"Error: 'steps' key is missing at index {i}" if agent_output["steps"] is None: logger.error(f"Error: 'steps' data is None at index {i}") return f"Error: 'steps' data is None at index {i}" if not isinstance(agent_output["steps"], list): logger.error(f"Error: 'steps' data is not a list at index {i}") return f"Error: 'steps' data is not a list at index {i}" agent_name = agent_output["agent_name"] output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" # output += "
\nShow/Hide Agent Steps\n\n" # Iterate over steps for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") return f"Error: step at index {j} is not a dictionary at {i} agent output." if step is None: logger.error(f"Error: step at index {j} is None at {i} agent output") return f"Error: step at index {j} is None at {i} agent output" if "role" not in step: logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") return f"Error: 'role' key missing at step {j} at {i} agent output." if "content" not in step: logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") return f"Error: 'content' key missing at step {j} at {i} agent output." if step["role"].strip() != "System:": # Filter out system prompts # role = step["role"] content = step["content"] output += f"Step {(3-j)}:\n" output += f"Response : {content}\n\n" # output += "
\n\n---\n" output += f"Overall Completion Time: `{overall_time}`" logger.info("SequentialWorkflow output parsed successfully.") return output except json.JSONDecodeError as e : logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) return f"Error during parsing json.JSONDecodeError : {e}" except Exception as e: logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" def parse_spreadsheet_swarm_output(file_path: str, error_display=None) -> str: """Parses the SpreadSheetSwarm output CSV file and formats it for display.""" logger.info("Parsing SpreadSheetSwarm output...") if not file_path: logger.error("No file path provided for parsing SpreadSheetSwarm output.") return "Error: No file path provided for parsing." print(f"Parsing spreadsheet output from: {file_path}") try: with open(file_path, 'r', encoding='utf-8') as file: csv_reader = csv.reader(file) header = next(csv_reader, None) # Read the header row if not header: logger.error("CSV file is empty or has no header.") return "Error: CSV file is empty or has no header" output = "### Spreadsheet Swarm Output ###\n\n" output += "| " + " | ".join(header) + " |\n" # Adding header output += "| " + " | ".join(["---"] * len(header)) + " |\n" # Adding header seperator for row in csv_reader: output += "| " + " | ".join(row) + " |\n" # Adding row output += "\n" logger.info("SpreadSheetSwarm output parsed successfully.") return output except FileNotFoundError as e: logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) return "Error: CSV file not found." except Exception as e: logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) return f"Error during parsing CSV file: {str(e)}" def parse_json_output(data:str, error_display=None) -> str: """Parses a JSON string and formats it for display.""" logger.info("Parsing JSON output...") if not data: logger.error("No data provided for parsing JSON output.") return "Error: No data provided for parsing." print(f"Parsing json output from: {data}") try: parsed_data = json.loads(data) output = "### Swarm Metadata ###\n\n" for key,value in parsed_data.items(): if key == "outputs": output += f"**{key}**:\n" if isinstance(value, list): for item in value: output += f" - Agent Name : {item.get('agent_name', 'N/A')}\n" output += f" Task : {item.get('task', 'N/A')}\n" output += f" Result : {item.get('result', 'N/A')}\n" output += f" Timestamp : {item.get('timestamp', 'N/A')}\n\n" else : output += f" {value}\n" else : output += f"**{key}**: {value}\n" logger.info("JSON output parsed successfully.") return output except json.JSONDecodeError as e: logger.error(f"Error during parsing JSON output: {e}", exc_info=True) return f"Error: Invalid JSON format - {e}" except Exception as e: logger.error(f"Error during parsing JSON output: {e}", exc_info=True) return f"Error during JSON parsing: {str(e)}" class UI: def __init__(self, theme): self.theme = theme self.blocks = gr.Blocks(theme=self.theme) self.components = {} # Dictionary to store UI components def create_markdown(self, text, is_header=False): if is_header: markdown = gr.Markdown( f"

{text}

" ) else: markdown = gr.Markdown( f"

{text}

" ) self.components[f"markdown_{text}"] = markdown return markdown def create_text_input(self, label, lines=3, placeholder=""): text_input = gr.Textbox( label=label, lines=lines, placeholder=placeholder, elem_classes=["custom-input"], ) self.components[f"text_input_{label}"] = text_input return text_input def create_slider( self, label, minimum=0, maximum=1, value=0.5, step=0.1 ): slider = gr.Slider( minimum=minimum, maximum=maximum, value=value, step=step, label=label, interactive=True, ) self.components[f"slider_{label}"] = slider return slider def create_dropdown( self, label, choices, value=None, multiselect=False ): if not choices: choices = ["No options available"] if value is None and choices: value = choices[0] if not multiselect else [choices[0]] dropdown = gr.Dropdown( label=label, choices=choices, value=value, interactive=True, multiselect=multiselect, ) self.components[f"dropdown_{label}"] = dropdown return dropdown def create_button(self, text, variant="primary"): button = gr.Button(text, variant=variant) self.components[f"button_{text}"] = button return button def create_text_output(self, label, lines=10, placeholder=""): text_output = gr.Textbox( label=label, interactive=False, placeholder=placeholder, lines=lines, elem_classes=["custom-output"], ) self.components[f"text_output_{label}"] = text_output return text_output def create_tab(self, label, content_function): with gr.Tab(label): content_function(self) def set_event_listener(self, button, function, inputs, outputs): button.click(function, inputs=inputs, outputs=outputs) def get_components(self, *keys): if not keys: return self.components # return all components return [self.components[key] for key in keys] def create_json_output(self, label, placeholder=""): json_output = gr.JSON( label=label, value={}, elem_classes=["custom-output"], ) self.components[f"json_output_{label}"] = json_output return json_output def build(self): return self.blocks def create_conditional_input( self, component, visible_when, watch_component ): """Create an input that's only visible under certain conditions""" watch_component.change( fn=lambda x: gr.update(visible=visible_when(x)), inputs=[watch_component], outputs=[component], ) @staticmethod def create_ui_theme(primary_color="red"): return gr.themes.Ocean( primary_hue=primary_color, secondary_hue=primary_color, neutral_hue="gray", ).set( body_background_fill="#20252c", body_text_color="#f0f0f0", button_primary_background_fill=primary_color, button_primary_text_color="#ffffff", button_secondary_background_fill=primary_color, button_secondary_text_color="#ffffff", shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)", ) def create_agent_details_tab(self): """Create the agent details tab content.""" with gr.Column(): gr.Markdown("### Agent Details") gr.Markdown( """ **Available Agent Types:** - Data Extraction Agent: Specialized in extracting relevant information - Summary Agent - Analysis Agent: Performs detailed analysis of data **Swarm Types:** - ConcurrentWorkflow: Agents work in parallel - SequentialWorkflow: Agents work in sequence - AgentRearrange: Custom agent execution flow - MixtureOfAgents: Combines multiple agents with an aggregator - SpreadSheetSwarm: Specialized for spreadsheet operations - Auto: Automatically determines optimal workflow **Note:** Spreasheet swarm saves data in csv, will work in local setup ! """ ) return gr.Column() def create_logs_tab(self): """Create the logs tab content.""" with gr.Column(): gr.Markdown("### Execution Logs") logs_display = gr.Textbox( label="System Logs", placeholder="Execution logs will appear here...", interactive=False, lines=10, ) return logs_display def update_flow_agents(agent_keys): """Update flow agents based on selected agent prompts.""" if not agent_keys: return [], "No agents selected" agent_names = [key for key in agent_keys] print(f"Flow agents: {agent_names}") # Debug: Print flow agents return agent_names, "Select agents in execution order" def update_flow_preview(selected_flow_agents): """Update flow preview based on selected agents.""" if not selected_flow_agents: return "Flow will be shown here..." flow = " -> ".join(selected_flow_agents) return flow def create_app(): # Initialize UI theme = UI.create_ui_theme(primary_color="red") ui = UI(theme=theme) global AGENT_PROMPTS # Available providers and models providers = [ "openai", "anthropic", "cohere", "gemini", "mistral", "groq", "perplexity", ] filtered_models = {} for provider in providers: filtered_models[provider] = models_by_provider.get(provider, []) with ui.blocks: with gr.Row(): with gr.Column(scale=4): # Left column (80% width) ui.create_markdown("Swarms", is_header=True) ui.create_markdown( "The Enterprise-Grade Production-Ready Multi-Agent" " Orchestration Framework" ) with gr.Row(): with gr.Column(scale=4): with gr.Row(): task_input = gr.Textbox( label="Task Description", placeholder="Describe your task here...", lines=3, ) with gr.Row(): with gr.Column(scale=1): with gr.Row(): # Provider selection dropdown provider_dropdown = gr.Dropdown( label="Select Provider", choices=providers, value=providers[0] if providers else None, interactive=True, ) # with gr.Row(): # # Model selection dropdown (initially empty) model_dropdown = gr.Dropdown( label="Select Model", choices=[], interactive=True, ) with gr.Row(): # API key input api_key_input = gr.Textbox( label="API Key", placeholder="Enter your API key", type="password", ) with gr.Column(scale=1): with gr.Row(): dynamic_slider = gr.Slider( label="Dyn. Temp", minimum=0, maximum=1, value=0.1, step=0.01, ) # with gr.Row(): # max tokens slider max_loops_slider = gr.Slider( label="Max Loops", minimum=1, maximum=10, value=1, step=1, ) with gr.Row(): # max tokens slider max_tokens_slider = gr.Slider( label="Max Tokens", minimum=100, maximum=10000, value=4000, step=100, ) with gr.Column(scale=2, min_width=200): with gr.Column(scale=1): # Get available agent prompts available_prompts = ( list(AGENT_PROMPTS.keys()) if AGENT_PROMPTS else ["No agents available"] ) agent_prompt_selector = gr.Dropdown( label="Select Agent Prompts", choices=available_prompts, value=[available_prompts[0]] if available_prompts else None, multiselect=True, interactive=True, ) # with gr.Column(scale=1): # Get available swarm types swarm_types = [ "SequentialWorkflow", "ConcurrentWorkflow", "AgentRearrange", "MixtureOfAgents", "SpreadSheetSwarm", "auto", ] agent_selector = gr.Dropdown( label="Select Swarm", choices=swarm_types, value=swarm_types[0], multiselect=False, interactive=True, ) # Flow configuration components for AgentRearrange with gr.Column(visible=False) as flow_config: flow_text = gr.Textbox( label="Agent Flow Configuration", placeholder="Enter agent flow !", lines=2, ) gr.Markdown( """ **Flow Configuration Help:** - Enter agent names separated by ' -> ' - Example: Agent1 -> Agent2 -> Agent3 - Use exact agent names from the prompts above """ ) # Create Agent Prompt Section with gr.Accordion( "Create Agent Prompt", open=False ) as create_prompt_accordion: with gr.Row(): with gr.Column(): new_agent_name_input = gr.Textbox( label="New Agent Name" ) with gr.Column(): new_agent_prompt_input = ( gr.Textbox( label="New Agent Prompt", lines=3, ) ) with gr.Row(): with gr.Column(): create_agent_button = gr.Button( "Save New Prompt" ) with gr.Column(): create_agent_status = gr.Textbox( label="Status", interactive=False, ) # with gr.Row(): # temperature_slider = gr.Slider( # label="Temperature", # minimum=0, # maximum=1, # value=0.1, # step=0.01 # ) # Hidden textbox to store API Key env_api_key_textbox = gr.Textbox( value="", visible=False ) with gr.Row(): with gr.Column(scale=1): run_button = gr.Button( "Run Task", variant="primary" ) cancel_button = gr.Button( "Cancel", variant="secondary" ) with gr.Column(scale=1): with gr.Row(): loading_status = gr.Textbox( label="Status", value="Ready", interactive=False, ) # Add loading indicator and status with gr.Row(): agent_output_display = gr.Textbox( label="Agent Responses", placeholder="Responses will appear here...", interactive=False, lines=10, ) with gr.Row(): log_display = gr.Textbox( label="Logs", placeholder="Logs will be displayed here...", interactive=False, lines=5, visible=False, ) error_display = gr.Textbox( label="Error", placeholder="Errors will be displayed here...", interactive=False, lines=5, visible=False, ) def update_agent_dropdown(): """Update agent dropdown when a new agent is added""" global AGENT_PROMPTS AGENT_PROMPTS = load_prompts_from_json() available_prompts = ( list(AGENT_PROMPTS.keys()) if AGENT_PROMPTS else ["No agents available"] ) return gr.update( choices=available_prompts, value=available_prompts[0] if available_prompts else None, ) def update_ui_for_swarm_type(swarm_type): """Update UI components based on selected swarm type.""" is_agent_rearrange = swarm_type == "AgentRearrange" is_mixture = swarm_type == "MixtureOfAgents" is_spreadsheet = swarm_type == "SpreadSheetSwarm" max_loops = ( 5 if is_mixture or is_spreadsheet else 10 ) # Return visibility state for flow configuration and max loops update return ( gr.update(visible=is_agent_rearrange), # For flow_config gr.update( maximum=max_loops ), # For max_loops_slider f"Selected {swarm_type}", # For loading_status ) def update_model_dropdown(provider): """Update model dropdown based on selected provider.""" models = filtered_models.get(provider, []) return gr.update( choices=models, value=models[0] if models else None, ) def save_new_agent_prompt(agent_name, agent_prompt): """Saves a new agent prompt to the JSON file.""" try: if not agent_name or not agent_prompt: return ( "Error: Agent name and prompt cannot be" " empty." ) if ( not agent_name.isalnum() and "_" not in agent_name ): return ( "Error : Agent name must be alphanumeric or" " underscore(_) " ) if "agent." + agent_name in AGENT_PROMPTS: return "Error : Agent name already exists" with open( PROMPT_JSON_PATH, "r+", encoding="utf-8" ) as f: try: data = json.load(f) except json.JSONDecodeError: data = {} data[agent_name] = { "system_prompt": agent_prompt } f.seek(0) json.dump(data, f, indent=4) f.truncate() return "New agent prompt saved successfully" except Exception as e: return f"Error saving agent prompt {str(e)}" # In the run_task_wrapper function, modify the API key handling async def run_task_wrapper( task, max_loops, dynamic_temp, swarm_type, agent_prompt_selector, flow_text, provider, model_name, api_key, temperature, max_tokens, ): """Execute the task and update the UI with progress.""" try: # Update status yield "Processing...", "Running task...", "", gr.update(visible=False), gr.update(visible=False) # Prepare flow for AgentRearrange flow = None if swarm_type == "AgentRearrange": if not flow_text: yield ( "Please provide the agent flow" " configuration.", "Error: Flow not configured", "", gr.update(visible=True), gr.update(visible=False) ) return flow = flow_text print( f"Flow string: {flow}" ) # Debug: Print flow string # save api keys in memory api_keys[provider] = api_key agents = initialize_agents( dynamic_temp, agent_prompt_selector, model_name, provider, api_keys.get(provider), # Access API key from the dictionary temperature, max_tokens, ) print( "Agents passed to SwarmRouter:" f" {[agent.agent_name for agent in agents]}" ) # Debug: Print agent list # Convert agent list to dictionary agents_dict = { agent.agent_name: agent for agent in agents } # Execute task async for result, router, error in execute_task( task=task, max_loops=max_loops, dynamic_temp=dynamic_temp, swarm_type=swarm_type, agent_keys=agent_prompt_selector, flow=flow, model_name=model_name, provider=provider, api_key=api_keys.get(provider), # Pass the api key from memory temperature=temperature, max_tokens=max_tokens, agents=agents_dict, # Changed here log_display=log_display, error_display = error_display ): if error: yield f"Error: {error}", f"Error: {error}", "", gr.update(visible=True), gr.update(visible=True) return if result is not None: formatted_output = format_output(result, swarm_type, error_display) yield formatted_output, "Completed", api_key, gr.update(visible=False), gr.update(visible=False) return except Exception as e: yield f"Error: {str(e)}", f"Error: {str(e)}", "", gr.update(visible=True), gr.update(visible=True) return # Connect the update functions agent_selector.change( fn=update_ui_for_swarm_type, inputs=[agent_selector], outputs=[ flow_config, max_loops_slider, loading_status, ], ) provider_dropdown.change( fn=update_model_dropdown, inputs=[provider_dropdown], outputs=[model_dropdown], ) # Event for creating new agent prompts create_agent_button.click( fn=save_new_agent_prompt, inputs=[new_agent_name_input, new_agent_prompt_input], outputs=[create_agent_status], ).then( fn=update_agent_dropdown, inputs=None, outputs=[agent_prompt_selector], ) # Create event trigger # Create event trigger for run button run_event = run_button.click( fn=run_task_wrapper, inputs=[ task_input, max_loops_slider, dynamic_slider, agent_selector, agent_prompt_selector, flow_text, provider_dropdown, model_dropdown, api_key_input, max_tokens_slider ], outputs=[ agent_output_display, loading_status, env_api_key_textbox, error_display, log_display, ], ) # Connect cancel button to interrupt processing def cancel_task(): return "Task cancelled.", "Cancelled", "", gr.update(visible=False), gr.update(visible=False) cancel_button.click( fn=cancel_task, inputs=None, outputs=[ agent_output_display, loading_status, env_api_key_textbox, error_display, log_display ], cancels=run_event, ) with gr.Column(scale=1): # Right column with gr.Tabs(): with gr.Tab("Agent Details"): ui.create_agent_details_tab() with gr.Tab("Logs"): logs_display = ui.create_logs_tab() def update_logs_display(): """Update logs display with current logs.""" return "" # Update logs when tab is selected logs_tab = gr.Tab("Logs") logs_tab.select( fn=update_logs_display, inputs=None, outputs=[logs_display], ) return ui.build() # if __name__ == "__main__": # app = create_app() # app.launch()