swarm matcher docs

pull/836/head
Kye Gomez 5 days ago
parent de7e751025
commit c7c506d721

@ -246,6 +246,7 @@ nav:
- Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md" - Hybrid Hierarchical-Cluster Swarm: "swarms/structs/hhcs.md"
- Deep Research Swarm: "swarms/structs/deep_research_swarm.md" - Deep Research Swarm: "swarms/structs/deep_research_swarm.md"
- Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md" - Auto Swarm Builder: "swarms/structs/auto_swarm_builder.md"
- Swarm Matcher: "swarms/structs/swarm_matcher.md"
- Workflows: - Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md" - SequentialWorkflow: "swarms/structs/sequential_workflow.md"

@ -0,0 +1,260 @@
# SwarmMatcher
SwarmMatcher automatically matches a task to the most appropriate swarm type, based on the semantic similarity between the task description and each swarm type's description.
## Overview
SwarmMatcher uses transformer-based embeddings to choose a swarm architecture for a given task. It embeds the task description, compares that embedding with the embeddings of the registered swarm type descriptions, and selects the swarm type whose description is semantically closest.
## Installation
SwarmMatcher is included in the Swarms package. To use it, simply import it from the library:
```python
from swarms.structs.swarm_matcher import SwarmMatcher, SwarmMatcherConfig, SwarmType
```
## Basic Usage
```python
from swarms.structs.swarm_matcher import swarm_matcher
# Use the simplified function to match a task to a swarm type
swarm_type = swarm_matcher("Analyze this dataset and create visualizations")
print(f"Selected swarm type: {swarm_type}")
```
## Advanced Usage
For more control over the matching process, you can create and configure your own SwarmMatcher instance:
```python
from swarms.structs.swarm_matcher import SwarmMatcher, SwarmMatcherConfig, SwarmType, initialize_swarm_types
# Create a configuration
config = SwarmMatcherConfig(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    embedding_dim=512
)
# Initialize the matcher
matcher = SwarmMatcher(config)
# Add default swarm types
initialize_swarm_types(matcher)
# Add a custom swarm type
custom_swarm = SwarmType(
    name="CustomSwarm",
    description="A specialized swarm for handling specific domain tasks with expert knowledge."
)
matcher.add_swarm_type(custom_swarm)
# Find the best match for a task
best_match, score = matcher.find_best_match("Process natural language and extract key insights")
print(f"Best match: {best_match}, Score: {score}")
# Auto-select a swarm type
selected_swarm = matcher.auto_select_swarm("Create data visualizations from this CSV file")
print(f"Selected swarm: {selected_swarm}")
```
## Available Swarm Types
SwarmMatcher comes with several pre-defined swarm types:
| Swarm Type | Description |
| ---------- | ----------- |
| AgentRearrange | Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. |
| MixtureOfAgents | Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. |
| SpreadSheetSwarm | Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. |
| SequentialWorkflow | Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. |
| ConcurrentWorkflow | Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. |
## API Reference
### SwarmType
A class representing a type of swarm with its name and description.
```python
class SwarmType(BaseModel):
    name: str
    description: str
    embedding: Optional[List[float]] = Field(default=None, exclude=True)
```
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| name | str | The name of the swarm type |
| description | str | A detailed description of the swarm type's capabilities and ideal use cases |
| embedding | Optional[List[float]] | The generated embedding vector for this swarm type (auto-populated) |
### SwarmMatcherConfig
Configuration settings for the SwarmMatcher.
```python
class SwarmMatcherConfig(BaseModel):
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_dim: int = 512
```
| Parameter | Type | Default | Description |
| --------- | ---- | ------- | ----------- |
| model_name | str | "sentence-transformers/all-MiniLM-L6-v2" | The transformer model to use for embeddings |
| embedding_dim | int | 512 | The dimension of the embedding vectors |
### SwarmMatcher
The main class for matching tasks to swarm types.
```python
class SwarmMatcher:
    # Signature overview only; method bodies are omitted.
    def __init__(self, config: SwarmMatcherConfig): ...
    def get_embedding(self, text: str) -> np.ndarray: ...
    def add_swarm_type(self, swarm_type: SwarmType): ...
    def find_best_match(self, task: str) -> Tuple[str, float]: ...
    def auto_select_swarm(self, task: str) -> str: ...
    def run_multiple(self, tasks: List[str]) -> List[str]: ...
    def save_swarm_types(self, filename: str): ...
    def load_swarm_types(self, filename: str): ...
```
#### Methods
##### `__init__(config: SwarmMatcherConfig)`
Initializes the SwarmMatcher with a configuration.
##### `get_embedding(text: str) -> np.ndarray`
Generates an embedding vector for a given text using the configured model.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| text | str | The text to embed |
| Returns | np.ndarray | The embedding vector |
##### `add_swarm_type(swarm_type: SwarmType)`
Adds a swarm type to the matcher, generating an embedding for its description.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| swarm_type | SwarmType | The swarm type to add |
##### `find_best_match(task: str) -> Tuple[str, float]`
Finds the best matching swarm type for a given task.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| task | str | The task description |
| Returns | Tuple[str, float] | The name of the best matching swarm type and the similarity score |
##### `auto_select_swarm(task: str) -> str`
Automatically selects the best swarm type for a given task.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| task | str | The task description |
| Returns | str | The name of the selected swarm type |
##### `run_multiple(tasks: List[str]) -> List[str]`
Matches multiple tasks to swarm types in batch.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| tasks | List[str] | A list of task descriptions |
| Returns | List[str] | A list of selected swarm type names |
##### `save_swarm_types(filename: str)`
Saves the registered swarm types to a JSON file.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| filename | str | Path where the swarm types will be saved |
##### `load_swarm_types(filename: str)`
Loads swarm types from a JSON file.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| filename | str | Path to the JSON file containing swarm types |
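The round trip through a JSON file can be pictured with the sketch below. It is not the SwarmMatcher implementation: the on-disk layout is not specified on this page, so the code assumes a plain list of objects with `name` and `description` keys, and `save_types` / `load_types` are hypothetical helpers built on the standard library only. Because `embedding` is declared with `exclude=True` on `SwarmType`, it is presumably left out of the saved data, which would mean embeddings have to be regenerated after loading.

```python
import json
import os
import tempfile
from typing import Dict, List


def save_types(swarm_types: List[Dict[str, str]], filename: str) -> None:
    """Hypothetical helper: write name/description pairs to a JSON file."""
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(swarm_types, f, indent=2)


def load_types(filename: str) -> List[Dict[str, str]]:
    """Hypothetical helper: read name/description pairs back from JSON."""
    with open(filename, "r", encoding="utf-8") as f:
        return json.load(f)


# Assumed layout: one object per swarm type, without the embedding vector.
types = [
    {
        "name": "ResearchSwarm",
        "description": "Deep research and report writing.",
    },
]

# Write to a temporary directory so the sketch leaves no files behind
# in the working directory.
path = os.path.join(tempfile.mkdtemp(), "custom_swarm_types.json")
save_types(types, path)
restored = load_types(path)
print(restored[0]["name"])
```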
## Examples
### Simple Matching
```python
from swarms.structs.swarm_matcher import swarm_matcher
# Match tasks to swarm types
tasks = [
    "Analyze this dataset and create visualizations",
    "Coordinate multiple agents to tackle different aspects of a problem",
    "Process these 10 PDF files in sequence",
    "Handle these data processing tasks in parallel"
]
for task in tasks:
    swarm_type = swarm_matcher(task)
    print(f"Task: {task}")
    print(f"Selected swarm: {swarm_type}\n")
```
### Custom Swarm Types
```python
from swarms.structs.swarm_matcher import SwarmMatcher, SwarmMatcherConfig, SwarmType
# Create configuration and matcher
config = SwarmMatcherConfig()
matcher = SwarmMatcher(config)
# Define custom swarm types
swarm_types = [
    SwarmType(
        name="DataAnalysisSwarm",
        description="Specialized in processing and analyzing large datasets, performing statistical analysis, and extracting insights from complex data."
    ),
    SwarmType(
        name="CreativeWritingSwarm",
        description="Optimized for creative content generation, storytelling, and producing engaging written material with consistent style and tone."
    ),
    SwarmType(
        name="ResearchSwarm",
        description="Focused on deep research tasks, synthesizing information from multiple sources, and producing comprehensive reports on complex topics."
    )
]
# Add swarm types
for swarm_type in swarm_types:
    matcher.add_swarm_type(swarm_type)
# Save the swarm types for future use
matcher.save_swarm_types("custom_swarm_types.json")
# Use the matcher
task = "Research quantum computing advances in the last 5 years"
best_match = matcher.auto_select_swarm(task)
print(f"Selected swarm type: {best_match}")
```
## How It Works
SwarmMatcher uses a transformer-based model to generate embeddings (vector representations) of both the task descriptions and the swarm type descriptions. It then calculates the similarity between these embeddings to determine which swarm type is most semantically similar to the given task.
The matching process follows these steps:
1. The task description is converted to an embedding vector
2. Each swarm type's description is converted to an embedding vector
3. The similarity between the task embedding and each swarm type embedding is calculated
4. The swarm type with the highest similarity score is selected
Because the comparison is made on embeddings rather than on keywords, a task can match a swarm type even when the two descriptions share few words, which generally gives more accurate selections than keyword matching alone.
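The four steps can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the SwarmMatcher code: `toy_embed` is a hypothetical bag-of-words stand-in for the transformer model (so, unlike the real matcher, it only sees shared words), and cosine similarity is assumed as the similarity measure, which this page does not specify.

```python
import math
from collections import Counter
from typing import Dict, Tuple


def toy_embed(text: str) -> Counter:
    """Hypothetical stand-in for the transformer: a word-count vector."""
    return Counter(text.lower().split())


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[token] for token, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # an empty text has no direction to compare
    return dot / (norm_a * norm_b)


def best_match(task: str, swarm_types: Dict[str, str]) -> Tuple[str, float]:
    """Return (name, score) of the description closest to the task."""
    task_vec = toy_embed(task)  # step 1: embed the task
    scores = {
        # steps 2 and 3: embed each description and score it
        name: cosine_similarity(task_vec, toy_embed(description))
        for name, description in swarm_types.items()
    }
    name = max(scores, key=scores.get)  # step 4: highest score wins
    return name, scores[name]


swarm_types = {
    "SequentialWorkflow": "execute tasks in a step-by-step sequential process",
    "ConcurrentWorkflow": "process multiple tasks concurrently in parallel",
}
name, score = best_match("run these tasks in parallel", swarm_types)
print(name, round(score, 3))
```

Swapping `toy_embed` for a real sentence-embedding model changes only the vector type; the selection logic stays the same.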

@ -47,7 +47,6 @@ from swarms.structs.multi_agent_exec import (
get_swarms_info, get_swarms_info,
) )
from swarms.structs.multi_agent_router import MultiAgentRouter from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.queue_swarm import TaskQueueSwarm
from swarms.structs.rearrange import AgentRearrange, rearrange from swarms.structs.rearrange import AgentRearrange, rearrange
from swarms.structs.round_robin import RoundRobinSwarm from swarms.structs.round_robin import RoundRobinSwarm
from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.sequential_workflow import SequentialWorkflow
@ -120,7 +119,6 @@ __all__ = [
"sigmoid_swarm", "sigmoid_swarm",
"staircase_swarm", "staircase_swarm",
"star_swarm", "star_swarm",
"TaskQueueSwarm",
"SpreadSheetSwarm", "SpreadSheetSwarm",
"SwarmRouter", "SwarmRouter",
"SwarmType", "SwarmType",

@ -1,193 +0,0 @@
import queue
import threading
from typing import List
from swarms.structs.agent import Agent
from pydantic import BaseModel
import os
from swarms.utils.loguru_logger import logger
from swarms.structs.base_swarm import BaseSwarm
import time
class AgentOutput(BaseModel):
    agent_name: str
    task: str
    result: str
    timestamp: str
class SwarmRunMetadata(BaseModel):
    run_id: str
    name: str
    description: str
    agents: List[str]
    start_time: str
    end_time: str
    tasks_completed: int
    outputs: List[AgentOutput]
class TaskQueueSwarm(BaseSwarm):
    """
    A swarm that processes tasks from a queue using multiple agents on different threads.
    Args:
        agents (List[Agent]): A list of agents of class Agent.
        name (str, optional): The name of the swarm. Defaults to "Task-Queue-Swarm".
        description (str, optional): The description of the swarm. Defaults to "A swarm that processes tasks from a queue using multiple agents on different threads.".
        autosave_on (bool, optional): Whether to automatically save the swarm metadata. Defaults to True.
        save_file_path (str, optional): The file path to save the swarm metadata. Defaults to "swarm_run_metadata.json".
        workspace_dir (str, optional): The directory path of the workspace. Defaults to os.getenv("WORKSPACE_DIR").
        return_metadata_on (bool, optional): Whether to return the swarm metadata after running. Defaults to False.
        max_loops (int, optional): The maximum number of loops to run the swarm. Defaults to 1.
    Attributes:
        agents (List[Agent]): A list of agents of class Agent.
        task_queue (queue.Queue): A queue to store the tasks.
        lock (threading.Lock): A lock for thread synchronization.
        autosave_on (bool): Whether to automatically save the swarm metadata.
        save_file_path (str): The file path to save the swarm metadata.
        workspace_dir (str): The directory path of the workspace.
        return_metadata_on (bool): Whether to return the swarm metadata after running.
        max_loops (int): The maximum number of loops to run the swarm.
        metadata (SwarmRunMetadata): The metadata of the swarm run.
    """
    def __init__(
        self,
        agents: List[Agent],
        name: str = "Task-Queue-Swarm",
        description: str = "A swarm that processes tasks from a queue using multiple agents on different threads.",
        autosave_on: bool = True,
        save_file_path: str = "swarm_run_metadata.json",
        workspace_dir: str = os.getenv("WORKSPACE_DIR"),
        return_metadata_on: bool = False,
        max_loops: int = 1,
        *args,
        **kwargs,
    ):
        super().__init__(
            name=name,
            description=description,
            agents=agents,
            *args,
            **kwargs,
        )
        self.agents = agents
        self.task_queue = queue.Queue()
        self.lock = threading.Lock()
        self.autosave_on = autosave_on
        self.save_file_path = save_file_path
        self.workspace_dir = workspace_dir or os.getenv(
            "WORKSPACE_DIR", "agent_workspace"
        )
        self.return_metadata_on = return_metadata_on
        self.max_loops = max_loops
        current_time = time.strftime("%Y%m%d%H%M%S")
        self.metadata = SwarmRunMetadata(
            run_id=f"swarm_run_{current_time}",
            name=name,
            description=description,
            agents=[agent.agent_name for agent in agents],
            start_time=current_time,
            end_time="",
            tasks_completed=0,
            outputs=[],
        )
    def reliability_checks(self):
        logger.info("Initializing reliability checks.")
        if not self.agents:
            raise ValueError(
                "You must provide a non-empty list of Agent instances."
            )
        if self.max_loops <= 0:
            raise ValueError("max_loops must be greater than zero.")
        logger.info(
            "Reliability checks successful. Swarm is ready for usage."
        )
    def add_task(self, task: str):
        """Adds a task to the queue."""
        self.task_queue.put(task)
    def _process_task(self, agent: Agent):
        """Processes tasks from the queue using the provided agent."""
        while True:
            try:
                task = self.task_queue.get_nowait()
            except queue.Empty:
                break
            try:
                logger.info(
                    f"Agent {agent.agent_name} is running task: {task}"
                )
                result = agent.run(task)
                with self.lock:
                    self.metadata.tasks_completed += 1
                    self.metadata.outputs.append(
                        AgentOutput(
                            agent_name=agent.agent_name,
                            task=task,
                            result=result,
                            timestamp=time.strftime(
                                "%Y-%m-%d %H:%M:%S"
                            ),
                        )
                    )
                logger.info(
                    f"Agent {agent.agent_name} completed task: {task}"
                )
                logger.debug(f"Result: {result}")
            except Exception as e:
                logger.error(
                    f"Agent {agent.agent_name} failed to complete task: {task}"
                )
                logger.exception(e)
            finally:
                self.task_queue.task_done()
    def run(self):
        """Runs the swarm by having agents pick up tasks from the queue."""
        logger.info(f"Starting swarm run: {self.metadata.run_id}")
        threads = [
            threading.Thread(
                target=self._process_task, args=(agent,), daemon=True
            )
            for agent in self.agents
        ]
        for thread in threads:
            thread.start()
        self.task_queue.join()
        for thread in threads:
            thread.join()
        self.metadata.end_time = time.strftime("%Y%m%d%H%M%S")
        if self.autosave_on:
            self.save_json_to_file()
        # if self.return_metadata_on:
        #     return self.metadata.model_dump_json(indent=4)
        return self.export_metadata()
    def save_json_to_file(self):
        json_string = self.export_metadata()
        file_path = os.path.join(
            self.workspace_dir, self.save_file_path
        )
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "w") as f:
            f.write(json_string)
        logger.info(f"Metadata saved to {file_path}")
    def export_metadata(self):
        return self.metadata.model_dump_json(indent=4)

@ -1,23 +0,0 @@
import time
from typing import List
import uuid
from pydantic import BaseModel, Field
class AgentRespond(BaseModel):
    id: str = Field(default=uuid.uuid4().hex)
    timestamp: str = Field(default=time.time())
    agent_position: int = Field(description="Agent in swarm position")
    agent_name: str
    agent_response: str = Field(description="Agent response")
class SwarmOutput(BaseModel):
    id: str = Field(default=uuid.uuid4().hex)
    timestamp: str = Field(default=time.time())
    name: str = Field(description="Swarm name")
    description: str = Field(description="Swarm description")
    swarm_type: str = Field(description="Swarm type")
    agent_outputs: List[AgentRespond] = Field(
        description="List of agent responses"
    )