from typing import Any, Callable, Dict, Optional, Sequence from swarms.structs.base_swarm import BaseSwarm from swarms.utils.loguru_logger import logger class AutoSwarmRouter(BaseSwarm): """AutoSwarmRouter class represents a router for the AutoSwarm class. This class is responsible for routing tasks to the appropriate swarm based on the provided name. It allows customization of the preprocessing, routing, and postprocessing of tasks. Attributes: name (str): The name of the router. description (str): The description of the router. verbose (bool): Whether to enable verbose mode. custom_params (dict): Custom parameters for the router. swarms (list): A list of BaseSwarm objects. custom_preprocess (callable): Custom preprocessing function for tasks. custom_postprocess (callable): Custom postprocessing function for task results. custom_router (callable): Custom routing function for tasks. Methods: run(task: str = None, *args, **kwargs) -> Any: Run the swarm simulation and route the task to the appropriate swarm. Flow: name -> router -> swarm entry point """ def __init__( self, name: Optional[str] = None, description: Optional[str] = None, verbose: bool = False, custom_params: Optional[Dict[str, Any]] = None, swarms: Sequence[BaseSwarm] = None, custom_preprocess: Optional[Callable] = None, custom_postprocess: Optional[Callable] = None, custom_router: Optional[Callable] = None, *args, **kwargs, ): super().__init__( name=name, description=description, *args, **kwargs ) self.name = name self.description = description self.verbose = verbose self.custom_params = custom_params self.swarms = swarms self.custom_preprocess = custom_preprocess self.custom_postprocess = custom_postprocess self.custom_router = custom_router # Create a dictionary of swarms self.swarm_dict = {swarm.name: swarm for swarm in self.swarms} logger.info( f"AutoSwarmRouter has been initialized with {self.len_of_swarms()} swarms." ) def run(self, task: str = None, *args, **kwargs): try: """Run the swarm simulation and route the task to the appropriate swarm.""" if self.custom_preprocess: # If custom preprocess function is provided then run it logger.info("Running custom preprocess function.") task, args, kwargs = self.custom_preprocess( task, args, kwargs ) if self.custom_router: # If custom router function is provided then use it to route the task logger.info("Running custom router function.") out = self.custom_router(self, task, *args, **kwargs) if self.custom_postprocess: # If custom postprocess function is provided then run it out = self.custom_postprocess(out) return out if self.name in self.swarm_dict: # If a match is found then send the task to the swarm out = self.swarm_dict[self.name].run( task, *args, **kwargs ) if self.custom_postprocess: # If custom postprocess function is provided then run it out = self.custom_postprocess(out) return out # If no match is found then return None raise ValueError( f"Swarm with name {self.name} not found." ) except Exception as e: logger.error(f"Error: {e}") raise e def len_of_swarms(self): return print(len(self.swarms)) def list_available_swarms(self): for swarm in self.swarms: try: logger.info( f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} " ) except Exception as error: logger.error( f"Error Detected You may not have swarms available: {error}" ) raise error class AutoSwarm(BaseSwarm): """AutoSwarm class represents a swarm of agents that can be created automatically. Flow: name -> router -> swarm entry point Args: name (Optional[str]): The name of the swarm. Defaults to None. description (Optional[str]): The description of the swarm. Defaults to None. verbose (bool): Whether to enable verbose mode. Defaults to False. custom_params (Optional[Dict[str, Any]]): Custom parameters for the swarm. Defaults to None. router (Optional[AutoSwarmRouter]): The router for the swarm. Defaults to None. """ def __init__( self, name: Optional[str] = None, description: Optional[str] = None, verbose: bool = False, custom_params: Optional[Dict[str, Any]] = None, custom_preprocess: Optional[Callable] = None, custom_postprocess: Optional[Callable] = None, custom_router: Optional[Callable] = None, max_loops: int = 1, *args, **kwargs, ): super().__init__() self.name = name self.description = description self.verbose = verbose self.custom_params = custom_params self.custom_preprocess = custom_preprocess self.custom_postprocess = custom_postprocess self.custom_router = custom_router self.max_loops = max_loops self.router = AutoSwarmRouter( name=name, description=description, verbose=verbose, custom_params=custom_params, custom_preprocess=custom_preprocess, custom_postprocess=custom_postprocess, custom_router=custom_router, *args, **kwargs, ) if name is None: raise ValueError( "A name must be provided for the AutoSwarm, what swarm do you want to use?" ) if verbose is True: self.init_logging() def init_logging(self): logger.info("AutoSwarm has been activated. Ready for usage.") # def name_swarm_check(self, name: str = None): def run(self, task: str = None, *args, **kwargs): """Run the swarm simulation.""" try: loop = 0 while loop < self.max_loops: if self.custom_preprocess: # If custom preprocess function is provided then run it logger.info("Running custom preprocess function.") task, args, kwargs = self.custom_preprocess( task, args, kwargs ) if self.custom_router: # If custom router function is provided then use it to route the task logger.info("Running custom router function.") out = self.custom_router( self, task, *args, **kwargs ) else: out = self.router.run(task, *args, **kwargs) if self.custom_postprocess: # If custom postprocess function is provided then run it out = self.custom_postprocess(out) # LOOP loop += 1 return out except Exception as e: logger.error( f"Error: {e} try optimizing the inputs and try again." ) raise e def list_all_swarms(self): for swarm in self.swarms: try: logger.info( f"Swarm Name: {swarm.name} || Swarm Description: {swarm.description} " ) except Exception as error: logger.error( f"Error Detected You may not have swarms available: {error}" ) raise error