import json
from typing import List, Optional, Tuple

import numpy as np
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential

from swarms.utils.auto_download_check_packages import (
    auto_check_and_download_package,
)
from swarms.utils.lazy_loader import lazy_import_decorator
from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="swarm_matcher")


class SwarmType(BaseModel):
    name: str
    description: str
    embedding: Optional[List[float]] = Field(
        default=None, exclude=True
    )


class SwarmMatcherConfig(BaseModel):
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_dim: int = 384  # Output dimension of all-MiniLM-L6-v2


class SwarmMatcher:
    """
    A class for matching tasks to swarm types based on their descriptions.
    It utilizes a transformer model to generate embeddings for task and swarm type descriptions,
    and then calculates the dot product to find the best match.
    """

    def __init__(self, config: SwarmMatcherConfig):
        """
        Initializes the SwarmMatcher with a configuration.

        Args:
            config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
        """
        logger.add("swarm_matcher_debug.log", level="DEBUG")
        logger.debug("Initializing SwarmMatcher")

        try:
            import torch
        except ImportError:
            auto_check_and_download_package(
                "torch", package_manager="pip", upgrade=True
            )
            import torch

        try:
            import transformers
        except ImportError:
            auto_check_and_download_package(
                "transformers", package_manager="pip", upgrade=True
            )
            import transformers

        self.torch = torch
        try:
            self.config = config
            self.tokenizer = (
                transformers.AutoTokenizer.from_pretrained(
                    config.model_name
                )
            )
            self.model = transformers.AutoModel.from_pretrained(
                config.model_name
            )
            self.swarm_types: List[SwarmType] = []
            logger.debug("SwarmMatcher initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing SwarmMatcher: {str(e)}")
            raise

    def get_embedding(self, text: str) -> np.ndarray:
        """
        Generates an embedding for a given text using the configured model.

        Args:
            text (str): The text for which to generate an embedding.

        Returns:
            np.ndarray: The embedding vector for the text.
        """
        logger.debug(f"Getting embedding for text: {text[:50]}...")
        try:
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            )
            with self.torch.no_grad():
                outputs = self.model(**inputs)
            # Mean-pool the last hidden state over the sequence dimension
            # to obtain a single fixed-size vector for the whole text.
            embedding = (
                outputs.last_hidden_state.mean(dim=1)
                .squeeze()
                .numpy()
            )
            logger.debug("Embedding generated successfully")
            return embedding
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise

    def add_swarm_type(self, swarm_type: SwarmType):
        """
        Adds a swarm type to the list of swarm types, generating an embedding for its description.

        Args:
            swarm_type (SwarmType): The swarm type to add.
        """
        logger.debug(f"Adding swarm type: {swarm_type.name}")
        try:
            embedding = self.get_embedding(swarm_type.description)
            swarm_type.embedding = embedding.tolist()
            self.swarm_types.append(swarm_type)
            logger.info(f"Added swarm type: {swarm_type.name}")
        except Exception as e:
            logger.error(
                f"Error adding swarm type {swarm_type.name}: {str(e)}"
            )
            raise

    def find_best_match(self, task: str) -> Tuple[str, float]:
        """
        Finds the best match for a given task among the registered swarm types.

        Args:
            task (str): The task for which to find the best match.

        Returns:
            Tuple[str, float]: A tuple containing the name of the best matching swarm type and its similarity score.
        """
        logger.debug(f"Finding best match for task: {task[:50]}...")
        try:
            if not self.swarm_types:
                raise ValueError(
                    "No swarm types registered; call add_swarm_type() first."
                )
            task_embedding = self.get_embedding(task)
            best_match = None
            best_score = -float("inf")
            for swarm_type in self.swarm_types:
                # Score each swarm type by the dot product between the task
                # embedding and the stored description embedding.
                score = np.dot(
                    task_embedding, np.array(swarm_type.embedding)
                )
                if score > best_score:
                    best_score = score
                    best_match = swarm_type
            logger.info(
                f"Best match for task: {best_match.name} (score: {best_score})"
            )
            return best_match.name, float(best_score)
        except Exception as e:
            logger.error(
                f"Error finding best match for task: {str(e)}"
            )
            raise

    def auto_select_swarm(self, task: str) -> str:
        """
        Automatically selects the best swarm type for a given task based on their descriptions.

        Args:
            task (str): The task for which to select a swarm type.

        Returns:
            str: The name of the selected swarm type.
        """
        logger.debug(f"Auto-selecting swarm for task: {task[:50]}...")
        best_match, score = self.find_best_match(task)
        logger.info(f"Task: {task}")
        logger.info(f"Selected Swarm Type: {best_match}")
        logger.info(f"Confidence Score: {score:.2f}")
        return best_match

    def run_multiple(self, tasks: List[str], *args, **kwargs) -> List[str]:
        """
        Selects the best swarm type for each task in a list of tasks.

        Returns:
            List[str]: The names of the selected swarm types, one per task.
        """
        swarms = []
        for task in tasks:
            output = self.auto_select_swarm(task)
            swarms.append(output)
        return swarms

    def save_swarm_types(self, filename: str):
        """
        Saves the registered swarm types to a JSON file.

        Args:
            filename (str): The name of the file to which to save the swarm types.
        """
        try:
            with open(filename, "w") as f:
                json.dump([st.dict() for st in self.swarm_types], f)
            logger.info(f"Saved swarm types to {filename}")
        except Exception as e:
            logger.error(f"Error saving swarm types: {str(e)}")
            raise

    def load_swarm_types(self, filename: str):
        """
        Loads swarm types from a JSON file.

        Args:
            filename (str): The name of the file from which to load the swarm types.
        """
        try:
            with open(filename, "r") as f:
                swarm_types_data = json.load(f)
            self.swarm_types = [
                SwarmType(**st) for st in swarm_types_data
            ]
            logger.info(f"Loaded swarm types from {filename}")
        except Exception as e:
            logger.error(f"Error loading swarm types: {str(e)}")
            raise


def initialize_swarm_types(matcher: SwarmMatcher):
    logger.debug("Initializing swarm types")
    swarm_types = [
        SwarmType(
            name="AgentRearrange",
            description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization",
        ),
        SwarmType(
            name="MixtureOfAgents",
            description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis",
        ),
        SwarmType(
            name="SpreadSheetSwarm",
            description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data",
        ),
        SwarmType(
            name="SequentialWorkflow",
            description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution",
        ),
        SwarmType(
            name="ConcurrentWorkflow",
            description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing",
        ),
        # SwarmType(
        #     name="HierarchicalSwarm",
        #     description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination",
        # ),
        # SwarmType(
        #     name="AdaptiveSwarm",
        #     description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms",
        # ),
        # SwarmType(
        #     name="ConsensusSwarm",
        #     description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions",
        # ),
    ]

    for swarm_type in swarm_types:
        matcher.add_swarm_type(swarm_type)

    logger.debug("Swarm types initialized")


def swarm_matcher(task: str, *args, **kwargs) -> str:
    """
    Selects the most suitable swarm type for a single task using the
    predefined swarm types.

    Args:
        task (str): The task to match against the registered swarm types.

    Returns:
        str: The name of the selected swarm type.
    """
    config = SwarmMatcherConfig()
    matcher = SwarmMatcher(config)
    initialize_swarm_types(matcher)
    # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json")
    swarm_type = matcher.auto_select_swarm(task)
    logger.info(f"{swarm_type}")
    return swarm_type
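

# Minimal usage sketch (illustrative only, not part of the library API):
# routes a single task string to a swarm type via swarm_matcher(). It assumes
# the sentence-transformers model can be downloaded on first run; the task
# string is taken from the example tasks further below.
if __name__ == "__main__":
    selected = swarm_matcher(
        "Analyze this spreadsheet of sales data and create visualizations"
    )
    print(f"Task routed to swarm type: {selected}")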


# from typing import List, Tuple, Dict
# from pydantic import BaseModel, Field
# from loguru import logger
# from uuid import uuid4
# import chromadb
# import json
# from tenacity import retry, stop_after_attempt, wait_exponential


# class SwarmType(BaseModel):
#     """A swarm type with its name, description and optional metadata"""

#     id: str = Field(default_factory=lambda: str(uuid4()))
#     name: str
#     description: str
#     metadata: Dict = Field(default_factory=dict)


# class SwarmMatcherConfig(BaseModel):
#     """Configuration for the SwarmMatcher"""

#     collection_name: str = "swarm_types"
#     distance_metric: str = "cosine"  # or "l2" or "ip"
#     embedding_function: str = (
#         "sentence-transformers/all-mpnet-base-v2"  # Better model than MiniLM
#     )
#     persist_directory: str = "./chroma_db"


# class SwarmMatcher:
#     """
#     An improved swarm matcher that uses ChromaDB for better vector similarity search.

#     Features:
#     - Persistent storage of embeddings
#     - Better vector similarity search with multiple distance metrics
#     - Improved embedding model
#     - Metadata filtering capabilities
#     - Batch operations support
#     """

#     def __init__(self, config: SwarmMatcherConfig):
#         """Initialize the improved swarm matcher"""
#         logger.add("swarm_matcher.log", rotation="100 MB")
#         self.config = config

#         # Initialize ChromaDB client with persistence
#         self.chroma_client = chromadb.Client()

#         # Get or create collection
#         try:
#             self.collection = self.chroma_client.get_collection(
#                 name=config.collection_name,
#             )
#         except ValueError:
#             self.collection = self.chroma_client.create_collection(
#                 name=config.collection_name,
#                 metadata={"hnsw:space": config.distance_metric},
#             )

#         logger.info(
#             f"Initialized SwarmMatcher with collection '{config.collection_name}'"
#         )

#     def add_swarm_type(self, swarm_type: SwarmType) -> None:
#         """Add a single swarm type to the collection"""
#         try:
#             self.collection.add(
#                 ids=[swarm_type.id],
#                 documents=[swarm_type.description],
#                 metadatas=[
#                     {"name": swarm_type.name, **swarm_type.metadata}
#                 ],
#             )
#             logger.info(f"Added swarm type: {swarm_type.name}")
#         except Exception as e:
#             logger.error(
#                 f"Error adding swarm type {swarm_type.name}: {str(e)}"
#             )
#             raise

#     def add_swarm_types(self, swarm_types: List[SwarmType]) -> None:
#         """Add multiple swarm types in batch"""
#         try:
#             self.collection.add(
#                 ids=[st.id for st in swarm_types],
#                 documents=[st.description for st in swarm_types],
#                 metadatas=[
#                     {"name": st.name, **st.metadata}
#                     for st in swarm_types
#                 ],
#             )
#             logger.info(f"Added {len(swarm_types)} swarm types")
#         except Exception as e:
#             logger.error(
#                 f"Error adding swarm types in batch: {str(e)}"
#             )
#             raise
#     @retry(
#         stop=stop_after_attempt(3),
#         wait=wait_exponential(multiplier=1, min=4, max=10),
#     )
#     def find_best_matches(
#         self,
#         task: str,
#         n_results: int = 3,
#         score_threshold: float = 0.7,
#     ) -> List[Tuple[str, float]]:
#         """
#         Find the best matching swarm types for a given task
#         Returns multiple matches with their scores
#         """
#         try:
#             results = self.collection.query(
#                 query_texts=[task],
#                 n_results=n_results,
#                 include=["metadatas", "distances"],
#             )

#             matches = []
#             for metadata, distance in zip(
#                 results["metadatas"][0], results["distances"][0]
#             ):
#                 # Convert distance to similarity score (1 - normalized_distance)
#                 score = 1 - (
#                     distance / 2
#                 )  # Normalize cosine distance to [0,1]
#                 if score >= score_threshold:
#                     matches.append((metadata["name"], score))

#             logger.info(f"Found {len(matches)} matches for task")
#             return matches
#         except Exception as e:
#             logger.error(f"Error finding matches for task: {str(e)}")
#             raise

#     def auto_select_swarm(self, task: str) -> str:
#         """
#         Automatically select the best swarm type for a task
#         Returns only the top match
#         """
#         matches = self.find_best_matches(task, n_results=1)
#         if not matches:
#             logger.warning("No suitable matches found for task")
#             return "SequentialWorkflow"  # Default fallback

#         best_match, score = matches[0]
#         logger.info(
#             f"Selected swarm type '{best_match}' with confidence {score:.3f}"
#         )
#         return best_match

#     def run_multiple(self, tasks: List[str]) -> List[str]:
#         """Process multiple tasks in batch"""
#         return [self.auto_select_swarm(task) for task in tasks]

#     def save_swarm_types(self, filename: str) -> None:
#         """Export swarm types to JSON"""
#         try:
#             all_data = self.collection.get(
#                 include=["metadatas", "documents"]
#             )
#             swarm_types = [
#                 SwarmType(
#                     id=id_,
#                     name=metadata["name"],
#                     description=document,
#                     metadata={
#                         k: v
#                         for k, v in metadata.items()
#                         if k != "name"
#                     },
#                 )
#                 for id_, metadata, document in zip(
#                     all_data["ids"],
#                     all_data["metadatas"],
#                     all_data["documents"],
#                 )
#             ]
#             with open(filename, "w") as f:
#                 json.dump(
#                     [st.dict() for st in swarm_types], f, indent=2
#                 )
#             logger.info(f"Saved swarm types to {filename}")
#         except Exception as e:
#             logger.error(f"Error saving swarm types: {str(e)}")
#             raise

#     def load_swarm_types(self, filename: str) -> None:
#         """Import swarm types from JSON"""
#         try:
#             with open(filename, "r") as f:
#                 swarm_types_data = json.load(f)
#             swarm_types = [SwarmType(**st) for st in swarm_types_data]
#             self.add_swarm_types(swarm_types)
#             logger.info(f"Loaded swarm types from {filename}")
#         except Exception as e:
#             logger.error(f"Error loading swarm types: {str(e)}")
#             raise


# def initialize_default_swarm_types(matcher: SwarmMatcher) -> None:
#     """Initialize the matcher with default swarm types"""
#     swarm_types = [
#         SwarmType(
#             name="AgentRearrange",
#             description="""
#             Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation
#             and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization,
#             task scheduling, resource allocation, workflow management, agent organization, and process optimization.
#             Best for tasks requiring complex agent interactions and workflow optimization.
#             """,
#             metadata={
#                 "category": "optimization",
#                 "complexity": "high",
#             },
#         ),
#         SwarmType(
#             name="MixtureOfAgents",
#             description="""
#             Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach
#             to problem-solving and leveraging individual strengths. Focuses on multi-agent systems,
#             expert collaboration, distributed intelligence, collective problem solving, agent specialization,
#             team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems
#             requiring multiple areas of expertise.
#             """,
#             metadata={
#                 "category": "collaboration",
#                 "complexity": "high",
#             },
#         ),
#         SwarmType(
#             name="SpreadSheetSwarm",
#             description="""
#             Collaborative data processing and analysis in a spreadsheet-like environment, facilitating
#             real-time data sharing and visualization. Specializes in data analysis, tabular processing,
#             collaborative editing, data transformation, spreadsheet operations, data visualization,
#             real-time collaboration, and structured data handling. Perfect for data-intensive tasks
#             requiring structured analysis.
#             """,
#             metadata={
#                 "category": "data_processing",
#                 "complexity": "medium",
#             },
#         ),
#         SwarmType(
#             name="SequentialWorkflow",
#             description="""
#             Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical
#             approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step
#             execution, ordered tasks, sequential operations, process flow, systematic approach, and staged
#             execution. Best for tasks requiring strict order and dependencies.
#             """,
#             metadata={"category": "workflow", "complexity": "low"},
#         ),
#         SwarmType(
#             name="ConcurrentWorkflow",
#             description="""
#             Process multiple tasks or data sources concurrently in parallel, maximizing productivity
#             and reducing processing time. Specializes in parallel processing, multi-threading,
#             asynchronous execution, distributed computing, concurrent operations, simultaneous tasks,
#             parallel workflows, and scalable processing. Ideal for independent tasks that can be
#             processed simultaneously.
#             """,
#             metadata={"category": "workflow", "complexity": "medium"},
#         ),
#     ]

#     matcher.add_swarm_types(swarm_types)
#     logger.info("Initialized default swarm types")


# def create_swarm_matcher(
#     persist_dir: str = "./chroma_db",
#     collection_name: str = "swarm_types",
# ) -> SwarmMatcher:
#     """Convenience function to create and initialize a swarm matcher"""
#     config = SwarmMatcherConfig(
#         persist_directory=persist_dir, collection_name=collection_name
#     )
#     matcher = SwarmMatcher(config)
#     initialize_default_swarm_types(matcher)
#     return matcher


# # Example usage
# def swarm_matcher(task: str) -> str:
#     # Create and initialize matcher
#     matcher = create_swarm_matcher()

#     swarm_type = matcher.auto_select_swarm(task)
#     print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")

#     return swarm_type


# # # Example usage
# # if __name__ == "__main__":
# #     # Create and initialize matcher
# #     matcher = create_swarm_matcher()

# #     # Example tasks
# #     tasks = [
# #         "Analyze this spreadsheet of sales data and create visualizations",
# #         "Coordinate multiple AI agents to solve a complex problem",
# #         "Process these tasks one after another in a specific order",
# #         "Write multiple blog posts about the latest advancements in swarm intelligence all at once",
# #         "Write a blog post about the latest advancements in swarm intelligence",
# #     ]

# #     # Process tasks
# #     for task in tasks:
# #         swarm_type = matcher.auto_select_swarm(task)
# #         print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")